From 156eb12da2ff857fc73eb7ecfb3985c8483388e0 Mon Sep 17 00:00:00 2001
From: Ha Anh Vu <75315486+haanhvu@users.noreply.github.com>
Date: Wed, 18 Sep 2024 15:34:42 +0700
Subject: [PATCH] Support PodMonitor for Prometheus Agent in DaemonSet mode (#6879)

* Support PodMonitor for Prometheus Agent in DaemonSet mode
---
 pkg/prometheus/agent/operator.go              |   2 +-
 pkg/prometheus/promcfg.go                     |  62 ++++-
 pkg/prometheus/promcfg_test.go                |  25 ++
 .../PromAgentDaemonSetPodMonitorConfig.golden |  47 ++++
 test/e2e/main_test.go                         |   1 +
 test/e2e/prometheusagent_test.go              | 237 ++++++++++++++++++
 test/framework/prometheusagent.go             |   5 +
 .../basic-app-for-daemonset-test.yaml         |  33 +++
 8 files changed, 400 insertions(+), 12 deletions(-)
 create mode 100644 pkg/prometheus/testdata/PromAgentDaemonSetPodMonitorConfig.golden
 create mode 100644 test/framework/resources/basic-app-for-daemonset-test.yaml

diff --git a/pkg/prometheus/agent/operator.go b/pkg/prometheus/agent/operator.go
index cd7cc2698..bca3b4875 100644
--- a/pkg/prometheus/agent/operator.go
+++ b/pkg/prometheus/agent/operator.go
@@ -589,7 +589,7 @@ func (c *Operator) syncDaemonSet(ctx context.Context, key string, p *monitoringv
 
 	logger.Info("sync prometheus")
 
-	opts := []prompkg.ConfigGeneratorOption{}
+	opts := []prompkg.ConfigGeneratorOption{prompkg.WithDaemonSet()}
 	if c.endpointSliceSupported {
 		opts = append(opts, prompkg.WithEndpointSliceSupport())
 	}
diff --git a/pkg/prometheus/promcfg.go b/pkg/prometheus/promcfg.go
index b2da492eb..77c472a39 100644
--- a/pkg/prometheus/promcfg.go
+++ b/pkg/prometheus/promcfg.go
@@ -68,6 +68,7 @@ type ConfigGenerator struct {
 	useEndpointSlice       bool // Whether to use EndpointSlice for service discovery from `ServiceMonitor` objects.
 	scrapeClasses          map[string]monitoringv1.ScrapeClass
 	defaultScrapeClassName string
+	daemonSet              bool
 }
 
 type ConfigGeneratorOption func(*ConfigGenerator)
@@ -79,6 +80,12 @@ func WithEndpointSliceSupport() ConfigGeneratorOption {
 	}
 }
 
+func WithDaemonSet() ConfigGeneratorOption {
+	return func(cg *ConfigGenerator) {
+		cg.daemonSet = true
+	}
+}
+
 // NewConfigGenerator creates a ConfigGenerator for the provided Prometheus resource.
 func NewConfigGenerator(
 	logger *slog.Logger,
@@ -184,6 +191,7 @@ func (cg *ConfigGenerator) WithKeyVals(keyvals ...interface{}) *ConfigGenerator
 		useEndpointSlice:       cg.useEndpointSlice,
 		scrapeClasses:          cg.scrapeClasses,
 		defaultScrapeClassName: cg.defaultScrapeClassName,
+		daemonSet:              cg.daemonSet,
 	}
 }
 
@@ -203,6 +211,7 @@ func (cg *ConfigGenerator) WithMinimumVersion(version string) *ConfigGenerator {
 		useEndpointSlice:       cg.useEndpointSlice,
 		scrapeClasses:          cg.scrapeClasses,
 		defaultScrapeClassName: cg.defaultScrapeClassName,
+		daemonSet:              cg.daemonSet,
 	}
 }
 
@@ -225,6 +234,7 @@ func (cg *ConfigGenerator) WithMaximumVersion(version string) *ConfigGenerator {
 		useEndpointSlice:       cg.useEndpointSlice,
 		scrapeClasses:          cg.scrapeClasses,
 		defaultScrapeClassName: cg.defaultScrapeClassName,
+		daemonSet:              cg.daemonSet,
 	}
 }
 
@@ -1136,7 +1146,11 @@ func (cg *ConfigGenerator) generatePodMonitorConfig(
 	labeler := namespacelabeler.New(cpf.EnforcedNamespaceLabel, cpf.ExcludedFromEnforcement, false)
 	relabelings = append(relabelings, generateRelabelConfig(labeler.GetRelabelingConfigs(m.TypeMeta, m.ObjectMeta, ep.RelabelConfigs))...)
 
-	relabelings = generateAddressShardingRelabelingRules(relabelings, shards)
+	// DaemonSet mode doesn't support sharding.
+	if !cg.daemonSet {
+		relabelings = generateAddressShardingRelabelingRules(relabelings, shards)
+	}
+
 	cfg = append(cfg, yaml.MapItem{Key: "relabel_configs", Value: relabelings})
 
 	cfg = cg.AddLimitsToYAML(cfg, sampleLimitKey, m.Spec.SampleLimit, cpf.EnforcedSampleLimit)
@@ -1875,6 +1889,22 @@ func (cg *ConfigGenerator) generateK8SSDConfig(
 		})
 	}
 
+	// Specific configuration generated for DaemonSet mode.
+	if cg.daemonSet {
+		k8sSDConfig = cg.AppendMapItem(k8sSDConfig, "selectors", []yaml.MapSlice{
+			{
+				{
+					Key:   "role",
+					Value: "pod",
+				},
+				{
+					Key:   "field",
+					Value: "spec.nodeName=$(NODE_NAME)",
+				},
+			},
+		})
+	}
+
 	return yaml.MapItem{
 		Key: "kubernetes_sd_configs",
 		Value: []yaml.MapSlice{
@@ -1984,7 +2014,9 @@ func (cg *ConfigGenerator) generateAdditionalScrapeConfigs(
 	if err != nil {
 		return nil, fmt.Errorf("unmarshalling additional scrape configs failed: %w", err)
 	}
-	if shards == 1 {
+
+	// DaemonSet mode doesn't support sharding.
+	if cg.daemonSet || shards == 1 {
 		return additionalScrapeConfigsYaml, nil
 	}
 
@@ -2011,7 +2043,11 @@ func (cg *ConfigGenerator) generateAdditionalScrapeConfigs(
 				relabelings = append(relabelings, relabeling)
 			}
 		}
-		relabelings = cg.generateAddressShardingRelabelingRulesIfMissing(relabelings, shards)
+		// DaemonSet mode doesn't support sharding.
+		if !cg.daemonSet {
+			relabelings = cg.generateAddressShardingRelabelingRulesIfMissing(relabelings, shards)
+		}
+
 		addlScrapeConfig = append(addlScrapeConfig, otherConfigItems...)
 		addlScrapeConfig = append(addlScrapeConfig, yaml.MapItem{Key: "relabel_configs", Value: relabelings})
 		addlScrapeConfigs = append(addlScrapeConfigs, addlScrapeConfig)
@@ -2575,18 +2611,22 @@ func (cg *ConfigGenerator) GenerateAgentConfiguration(
 		shards = shardsNumber(cg.prom)
 	)
 
-	scrapeConfigs = cg.appendServiceMonitorConfigs(scrapeConfigs, sMons, apiserverConfig, store, shards)
 	scrapeConfigs = cg.appendPodMonitorConfigs(scrapeConfigs, pMons, apiserverConfig, store, shards)
-	scrapeConfigs = cg.appendProbeConfigs(scrapeConfigs, probes, apiserverConfig, store, shards)
-	scrapeConfigs, err := cg.appendScrapeConfigs(scrapeConfigs, sCons, store, shards)
-	if err != nil {
-		return nil, fmt.Errorf("generate scrape configs: %w", err)
-	}
-
-	scrapeConfigs, err = cg.appendAdditionalScrapeConfigs(scrapeConfigs, additionalScrapeConfigs, shards)
+	scrapeConfigs, err := cg.appendAdditionalScrapeConfigs(scrapeConfigs, additionalScrapeConfigs, shards)
 	if err != nil {
 		return nil, fmt.Errorf("generate additional scrape configs: %w", err)
 	}
+
+	// Currently, DaemonSet mode doesn't support these.
+	if !cg.daemonSet {
+		scrapeConfigs = cg.appendServiceMonitorConfigs(scrapeConfigs, sMons, apiserverConfig, store, shards)
+		scrapeConfigs = cg.appendProbeConfigs(scrapeConfigs, probes, apiserverConfig, store, shards)
+		scrapeConfigs, err = cg.appendScrapeConfigs(scrapeConfigs, sCons, store, shards)
+		if err != nil {
+			return nil, fmt.Errorf("generate scrape configs: %w", err)
+		}
+	}
+
 	cfg = append(cfg, yaml.MapItem{
 		Key: "scrape_configs",
 		Value: scrapeConfigs,
diff --git a/pkg/prometheus/promcfg_test.go b/pkg/prometheus/promcfg_test.go
index 75ba5ea02..82faf4a4f 100644
--- a/pkg/prometheus/promcfg_test.go
+++ b/pkg/prometheus/promcfg_test.go
@@ -44,6 +44,11 @@ func defaultPrometheus() *monitoringv1.Prometheus {
 		},
 		Spec: monitoringv1.PrometheusSpec{
 			CommonPrometheusFields: monitoringv1.CommonPrometheusFields{
+				PodMonitorSelector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						"group": "group1",
+					},
+				},
 				ProbeSelector: &metav1.LabelSelector{
 					MatchLabels: map[string]string{
 						"group": "group1",
 					},
@@ -5477,6 +5482,26 @@ func TestTSDBConfigPrometheusAgent(t *testing.T) {
 	}
 }
 
+func TestPromAgentDaemonSetPodMonitorConfig(t *testing.T) {
+	p := defaultPrometheus()
+	cg := mustNewConfigGenerator(t, p)
+	cg.daemonSet = true
+	pmons := map[string]*monitoringv1.PodMonitor{
+		"pm": defaultPodMonitor(),
+	}
+	cfg, err := cg.GenerateAgentConfiguration(
+		nil,
+		pmons,
+		nil,
+		nil,
+		nil,
+		&assets.StoreBuilder{},
+		nil,
+	)
+	require.NoError(t, err)
+	golden.Assert(t, string(cfg), "PromAgentDaemonSetPodMonitorConfig.golden")
+}
+
 func TestGenerateRelabelConfig(t *testing.T) {
 	p := defaultPrometheus()
 
diff --git a/pkg/prometheus/testdata/PromAgentDaemonSetPodMonitorConfig.golden b/pkg/prometheus/testdata/PromAgentDaemonSetPodMonitorConfig.golden
new file mode 100644
index 000000000..c9e2d088c
--- /dev/null
+++ b/pkg/prometheus/testdata/PromAgentDaemonSetPodMonitorConfig.golden
@@ -0,0 +1,47 @@
+global:
+  scrape_interval: 30s
+  external_labels:
+    prometheus: default/test
+    prometheus_replica: $(POD_NAME)
+scrape_configs:
+- job_name: podMonitor/default/defaultPodMonitor/0
+  honor_labels: false
+  kubernetes_sd_configs:
+  - role: pod
+    namespaces:
+      names:
+      - default
+    selectors:
+    - role: pod
+      field: spec.nodeName=$(NODE_NAME)
+  scrape_interval: 30s
+  relabel_configs:
+  - source_labels:
+    - job
+    target_label: __tmp_prometheus_job_name
+  - action: drop
+    source_labels:
+    - __meta_kubernetes_pod_phase
+    regex: (Failed|Succeeded)
+  - action: keep
+    source_labels:
+    - __meta_kubernetes_pod_label_group
+    - __meta_kubernetes_pod_labelpresent_group
+    regex: (group1);true
+  - action: keep
+    source_labels:
+    - __meta_kubernetes_pod_container_port_name
+    regex: web
+  - source_labels:
+    - __meta_kubernetes_namespace
+    target_label: namespace
+  - source_labels:
+    - __meta_kubernetes_pod_container_name
+    target_label: container
+  - source_labels:
+    - __meta_kubernetes_pod_name
+    target_label: pod
+  - target_label: job
+    replacement: default/defaultPodMonitor
+  - target_label: endpoint
+    replacement: web
diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go
index ec7b799fe..f364e2f02 100644
--- a/test/e2e/main_test.go
+++ b/test/e2e/main_test.go
@@ -412,6 +412,7 @@ func TestGatedFeatures(t *testing.T) {
 		"PromAgentDaemonSetResourceUpdate":          testPromAgentDaemonSetResourceUpdate,
 		"PromAgentReconcileDaemonSetResourceUpdate": testPromAgentReconcileDaemonSetResourceUpdate,
 		"PromAgentReconcileDaemonSetResourceDelete": testPromAgentReconcileDaemonSetResourceDelete,
"PrometheusAgentDaemonSetSelectPodMonitor": testPrometheusAgentDaemonSetSelectPodMonitor, } for name, f := range testFuncs { diff --git a/test/e2e/prometheusagent_test.go b/test/e2e/prometheusagent_test.go index 0548c0608..cc5571d50 100644 --- a/test/e2e/prometheusagent_test.go +++ b/test/e2e/prometheusagent_test.go @@ -16,13 +16,19 @@ package e2e import ( "context" + "encoding/json" "fmt" + "io" + "net" + "net/http" "reflect" + "strings" "testing" "time" "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,6 +37,7 @@ import ( monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" monitoringv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1" + pa "github.com/prometheus-operator/prometheus-operator/pkg/prometheus/agent" testFramework "github.com/prometheus-operator/prometheus-operator/test/framework" ) @@ -354,3 +361,233 @@ func testPromAgentReconcileDaemonSetResourceDelete(t *testing.T) { err = framework.WaitForPrometheusAgentDSReady(ctx, ns, prometheusAgentDSCRD) require.NoError(t, err) } + +func testPrometheusAgentDaemonSetSelectPodMonitor(t *testing.T) { + testCtx := framework.NewTestCtx(t) + defer testCtx.Cleanup(t) + ctx := context.Background() + name := "test" + + ns := framework.CreateNamespace(context.Background(), t, testCtx) + framework.SetupPrometheusRBAC(context.Background(), t, testCtx, ns) + _, err := framework.CreateOrUpdatePrometheusOperatorWithOpts( + ctx, testFramework.PrometheusOperatorOpts{ + Namespace: ns, + AllowedNamespaces: []string{ns}, + EnabledFeatureGates: []string{"PrometheusAgentDaemonSet"}, + }, + ) + require.NoError(t, err) + + app, err := testFramework.MakeDeployment("../../test/framework/resources/basic-app-for-daemonset-test.yaml") + require.NoError(t, err) + + err = framework.CreateDeployment(ctx, ns, app) + require.NoError(t, err) + + pm := framework.MakeBasicPodMonitor(name) + _, err = framework.MonClientV1.PodMonitors(ns).Create(ctx, pm, metav1.CreateOptions{}) + require.NoError(t, err) + + prometheusAgentDS := framework.MakeBasicPrometheusAgentDaemonSet(ns, name) + _, err = framework.CreatePrometheusAgentAndWaitUntilReady(ctx, ns, prometheusAgentDS) + require.NoError(t, err) + + var pollErr error + var paPods *v1.PodList + var firstTargetIP string + var secondTargetIP string + + appPodsNodes := make([]string, 0, 2) + appPodsIPs := make([]string, 0, 2) + paPodsNodes := make([]string, 0, 2) + + cfg := framework.RestConfig + httpClient := http.Client{} + + err = wait.PollUntilContextTimeout(context.Background(), 15*time.Second, 15*time.Minute, false, func(_ context.Context) (bool, error) { + ctx := context.Background() + + appPods, err := framework.KubeClient.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{ + LabelSelector: "group=test", + }) + if err != nil { + pollErr = fmt.Errorf("can't list app pods: %w", err) + return false, nil + } + + for _, pod := range appPods.Items { + appPodsNodes = append(appPodsNodes, pod.Spec.NodeName) + appPodsIPs = append(appPodsIPs, pod.Status.PodIP) + } + + paPods, err = framework.KubeClient.CoreV1().Pods(ns).List(ctx, pa.ListOptions(name)) + if err != nil { + pollErr = fmt.Errorf("can't list prometheus agent pods: %w", err) + return false, nil + } + + for _, pod := range paPods.Items { + paPodsNodes = append(paPodsNodes, pod.Spec.NodeName) + } + + if len(appPodsNodes) != len(paPodsNodes) 
+		if len(appPodsNodes) != len(paPodsNodes) {
+			pollErr = fmt.Errorf("got %d application pods and %d prometheus-agent pods", len(appPodsNodes), len(paPodsNodes))
+			return false, nil
+		}
+		for _, n := range appPodsNodes {
+			if !slices.Contains(paPodsNodes, n) {
+				pollErr = fmt.Errorf("no prometheus-agent pod found on node %s", n)
+				return false, nil
+			}
+		}
+
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+		defer cancel()
+
+		closer, err := testFramework.StartPortForward(ctx, cfg, "https", paPods.Items[0].Name, ns, "9090")
+		if err != nil {
+			pollErr = fmt.Errorf("can't start port forward to first prometheus agent pod: %w", err)
+			return false, nil
+		}
+		defer closer()
+
+		req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:9090/api/v1/targets", nil)
+		if err != nil {
+			pollErr = fmt.Errorf("can't create http request to first prometheus server: %w", err)
+			return false, nil
+		}
+
+		resp, err := httpClient.Do(req)
+		if err != nil {
+			pollErr = fmt.Errorf("can't send http request to first prometheus server: %w", err)
+			return false, nil
+		}
+
+		body, err := io.ReadAll(resp.Body)
+		if err != nil {
+			pollErr = fmt.Errorf("can't read http response from first prometheus server: %w", err)
+			return false, nil
+		}
+
+		var targetsResponse TargetsResponse
+		err = json.Unmarshal(body, &targetsResponse)
+		if err != nil {
+			pollErr = fmt.Errorf("can't unmarshall target's http response from first prometheus server: %w", err)
+			return false, nil
+		}
+		if len(targetsResponse.Data.ActiveTargets) != 1 {
+			pollErr = fmt.Errorf("expect 1 target from first prometheus agent. Actual target's response: %#+v", targetsResponse)
+			return false, nil
+		}
+
+		target := targetsResponse.Data.ActiveTargets[0]
+		instance := target.Labels.Instance
+		host := strings.Split(instance, ":")[0]
+		ips, err := net.LookupHost(host)
+		if err != nil {
+			pollErr = fmt.Errorf("can't find IPs from first target's host: %w", err)
+			return false, nil
+		}
+
+		found := false
+		for _, ip := range ips {
+			if slices.Contains(appPodsIPs, ip) {
+				found = true
+				firstTargetIP = ip
+			}
+		}
+		if found == false {
+			pollErr = fmt.Errorf("first target IP not found in app's list of pod IPs. Target's IP: %#+v, app's pod IPs: %#+v", ips, appPodsIPs)
+			return false, nil
+		}
+
+		return true, nil
+	})
+	require.NoError(t, pollErr)
+	require.NoError(t, err)
+
+	err = wait.PollUntilContextTimeout(context.Background(), 15*time.Second, 15*time.Minute, false, func(_ context.Context) (bool, error) {
+		ctx := context.Background()
+
+		ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+		defer cancel()
+
+		closer, err := testFramework.StartPortForward(ctx, cfg, "https", paPods.Items[1].Name, ns, "9090")
+		if err != nil {
+			pollErr = fmt.Errorf("can't start port forward to second prometheus agent pod: %w", err)
+			return false, nil
+		}
+		defer closer()
+
+		req, err := http.NewRequestWithContext(ctx, "GET", "http://localhost:9090/api/v1/targets", nil)
+		if err != nil {
+			pollErr = fmt.Errorf("can't create http request to second prometheus server: %w", err)
+			return false, nil
+		}
+
+		resp, err := httpClient.Do(req)
+		if err != nil {
+			pollErr = fmt.Errorf("can't send http request to second prometheus server: %w", err)
+			return false, nil
+		}
+
+		body, err := io.ReadAll(resp.Body)
+		if err != nil {
+			pollErr = fmt.Errorf("can't read http response from second prometheus server: %w", err)
+			return false, nil
+		}
+
+		var targetsResponse TargetsResponse
+		err = json.Unmarshal(body, &targetsResponse)
+		if err != nil {
+			pollErr = fmt.Errorf("can't unmarshall target's http response from second prometheus server: %w", err)
+			return false, nil
+		}
+		if len(targetsResponse.Data.ActiveTargets) != 1 {
+			pollErr = fmt.Errorf("expect 1 target from second prometheus agent. Actual target's response: %#+v", targetsResponse)
+			return false, nil
+		}
+
+		target := targetsResponse.Data.ActiveTargets[0]
+		instance := target.Labels.Instance
+		host := strings.Split(instance, ":")[0]
+		ips, err := net.LookupHost(host)
+		if err != nil {
+			pollErr = fmt.Errorf("can't find IPs from second target's host: %w", err)
+			return false, nil
+		}
+
+		found := false
+		for _, ip := range ips {
+			if slices.Contains(appPodsIPs, ip) {
+				found = true
+				secondTargetIP = ip
+			}
+		}
+		if found == false {
+			pollErr = fmt.Errorf("second target IP not found in app's list of pod IPs. Target's IP: %#+v, app's pod IPs: %#+v", ips, appPodsIPs)
+			return false, nil
+		}
+
+		return true, nil
+	})
+
+	require.NoError(t, pollErr)
+	require.NoError(t, err)
+
+	require.NotEqual(t, firstTargetIP, secondTargetIP)
+}
+
+type Target struct {
+	Labels struct {
+		Instance string `json:"instance"`
+	} `json:"labels"`
+}
+
+type TargetsResponse struct {
+	Status string `json:"status"`
+	Data   struct {
+		ActiveTargets []Target `json:"activeTargets"`
+	} `json:"data"`
+}
diff --git a/test/framework/prometheusagent.go b/test/framework/prometheusagent.go
index c0a72c7ab..87f635e12 100644
--- a/test/framework/prometheusagent.go
+++ b/test/framework/prometheusagent.go
@@ -84,6 +84,11 @@ func (f *Framework) MakeBasicPrometheusAgentDaemonSet(ns, name string) *monitori
 					v1.ResourceMemory: resource.MustParse("400Mi"),
 				},
 			},
+			PodMonitorSelector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"group": name,
+				},
+			},
 		},
 	},
 }
}
diff --git a/test/framework/resources/basic-app-for-daemonset-test.yaml b/test/framework/resources/basic-app-for-daemonset-test.yaml
new file mode 100644
index 000000000..7287986c2
--- /dev/null
+++ b/test/framework/resources/basic-app-for-daemonset-test.yaml
@@ -0,0 +1,33 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: app-for-daemonset-test
+  labels:
+    group: test
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      group: test
+  template:
+    metadata:
+      labels:
+        group: test
+    spec:
+      containers:
+      - name: example-app
+        image: quay.io/prometheus-operator/instrumented-sample-app:latest
+        imagePullPolicy: IfNotPresent
+        ports:
+        - name: web
+          containerPort: 8080
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+          - labelSelector:
+              matchExpressions:
+              - key: group
+                operator: In
+                values:
+                - test
+            topologyKey: "kubernetes.io/hostname"
\ No newline at end of file
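
Illustrative usage (not part of the patch): a minimal sketch of how this feature could be exercised once merged, assuming the operator runs with the PrometheusAgentDaemonSet feature gate enabled. The resource names, namespace, and label values below are placeholders, not taken from the commit.

apiVersion: monitoring.coreos.com/v1alpha1
kind: PrometheusAgent
metadata:
  name: agent
  namespace: default
spec:
  mode: DaemonSet
  serviceAccountName: prometheus
  podMonitorSelector:
    matchLabels:
      group: example
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: example-app
  namespace: default
  labels:
    group: example
spec:
  selector:
    matchLabels:
      group: test
  podMetricsEndpoints:
  - port: web

With this pairing, each agent pod scrapes only the PodMonitor targets scheduled on its own node, since the generated kubernetes_sd_configs carry a spec.nodeName=$(NODE_NAME) field selector and no sharding relabel rules are added in DaemonSet mode.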