From 5e26cd163dd3e075e97db6c68f7ea76c87e95c7b Mon Sep 17 00:00:00 2001
From: Simon Pasquier
Date: Fri, 30 Jan 2026 15:17:13 +0100
Subject: [PATCH] Add CreateStatefulSetOrPatchLabels() to k8sutil package

Signed-off-by: Simon Pasquier
---
 pkg/alertmanager/operator.go      |  4 +-
 pkg/k8sutil/k8sutil.go            | 46 ++++++++++++++++--
 pkg/k8sutil/k8sutil_test.go       | 79 +++++++++++++++++++++++++++++++
 pkg/operator/config_resource.go   |  6 +--
 pkg/operator/finalizer.go         |  2 +-
 pkg/operator/operator.go          |  4 +-
 pkg/prometheus/agent/operator.go  |  4 +-
 pkg/prometheus/server/operator.go |  4 +-
 pkg/thanos/operator.go            |  2 +-
 9 files changed, 135 insertions(+), 16 deletions(-)

diff --git a/pkg/alertmanager/operator.go b/pkg/alertmanager/operator.go
index 1c17b5e97..128ae46fe 100644
--- a/pkg/alertmanager/operator.go
+++ b/pkg/alertmanager/operator.go
@@ -756,10 +756,10 @@ func (c *Operator) UpdateStatus(ctx context.Context, key string) error {
 	a.Status.Conditions = operator.UpdateConditions(a.Status.Conditions, availableCondition, reconciledCondition)
 	a.Status.Paused = a.Spec.Paused
 
-	if _, err = c.mclient.MonitoringV1().Alertmanagers(a.Namespace).ApplyStatus(ctx, ApplyConfigurationFromAlertmanager(a, true), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+	if _, err = c.mclient.MonitoringV1().Alertmanagers(a.Namespace).ApplyStatus(ctx, ApplyConfigurationFromAlertmanager(a, true), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 		c.logger.Info("failed to apply alertmanager status subresource, trying again without scale fields", "err", err)
 		// Try again, but this time does not update scale subresource.
-		if _, err = c.mclient.MonitoringV1().Alertmanagers(a.Namespace).ApplyStatus(ctx, ApplyConfigurationFromAlertmanager(a, false), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+		if _, err = c.mclient.MonitoringV1().Alertmanagers(a.Namespace).ApplyStatus(ctx, ApplyConfigurationFromAlertmanager(a, false), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 			return fmt.Errorf("failed to apply alertmanager status subresource: %w", err)
 		}
 	}
diff --git a/pkg/k8sutil/k8sutil.go b/pkg/k8sutil/k8sutil.go
index d4d187527..b8a0f8155 100644
--- a/pkg/k8sutil/k8sutil.go
+++ b/pkg/k8sutil/k8sutil.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/validation"
 	"k8s.io/client-go/discovery"
@@ -55,10 +56,18 @@ import (
 	monitoringv1beta1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1beta1"
 )
 
-// KubeConfigEnv (optionally) specify the location of kubeconfig file.
-const KubeConfigEnv = "KUBECONFIG"
+const (
+	// KubeConfigEnv (optionally) specify the location of kubeconfig file.
+	KubeConfigEnv = "KUBECONFIG"
 
-const StatusCleanupFinalizerName = "monitoring.coreos.com/status-cleanup"
+	// StatusCleanupFinalizerName is the name of the finalizer used to garbage
+	// collect status bindings on configuration resources.
+	StatusCleanupFinalizerName = "monitoring.coreos.com/status-cleanup"
+
+	// PrometheusOperatorFieldManager is the field manager name used by the
+	// operator.
+	PrometheusOperatorFieldManager = "PrometheusOperator"
+)
 
 var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+")
 
@@ -316,6 +325,37 @@ func CreateOrUpdateEndpointSlice(ctx context.Context, c clientdiscoveryv1.Endpoi
 	})
 }
 
+// CreateStatefulSetOrPatchLabels creates a StatefulSet resource.
+// If the StatefulSet already exists, it patches the labels from the input StatefulSet.
+func CreateStatefulSetOrPatchLabels(ctx context.Context, ssetClient clientappsv1.StatefulSetInterface, sset *appsv1.StatefulSet) (*appsv1.StatefulSet, error) {
+	created, err := ssetClient.Create(ctx, sset, metav1.CreateOptions{})
+	if err == nil {
+		return created, nil
+	}
+
+	if !apierrors.IsAlreadyExists(err) {
+		return nil, err
+	}
+
+	// StatefulSet already exists, patch the labels.
+	patchData, err := json.Marshal(map[string]any{
+		"metadata": map[string]any{
+			"labels": sset.Labels,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return ssetClient.Patch(
+		ctx,
+		sset.Name,
+		types.StrategicMergePatchType,
+		patchData,
+		metav1.PatchOptions{FieldManager: PrometheusOperatorFieldManager},
+	)
+}
+
 // updateStatefulSet updates a StatefulSet resource preserving custom labels and annotations from the current resource.
 func updateStatefulSet(ctx context.Context, sstClient clientappsv1.StatefulSetInterface, sset *appsv1.StatefulSet) error {
 	// As stated in the RetryOnConflict's documentation, the returned error shouldn't be wrapped.
diff --git a/pkg/k8sutil/k8sutil_test.go b/pkg/k8sutil/k8sutil_test.go
index 228dd28d9..8e60c8e65 100644
--- a/pkg/k8sutil/k8sutil_test.go
+++ b/pkg/k8sutil/k8sutil_test.go
@@ -756,6 +756,85 @@ func TestEnsureCustomGoverningService(t *testing.T) {
 	}
 }
 
+func TestCreateStatefulSetOrPatchLabels(t *testing.T) {
+	testCases := []struct {
+		name                string
+		existingStatefulSet *appsv1.StatefulSet
+		newStatefulSet      *appsv1.StatefulSet
+		expectedLabels      map[string]string
+	}{
+		{
+			name: "create new statefulset successfully",
+			newStatefulSet: &appsv1.StatefulSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "prometheus",
+					Namespace: "default",
+					Labels: map[string]string{
+						"app": "prometheus",
+						"env": "prod",
+					},
+				},
+			},
+			expectedLabels: map[string]string{
+				"app": "prometheus",
+				"env": "prod",
+			},
+		},
+		{
+			name: "statefulset already exists - patch labels",
+			existingStatefulSet: &appsv1.StatefulSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "prometheus",
+					Namespace: "default",
+					Labels: map[string]string{
+						"app": "prometheus",
+						"env": "dev",
+					},
+				},
+			},
+			newStatefulSet: &appsv1.StatefulSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "prometheus",
+					Namespace: "default",
+					Labels: map[string]string{
+						"app":     "prometheus",
+						"env":     "prod",
+						"version": "v2.0",
+					},
+				},
+			},
+			expectedLabels: map[string]string{
+				"app":     "prometheus",
+				"env":     "prod",
+				"version": "v2.0",
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := context.Background()
+
+			var clientSet *fake.Clientset
+			if tc.existingStatefulSet != nil {
+				clientSet = fake.NewClientset(tc.existingStatefulSet)
+			} else {
+				clientSet = fake.NewClientset()
+			}
+
+			ssetClient := clientSet.AppsV1().StatefulSets(tc.newStatefulSet.Namespace)
+
+			_, err := CreateStatefulSetOrPatchLabels(ctx, ssetClient, tc.newStatefulSet)
+			require.NoError(t, err)
+
+			// Verify the statefulset in the cluster has the expected labels
+			result, err := ssetClient.Get(ctx, tc.newStatefulSet.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabels, result.Labels)
+		})
+	}
+}
+
 func makeBarebonesPrometheus(name, ns string) *monitoringv1.Prometheus {
 	return &monitoringv1.Prometheus{
 		ObjectMeta: metav1.ObjectMeta{
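For context, a minimal sketch of how a caller could consume the new helper
(illustrative only, not part of this patch; the reconcileStatefulSet wrapper
and the kubernetes.Interface wiring are assumptions):

	package example

	import (
		"context"
		"fmt"

		appsv1 "k8s.io/api/apps/v1"
		"k8s.io/client-go/kubernetes"

		"github.com/prometheus-operator/prometheus-operator/pkg/k8sutil"
	)

	// reconcileStatefulSet creates the desired StatefulSet or, when it
	// already exists, reconciles only its labels through the new helper.
	func reconcileStatefulSet(ctx context.Context, kclient kubernetes.Interface, sset *appsv1.StatefulSet) error {
		ssetClient := kclient.AppsV1().StatefulSets(sset.Namespace)

		if _, err := k8sutil.CreateStatefulSetOrPatchLabels(ctx, ssetClient, sset); err != nil {
			return fmt.Errorf("failed to create or patch labels of statefulset %s/%s: %w", sset.Namespace, sset.Name, err)
		}

		return nil
	}

Note that the helper only reconciles labels on an existing object; spec
changes still need the regular update path (updateStatefulSet above).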
diff --git a/pkg/operator/config_resource.go b/pkg/operator/config_resource.go
index 8f8da3932..333f37fb7 100644
--- a/pkg/operator/config_resource.go
+++ b/pkg/operator/config_resource.go
@@ -213,7 +213,7 @@ func (crs *ConfigResourceSyncer) UpdateBinding(ctx context.Context, configResour
 		ctx,
 		obj,
 		metav1.UpdateOptions{
-			FieldManager:    PrometheusOperatorFieldManager,
+			FieldManager:    k8sutil.PrometheusOperatorFieldManager,
 			FieldValidation: metav1.FieldValidationStrict,
 		},
 	); err != nil {
@@ -238,7 +238,7 @@ func (crs *ConfigResourceSyncer) UpdateBinding(ctx context.Context, configResour
 		types.JSONPatchType,
 		patch,
 		metav1.PatchOptions{
-			FieldManager:    PrometheusOperatorFieldManager,
+			FieldManager:    k8sutil.PrometheusOperatorFieldManager,
 			FieldValidation: metav1.FieldValidationStrict,
 		},
 		statusSubResource,
@@ -270,7 +270,7 @@ func (crs *ConfigResourceSyncer) RemoveBinding(ctx context.Context, configResour
 		types.JSONPatchType,
 		p,
 		metav1.PatchOptions{
-			FieldManager:    PrometheusOperatorFieldManager,
+			FieldManager:    k8sutil.PrometheusOperatorFieldManager,
 			FieldValidation: metav1.FieldValidationStrict,
 		},
 		statusSubResource,
diff --git a/pkg/operator/finalizer.go b/pkg/operator/finalizer.go
index f288f810d..1c67205d1 100644
--- a/pkg/operator/finalizer.go
+++ b/pkg/operator/finalizer.go
@@ -105,7 +105,7 @@ func (s *FinalizerSyncer) updateObject(
 ) error {
 	_, err := s.mdClient.Resource(s.gvr).
 		Namespace(p.GetNamespace()).
-		Patch(ctx, p.GetName(), types.JSONPatchType, patchBytes, metav1.PatchOptions{FieldManager: PrometheusOperatorFieldManager})
+		Patch(ctx, p.GetName(), types.JSONPatchType, patchBytes, metav1.PatchOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager})
 
 	return err
 }
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index f41513458..663715bcb 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -38,8 +38,8 @@ import (
 )
 
 const (
-	PrometheusOperatorFieldManager = "PrometheusOperator"
-
+	// InvalidConfigurationEvent is the type used for events reporting invalid
+	// configuration resources.
 	InvalidConfigurationEvent = "InvalidConfiguration"
 )
diff --git a/pkg/prometheus/agent/operator.go b/pkg/prometheus/agent/operator.go
index 40c5022c7..ad2f0194b 100644
--- a/pkg/prometheus/agent/operator.go
+++ b/pkg/prometheus/agent/operator.go
@@ -1039,10 +1039,10 @@ func (c *Operator) UpdateStatus(ctx context.Context, key string) error {
 	p.Status.Selector = selector.String()
 	p.Status.Shards = ptr.Deref(p.Spec.Shards, 1)
 
-	if _, err = c.mclient.MonitoringV1alpha1().PrometheusAgents(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheusAgent(p, true), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+	if _, err = c.mclient.MonitoringV1alpha1().PrometheusAgents(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheusAgent(p, true), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 		c.logger.Info("failed to apply prometheus status subresource, trying again without scale fields", "err", err)
 		// Try again, but this time does not update scale subresource.
-		if _, err = c.mclient.MonitoringV1alpha1().PrometheusAgents(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheusAgent(p, false), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+		if _, err = c.mclient.MonitoringV1alpha1().PrometheusAgents(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheusAgent(p, false), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 			return fmt.Errorf("failed to Apply prometheus agent status subresource: %w", err)
 		}
 	}
diff --git a/pkg/prometheus/server/operator.go b/pkg/prometheus/server/operator.go
index 09f5bc1ef..97702bb60 100644
--- a/pkg/prometheus/server/operator.go
+++ b/pkg/prometheus/server/operator.go
@@ -1230,10 +1230,10 @@ func (c *Operator) UpdateStatus(ctx context.Context, key string) error {
 	p.Status.Selector = selector.String()
 	p.Status.Shards = ptr.Deref(p.Spec.Shards, 1)
 
-	if _, err = c.mclient.MonitoringV1().Prometheuses(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheus(p, true), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+	if _, err = c.mclient.MonitoringV1().Prometheuses(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheus(p, true), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 		c.logger.Info("failed to apply prometheus status subresource, trying again without scale fields", "err", err)
 		// Try again, but this time does not update scale subresource.
-		if _, err = c.mclient.MonitoringV1().Prometheuses(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheus(p, false), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+		if _, err = c.mclient.MonitoringV1().Prometheuses(p.Namespace).ApplyStatus(ctx, prompkg.ApplyConfigurationFromPrometheus(p, false), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 			return fmt.Errorf("failed to apply prometheus status subresource: %w", err)
 		}
 	}
diff --git a/pkg/thanos/operator.go b/pkg/thanos/operator.go
index f4ce7f709..c51ad2086 100644
--- a/pkg/thanos/operator.go
+++ b/pkg/thanos/operator.go
@@ -721,7 +721,7 @@ func (o *Operator) UpdateStatus(ctx context.Context, key string) error {
 	tr.Status.Conditions = operator.UpdateConditions(tr.Status.Conditions, availableCondition, reconciledCondition)
 	tr.Status.Paused = tr.Spec.Paused
 
-	if _, err = o.mclient.MonitoringV1().ThanosRulers(tr.Namespace).ApplyStatus(ctx, applyConfigurationFromThanosRuler(tr), metav1.ApplyOptions{FieldManager: operator.PrometheusOperatorFieldManager, Force: true}); err != nil {
+	if _, err = o.mclient.MonitoringV1().ThanosRulers(tr.Namespace).ApplyStatus(ctx, applyConfigurationFromThanosRuler(tr), metav1.ApplyOptions{FieldManager: k8sutil.PrometheusOperatorFieldManager, Force: true}); err != nil {
 		return fmt.Errorf("failed to apply status subresource: %w", err)
 	}
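A note on the helper's patch semantics (an observation about the code above,
not a behavior this patch changes): because the strategic-merge patch body
carries only metadata.labels, label keys present on the desired StatefulSet
are created or overwritten, while labels already on the live object but
absent from the desired set are left in place; the helper never removes
stale keys. For the second test case above, the request body sent on the
patch path would be:

	{"metadata":{"labels":{"app":"prometheus","env":"prod","version":"v2.0"}}}

Callers needing exact label reconciliation would have to clear obsolete keys
explicitly, for example by setting them to null in the patch body.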