
Support PodMonitor for Prometheus Agent in DaemonSet mode (#6879)

* Support PodMonitor for Prometheus Agent in DaemonSet mode
Author: Ha Anh Vu
Date: 2024-09-18 15:34:42 +07:00
Committed by: GitHub
Parent: 7c510d32f5
Commit: 156eb12da2
8 changed files with 400 additions and 12 deletions
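For context on how the feature is meant to be used: a DaemonSet-mode Prometheus Agent can now select PodMonitor objects, and each agent pod scrapes only the matching pods running on its own node. A minimal usage sketch (assuming the PrometheusAgentDaemonSet feature gate is enabled on the operator; names and label values are illustrative):

apiVersion: monitoring.coreos.com/v1alpha1
kind: PrometheusAgent
metadata:
  name: agent
spec:
  mode: DaemonSet                # one agent pod per node
  podMonitorSelector:
    matchLabels:
      group: example
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: example-app
  labels:
    group: example               # matched by podMonitorSelector above
spec:
  selector:
    matchLabels:
      app: example-app
  podMetricsEndpoints:
  - port: web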


@@ -589,7 +589,7 @@ func (c *Operator) syncDaemonSet(ctx context.Context, key string, p *monitoringv
logger.Info("sync prometheus")
-opts := []prompkg.ConfigGeneratorOption{}
+opts := []prompkg.ConfigGeneratorOption{prompkg.WithDaemonSet()}
if c.endpointSliceSupported {
opts = append(opts, prompkg.WithEndpointSliceSupport())
}


@@ -68,6 +68,7 @@ type ConfigGenerator struct {
useEndpointSlice bool // Whether to use EndpointSlice for service discovery from `ServiceMonitor` objects.
scrapeClasses map[string]monitoringv1.ScrapeClass
defaultScrapeClassName string
daemonSet bool
}
type ConfigGeneratorOption func(*ConfigGenerator)
@@ -79,6 +80,12 @@ func WithEndpointSliceSupport() ConfigGeneratorOption {
}
}
func WithDaemonSet() ConfigGeneratorOption {
return func(cg *ConfigGenerator) {
cg.daemonSet = true
}
}
// NewConfigGenerator creates a ConfigGenerator for the provided Prometheus resource.
func NewConfigGenerator(
logger *slog.Logger,
@@ -184,6 +191,7 @@ func (cg *ConfigGenerator) WithKeyVals(keyvals ...interface{}) *ConfigGenerator
useEndpointSlice: cg.useEndpointSlice,
scrapeClasses: cg.scrapeClasses,
defaultScrapeClassName: cg.defaultScrapeClassName,
daemonSet: cg.daemonSet,
}
}
@@ -203,6 +211,7 @@ func (cg *ConfigGenerator) WithMinimumVersion(version string) *ConfigGenerator {
useEndpointSlice: cg.useEndpointSlice,
scrapeClasses: cg.scrapeClasses,
defaultScrapeClassName: cg.defaultScrapeClassName,
daemonSet: cg.daemonSet,
}
}
@@ -225,6 +234,7 @@ func (cg *ConfigGenerator) WithMaximumVersion(version string) *ConfigGenerator {
useEndpointSlice: cg.useEndpointSlice,
scrapeClasses: cg.scrapeClasses,
defaultScrapeClassName: cg.defaultScrapeClassName,
daemonSet: cg.daemonSet,
}
}
@@ -1136,7 +1146,11 @@ func (cg *ConfigGenerator) generatePodMonitorConfig(
labeler := namespacelabeler.New(cpf.EnforcedNamespaceLabel, cpf.ExcludedFromEnforcement, false)
relabelings = append(relabelings, generateRelabelConfig(labeler.GetRelabelingConfigs(m.TypeMeta, m.ObjectMeta, ep.RelabelConfigs))...)
-relabelings = generateAddressShardingRelabelingRules(relabelings, shards)
+// DaemonSet mode doesn't support sharding.
+if !cg.daemonSet {
+relabelings = generateAddressShardingRelabelingRules(relabelings, shards)
+}
cfg = append(cfg, yaml.MapItem{Key: "relabel_configs", Value: relabelings})
cfg = cg.AddLimitsToYAML(cfg, sampleLimitKey, m.Spec.SampleLimit, cpf.EnforcedSampleLimit)
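For reference, the sharding rules skipped above hash each target address into a shard. A rough sketch of what generateAddressShardingRelabelingRules normally appends (label names and the shard placeholder are assumptions; the exact output isn't shown in this diff):

- source_labels:
  - __address__
  target_label: __tmp_hash
  modulus: <number of shards>
  action: hashmod
- source_labels:
  - __tmp_hash
  regex: <shard index>
  action: keep

In DaemonSet mode each node runs exactly one agent pod and targets are already partitioned per node by the field selector added below, so hash-based sharding is redundant.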
@@ -1875,6 +1889,22 @@ func (cg *ConfigGenerator) generateK8SSDConfig(
})
}
// Specific configuration generated for DaemonSet mode.
if cg.daemonSet {
k8sSDConfig = cg.AppendMapItem(k8sSDConfig, "selectors", []yaml.MapSlice{
{
{
Key: "role",
Value: "pod",
},
{
Key: "field",
Value: "spec.nodeName=$(NODE_NAME)",
},
},
})
}
return yaml.MapItem{
Key: "kubernetes_sd_configs",
Value: []yaml.MapSlice{
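The spec.nodeName=$(NODE_NAME) field selector limits pod discovery to the node the agent pod is scheduled on. It assumes a NODE_NAME environment variable is available for substitution in the agent pod, typically injected through the Kubernetes downward API; a sketch of such an entry in the DaemonSet pod template (not part of this diff):

env:
- name: NODE_NAME
  valueFrom:
    fieldRef:
      fieldPath: spec.nodeName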
@@ -1984,7 +2014,9 @@ func (cg *ConfigGenerator) generateAdditionalScrapeConfigs(
if err != nil {
return nil, fmt.Errorf("unmarshalling additional scrape configs failed: %w", err)
}
-if shards == 1 {
+// DaemonSet mode doesn't support sharding.
+if cg.daemonSet || shards == 1 {
return additionalScrapeConfigsYaml, nil
}
@@ -2011,7 +2043,11 @@ func (cg *ConfigGenerator) generateAdditionalScrapeConfigs(
relabelings = append(relabelings, relabeling)
}
}
-relabelings = cg.generateAddressShardingRelabelingRulesIfMissing(relabelings, shards)
+// DaemonSet mode doesn't support sharding.
+if !cg.daemonSet {
+relabelings = cg.generateAddressShardingRelabelingRulesIfMissing(relabelings, shards)
+}
addlScrapeConfig = append(addlScrapeConfig, otherConfigItems...)
addlScrapeConfig = append(addlScrapeConfig, yaml.MapItem{Key: "relabel_configs", Value: relabelings})
addlScrapeConfigs = append(addlScrapeConfigs, addlScrapeConfig)
@@ -2575,18 +2611,22 @@ func (cg *ConfigGenerator) GenerateAgentConfiguration(
shards = shardsNumber(cg.prom)
)
-scrapeConfigs = cg.appendServiceMonitorConfigs(scrapeConfigs, sMons, apiserverConfig, store, shards)
scrapeConfigs = cg.appendPodMonitorConfigs(scrapeConfigs, pMons, apiserverConfig, store, shards)
-scrapeConfigs = cg.appendProbeConfigs(scrapeConfigs, probes, apiserverConfig, store, shards)
-scrapeConfigs, err := cg.appendScrapeConfigs(scrapeConfigs, sCons, store, shards)
-if err != nil {
-return nil, fmt.Errorf("generate scrape configs: %w", err)
-}
-scrapeConfigs, err = cg.appendAdditionalScrapeConfigs(scrapeConfigs, additionalScrapeConfigs, shards)
+scrapeConfigs, err := cg.appendAdditionalScrapeConfigs(scrapeConfigs, additionalScrapeConfigs, shards)
if err != nil {
return nil, fmt.Errorf("generate additional scrape configs: %w", err)
}
+// Currently, DaemonSet mode doesn't support these.
+if !cg.daemonSet {
+scrapeConfigs = cg.appendServiceMonitorConfigs(scrapeConfigs, sMons, apiserverConfig, store, shards)
+scrapeConfigs = cg.appendProbeConfigs(scrapeConfigs, probes, apiserverConfig, store, shards)
+scrapeConfigs, err = cg.appendScrapeConfigs(scrapeConfigs, sCons, store, shards)
+if err != nil {
+return nil, fmt.Errorf("generate scrape configs: %w", err)
+}
+}
cfg = append(cfg, yaml.MapItem{
Key: "scrape_configs",
Value: scrapeConfigs,


@@ -44,6 +44,11 @@ func defaultPrometheus() *monitoringv1.Prometheus {
},
Spec: monitoringv1.PrometheusSpec{
CommonPrometheusFields: monitoringv1.CommonPrometheusFields{
PodMonitorSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"group": "group1",
},
},
ProbeSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"group": "group1",
@@ -5477,6 +5482,26 @@ func TestTSDBConfigPrometheusAgent(t *testing.T) {
}
}
func TestPromAgentDaemonSetPodMonitorConfig(t *testing.T) {
p := defaultPrometheus()
cg := mustNewConfigGenerator(t, p)
cg.daemonSet = true
pmons := map[string]*monitoringv1.PodMonitor{
"pm": defaultPodMonitor(),
}
cfg, err := cg.GenerateAgentConfiguration(
nil,
pmons,
nil,
nil,
nil,
&assets.StoreBuilder{},
nil,
)
require.NoError(t, err)
golden.Assert(t, string(cfg), "PromAgentDaemonSetPodMonitorConfig.golden")
}
func TestGenerateRelabelConfig(t *testing.T) {
p := defaultPrometheus()


@@ -0,0 +1,47 @@
global:
scrape_interval: 30s
external_labels:
prometheus: default/test
prometheus_replica: $(POD_NAME)
scrape_configs:
- job_name: podMonitor/default/defaultPodMonitor/0
honor_labels: false
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- default
selectors:
- role: pod
field: spec.nodeName=$(NODE_NAME)
scrape_interval: 30s
relabel_configs:
- source_labels:
- job
target_label: __tmp_prometheus_job_name
- action: drop
source_labels:
- __meta_kubernetes_pod_phase
regex: (Failed|Succeeded)
- action: keep
source_labels:
- __meta_kubernetes_pod_label_group
- __meta_kubernetes_pod_labelpresent_group
regex: (group1);true
- action: keep
source_labels:
- __meta_kubernetes_pod_container_port_name
regex: web
- source_labels:
- __meta_kubernetes_namespace
target_label: namespace
- source_labels:
- __meta_kubernetes_pod_container_name
target_label: container
- source_labels:
- __meta_kubernetes_pod_name
target_label: pod
- target_label: job
replacement: default/defaultPodMonitor
- target_label: endpoint
replacement: web


@@ -412,6 +412,7 @@ func TestGatedFeatures(t *testing.T) {
"PromAgentDaemonSetResourceUpdate": testPromAgentDaemonSetResourceUpdate,
"PromAgentReconcileDaemonSetResourceUpdate": testPromAgentReconcileDaemonSetResourceUpdate,
"PromAgentReconcileDaemonSetResourceDelete": testPromAgentReconcileDaemonSetResourceDelete,
"PrometheusAgentDaemonSetSelectPodMonitor": testPrometheusAgentDaemonSetSelectPodMonitor,
}
for name, f := range testFuncs {
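The e2e test enables the PrometheusAgentDaemonSet feature gate through the test framework options. Outside the test suite, the gate would be switched on via the operator's feature-gates flag; a container-args sketch for the operator Deployment (flag spelling assumed from the operator's feature-gate convention):

containers:
- name: prometheus-operator
  args:
  - --feature-gates=PrometheusAgentDaemonSet=true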


@@ -16,13 +16,19 @@ package e2e
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"reflect"
"strings"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -31,6 +37,7 @@ import (
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
pa "github.com/prometheus-operator/prometheus-operator/pkg/prometheus/agent"
testFramework "github.com/prometheus-operator/prometheus-operator/test/framework"
)
@@ -354,3 +361,233 @@ func testPromAgentReconcileDaemonSetResourceDelete(t *testing.T) {
err = framework.WaitForPrometheusAgentDSReady(ctx, ns, prometheusAgentDSCRD)
require.NoError(t, err)
}
func testPrometheusAgentDaemonSetSelectPodMonitor(t *testing.T) {
testCtx := framework.NewTestCtx(t)
defer testCtx.Cleanup(t)
ctx := context.Background()
name := "test"
ns := framework.CreateNamespace(context.Background(), t, testCtx)
framework.SetupPrometheusRBAC(context.Background(), t, testCtx, ns)
_, err := framework.CreateOrUpdatePrometheusOperatorWithOpts(
ctx, testFramework.PrometheusOperatorOpts{
Namespace: ns,
AllowedNamespaces: []string{ns},
EnabledFeatureGates: []string{"PrometheusAgentDaemonSet"},
},
)
require.NoError(t, err)
app, err := testFramework.MakeDeployment("../../test/framework/resources/basic-app-for-daemonset-test.yaml")
require.NoError(t, err)
err = framework.CreateDeployment(ctx, ns, app)
require.NoError(t, err)
pm := framework.MakeBasicPodMonitor(name)
_, err = framework.MonClientV1.PodMonitors(ns).Create(ctx, pm, metav1.CreateOptions{})
require.NoError(t, err)
prometheusAgentDS := framework.MakeBasicPrometheusAgentDaemonSet(ns, name)
_, err = framework.CreatePrometheusAgentAndWaitUntilReady(ctx, ns, prometheusAgentDS)
require.NoError(t, err)
var pollErr error
var paPods *v1.PodList
var firstTargetIP string
var secondTargetIP string
appPodsNodes := make([]string, 0, 2)
appPodsIPs := make([]string, 0, 2)
paPodsNodes := make([]string, 0, 2)
cfg := framework.RestConfig
httpClient := http.Client{}
err = wait.PollUntilContextTimeout(context.Background(), 15*time.Second, 15*time.Minute, false, func(_ context.Context) (bool, error) {
ctx := context.Background()
appPods, err := framework.KubeClient.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
LabelSelector: "group=test",
})
if err != nil {
pollErr = fmt.Errorf("can't list app pods: %w", err)
return false, nil
}
for _, pod := range appPods.Items {
appPodsNodes = append(appPodsNodes, pod.Spec.NodeName)
appPodsIPs = append(appPodsIPs, pod.Status.PodIP)
}
paPods, err = framework.KubeClient.CoreV1().Pods(ns).List(ctx, pa.ListOptions(name))
if err != nil {
pollErr = fmt.Errorf("can't list prometheus agent pods: %w", err)
return false, nil
}
for _, pod := range paPods.Items {
paPodsNodes = append(paPodsNodes, pod.Spec.NodeName)
}
if len(appPodsNodes) != len(paPodsNodes) {
pollErr = fmt.Errorf("got %d application pods and %d prometheus-agent pods", len(appPodsNodes), len(paPodsNodes))
return false, nil
}
for _, n := range appPodsNodes {
if !slices.Contains(paPodsNodes, n) {
pollErr = fmt.Errorf("no prometheus-agent pod found on node %s", n)
return false, nil
}
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
closer, err := testFramework.StartPortForward(ctx, cfg, "https", paPods.Items[0].Name, ns, "9090")
if err != nil {
pollErr = fmt.Errorf("can't start port forward to first prometheus agent pod: %w", err)
return false, nil
}
defer closer()
req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:9090/api/v1/targets", nil)
if err != nil {
pollErr = fmt.Errorf("can't create http request to first prometheus server: %w", err)
return false, nil
}
resp, err := httpClient.Do(req)
if err != nil {
pollErr = fmt.Errorf("can't send http request to first prometheus server: %w", err)
return false, nil
}
body, err := io.ReadAll(resp.Body)
if err != nil {
pollErr = fmt.Errorf("can't read http response from first prometheus server: %w", err)
return false, nil
}
var targetsResponse TargetsResponse
err = json.Unmarshal(body, &targetsResponse)
if err != nil {
pollErr = fmt.Errorf("can't unmarshall target's http response from first prometheus server: %w", err)
return false, nil
}
if len(targetsResponse.Data.ActiveTargets) != 1 {
pollErr = fmt.Errorf("expect 1 target from first prometheus agent. Actual target's response: %#+v", targetsResponse)
return false, nil
}
target := targetsResponse.Data.ActiveTargets[0]
instance := target.Labels.Instance
host := strings.Split(instance, ":")[0]
ips, err := net.LookupHost(host)
if err != nil {
pollErr = fmt.Errorf("can't find IPs from first target's host: %w", err)
return false, nil
}
found := false
for _, ip := range ips {
if slices.Contains(appPodsIPs, ip) {
found = true
firstTargetIP = ip
}
}
if found == false {
pollErr = fmt.Errorf("first target IP not found in app's list of pod IPs. Target's IP: %#+v, app's pod IPs: %#+v", ips, appPodsIPs)
return false, nil
}
return true, nil
})
require.NoError(t, pollErr)
require.NoError(t, err)
err = wait.PollUntilContextTimeout(context.Background(), 15*time.Second, 15*time.Minute, false, func(_ context.Context) (bool, error) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
closer, err := testFramework.StartPortForward(ctx, cfg, "https", paPods.Items[1].Name, ns, "9090")
if err != nil {
pollErr = fmt.Errorf("can't start port forward to second prometheus agent pod: %w", err)
return false, nil
}
defer closer()
req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:9090/api/v1/targets", nil)
if err != nil {
pollErr = fmt.Errorf("can't create http request to second prometheus server: %w", err)
return false, nil
}
resp, err := httpClient.Do(req)
if err != nil {
pollErr = fmt.Errorf("can't send http request to second prometheus server: %w", err)
return false, nil
}
body, err := io.ReadAll(resp.Body)
if err != nil {
pollErr = fmt.Errorf("can't read http response from second prometheus server: %w", err)
return false, nil
}
var targetsResponse TargetsResponse
err = json.Unmarshal(body, &targetsResponse)
if err != nil {
pollErr = fmt.Errorf("can't unmarshall target's http response from second prometheus server: %w", err)
return false, nil
}
if len(targetsResponse.Data.ActiveTargets) != 1 {
pollErr = fmt.Errorf("expect 1 target from second prometheus agent. Actual target's response: %#+v", targetsResponse)
return false, nil
}
target := targetsResponse.Data.ActiveTargets[0]
instance := target.Labels.Instance
host := strings.Split(instance, ":")[0]
ips, err := net.LookupHost(host)
if err != nil {
pollErr = fmt.Errorf("can't find IPs from second target's host: %w", err)
return false, nil
}
found := false
for _, ip := range ips {
if slices.Contains(appPodsIPs, ip) {
found = true
secondTargetIP = ip
}
}
if found == false {
pollErr = fmt.Errorf("second target IP not found in app's list of pod IPs. Target's IP: %#+v, app's pod IPs: %#+v", ips, appPodsIPs)
return false, nil
}
return true, nil
})
require.NoError(t, pollErr)
require.NoError(t, err)
require.NotEqual(t, firstTargetIP, secondTargetIP)
}
type Target struct {
Labels struct {
Instance string `json:"instance"`
} `json:"labels"`
}
type TargetsResponse struct {
Status string `json:"status"`
Data struct {
ActiveTargets []Target `json:"activeTargets"`
} `json:"data"`
}


@@ -84,6 +84,11 @@ func (f *Framework) MakeBasicPrometheusAgentDaemonSet(ns, name string) *monitori
v1.ResourceMemory: resource.MustParse("400Mi"),
},
},
PodMonitorSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"group": name,
},
},
},
},
}


@@ -0,0 +1,33 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-for-daemonset-test
labels:
group: test
spec:
replicas: 2
selector:
matchLabels:
group: test
template:
metadata:
labels:
group: test
spec:
containers:
- name: example-app
image: quay.io/prometheus-operator/instrumented-sample-app:latest
imagePullPolicy: IfNotPresent
ports:
- name: web
containerPort: 8080
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: group
operator: In
values:
- test
topologyKey: "kubernetes.io/hostname"