mirror of
https://github.com/coreos/prometheus-operator.git
synced 2026-02-05 15:46:31 +01:00
chore: move patch operations to ConfigResourceSyncer
This commit enables ConfigResourceSyncer to work with any configuration resource instead of being limited to Service Monitors. Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
@@ -22,9 +22,15 @@ const (
|
||||
PrometheusesKind = "Prometheus"
|
||||
PrometheusName = "prometheuses"
|
||||
|
||||
PrometheusAgentsKind = "PrometheusAgent"
|
||||
PrometheusAgentName = "prometheusagents"
|
||||
|
||||
AlertmanagersKind = "Alertmanager"
|
||||
AlertmanagerName = "alertmanagers"
|
||||
|
||||
AlertmanagerConfigsKind = "AlertmanagerConfig"
|
||||
AlertmanagerConfigName = "alertmanagerconfigs"
|
||||
|
||||
ServiceMonitorsKind = "ServiceMonitor"
|
||||
ServiceMonitorName = "servicemonitors"
|
||||
|
||||
@@ -39,22 +45,51 @@ const (
|
||||
|
||||
ScrapeConfigsKind = "ScrapeConfig"
|
||||
ScrapeConfigName = "scrapeconfigs"
|
||||
|
||||
ThanosRulersKind = "ThanosRuler"
|
||||
ThanosRulerName = "thanosrulers"
|
||||
)
|
||||
|
||||
var resourceToKindMap = map[string]string{
|
||||
PrometheusName: PrometheusesKind,
|
||||
AlertmanagerName: AlertmanagersKind,
|
||||
ServiceMonitorName: ServiceMonitorsKind,
|
||||
PodMonitorName: PodMonitorsKind,
|
||||
PrometheusRuleName: PrometheusRuleKind,
|
||||
ProbeName: ProbesKind,
|
||||
ScrapeConfigName: ScrapeConfigsKind,
|
||||
PrometheusName: PrometheusesKind,
|
||||
PrometheusAgentName: PrometheusAgentsKind,
|
||||
AlertmanagerName: AlertmanagersKind,
|
||||
AlertmanagerConfigName: AlertmanagerConfigsKind,
|
||||
ServiceMonitorName: ServiceMonitorsKind,
|
||||
PodMonitorName: PodMonitorsKind,
|
||||
PrometheusRuleName: PrometheusRuleKind,
|
||||
ProbeName: ProbesKind,
|
||||
ScrapeConfigName: ScrapeConfigsKind,
|
||||
ThanosRulerName: ThanosRulersKind,
|
||||
}
|
||||
|
||||
func ResourceToKind(s string) string {
|
||||
kind, found := resourceToKindMap[s]
|
||||
var kindToResource = map[string]string{
|
||||
PrometheusesKind: PrometheusName,
|
||||
PrometheusAgentsKind: PrometheusAgentName,
|
||||
AlertmanagersKind: AlertmanagerName,
|
||||
AlertmanagerConfigsKind: AlertmanagerConfigName,
|
||||
ServiceMonitorsKind: ServiceMonitorName,
|
||||
PodMonitorsKind: PodMonitorName,
|
||||
PrometheusRuleKind: PrometheusRuleName,
|
||||
ProbesKind: ProbeName,
|
||||
ScrapeConfigsKind: ScrapeConfigName,
|
||||
ThanosRulersKind: ThanosRulerName,
|
||||
}
|
||||
|
||||
// KindToResource returns the resource name corresponding to the given kind.
|
||||
func KindToResource(k string) string {
|
||||
kind, found := kindToResource[k]
|
||||
if !found {
|
||||
panic(fmt.Sprintf("failed to map resource %q to a kind", s))
|
||||
panic(fmt.Sprintf("failed to map kind %q to a resource name", k))
|
||||
}
|
||||
return kind
|
||||
}
|
||||
|
||||
// ResourceToKind returns the kind corresponding to the given resource name.
|
||||
func ResourceToKind(r string) string {
|
||||
kind, found := resourceToKindMap[r]
|
||||
if !found {
|
||||
panic(fmt.Sprintf("failed to map resource %q to a kind", r))
|
||||
}
|
||||
return kind
|
||||
}
|
||||
|
||||
@@ -70,38 +70,3 @@ func prometheusStatusApplyConfigurationFromPrometheusStatus(status *monitoringv1
|
||||
|
||||
return psac
|
||||
}
|
||||
|
||||
// ApplyConfigurationFromServiceMonitor updates the ServiceMonitor Status subresource.
|
||||
func ApplyConfigurationFromServiceMonitor(sm *monitoringv1.ServiceMonitor) *monitoringv1ac.ServiceMonitorApplyConfiguration {
|
||||
smsac := configResourceStatusApplyConfigurationFromConfigResourceStatus(&sm.Status)
|
||||
return monitoringv1ac.ServiceMonitor(sm.Name, sm.Namespace).
|
||||
WithStatus(smsac)
|
||||
}
|
||||
|
||||
func configResourceStatusApplyConfigurationFromConfigResourceStatus(status *monitoringv1.ConfigResourceStatus) *monitoringv1ac.ConfigResourceStatusApplyConfiguration {
|
||||
crsac := monitoringv1ac.ConfigResourceStatus()
|
||||
|
||||
for _, binding := range status.Bindings {
|
||||
bg := monitoringv1ac.WorkloadBinding().
|
||||
WithGroup(binding.Group).
|
||||
WithName(binding.Name).
|
||||
WithNamespace(binding.Namespace).
|
||||
WithResource(binding.Resource)
|
||||
|
||||
for _, condition := range binding.Conditions {
|
||||
bg.WithConditions(
|
||||
monitoringv1ac.ConfigResourceCondition().
|
||||
WithType(condition.Type).
|
||||
WithStatus(condition.Status).
|
||||
WithLastTransitionTime(condition.LastTransitionTime).
|
||||
WithReason(condition.Reason).
|
||||
WithMessage(condition.Message).
|
||||
WithObservedGeneration(condition.ObservedGeneration),
|
||||
)
|
||||
}
|
||||
|
||||
crsac.WithBindings(bg)
|
||||
}
|
||||
|
||||
return crsac
|
||||
}
|
||||
|
||||
277
pkg/prometheus/config_resource.go
Normal file
277
pkg/prometheus/config_resource.go
Normal file
@@ -0,0 +1,277 @@
|
||||
// Copyright 2025 The prometheus-operator Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/dynamic"
|
||||
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring"
|
||||
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
|
||||
)
|
||||
|
||||
// ConfigResourceSyncer patches the status of configuration resources.
|
||||
type ConfigResourceSyncer struct {
|
||||
client dynamic.Interface
|
||||
|
||||
// GroupVersionResource and metadata of the Workload.
|
||||
gvr schema.GroupVersionResource
|
||||
workload metav1.Object
|
||||
}
|
||||
|
||||
func NewConfigResourceSyncer(workload RuntimeObject, client dynamic.Interface) *ConfigResourceSyncer {
|
||||
return &ConfigResourceSyncer{
|
||||
client: client,
|
||||
gvr: toGroupVersionResource(workload),
|
||||
workload: workload,
|
||||
}
|
||||
}
|
||||
|
||||
type patch []patchOperation
|
||||
|
||||
type patchOperation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value any `json:"value,omitempty"`
|
||||
}
|
||||
|
||||
// GetBindingIndex returns the index of the workload binding in the slice.
|
||||
// The return value is negative if there's no binding for the workload.
|
||||
func (crs *ConfigResourceSyncer) GetBindingIndex(bindings []monitoringv1.WorkloadBinding) int {
|
||||
for i, binding := range bindings {
|
||||
if binding.Namespace == crs.workload.GetNamespace() &&
|
||||
binding.Name == crs.workload.GetName() &&
|
||||
binding.Group == crs.gvr.Group &&
|
||||
binding.Resource == crs.gvr.Resource {
|
||||
return i
|
||||
}
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
|
||||
func (crs *ConfigResourceSyncer) newBinding(conditions []monitoringv1.ConfigResourceCondition) monitoringv1.WorkloadBinding {
|
||||
return monitoringv1.WorkloadBinding{
|
||||
Namespace: crs.workload.GetNamespace(),
|
||||
Name: crs.workload.GetName(),
|
||||
Resource: crs.gvr.Resource,
|
||||
Group: crs.gvr.Group,
|
||||
Conditions: conditions,
|
||||
}
|
||||
}
|
||||
|
||||
type RuntimeObject interface {
|
||||
runtime.Object
|
||||
metav1.Object
|
||||
}
|
||||
|
||||
// UpdateBinding updates the workload's binding in the configuration resource's
|
||||
// status subresource.
|
||||
// If the binding is up-to-date, this a no-operation.
|
||||
func (crs *ConfigResourceSyncer) UpdateBinding(
|
||||
ctx context.Context,
|
||||
configResource RuntimeObject,
|
||||
bindings []monitoringv1.WorkloadBinding,
|
||||
conditions []monitoringv1.ConfigResourceCondition) error {
|
||||
patch, err := crs.updateBindingPatch(bindings, conditions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(patch) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = crs.client.Resource(toGroupVersionResource(configResource)).Namespace(configResource.GetNamespace()).Patch(
|
||||
ctx,
|
||||
configResource.GetName(),
|
||||
types.JSONPatchType,
|
||||
patch,
|
||||
metav1.PatchOptions{
|
||||
FieldManager: operator.PrometheusOperatorFieldManager,
|
||||
FieldValidation: metav1.FieldValidationStrict,
|
||||
},
|
||||
statusSubResource,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// RemoveBinding removes the workload's binding from the configuration
|
||||
// resource's status subresource.
|
||||
// If the workload has no binding, this a no-operation.
|
||||
func (crs *ConfigResourceSyncer) RemoveBinding(
|
||||
ctx context.Context,
|
||||
configResource RuntimeObject,
|
||||
bindings []monitoringv1.WorkloadBinding) error {
|
||||
p, err := crs.removeBindingPatch(bindings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(p) == 0 {
|
||||
// Binding not found.
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = crs.client.Resource(toGroupVersionResource(configResource)).Namespace(configResource.GetNamespace()).Patch(
|
||||
ctx,
|
||||
configResource.GetName(),
|
||||
types.JSONPatchType,
|
||||
p,
|
||||
metav1.PatchOptions{
|
||||
FieldManager: operator.PrometheusOperatorFieldManager,
|
||||
FieldValidation: metav1.FieldValidationStrict,
|
||||
},
|
||||
statusSubResource,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func toGroupVersionResource(o runtime.Object) schema.GroupVersionResource {
|
||||
gvk := o.GetObjectKind().GroupVersionKind()
|
||||
return schema.GroupVersionResource{
|
||||
Group: gvk.Group,
|
||||
Version: gvk.Version,
|
||||
Resource: monitoring.KindToResource(gvk.Kind),
|
||||
}
|
||||
}
|
||||
|
||||
// updateBindingPatch returns a RFC-6902 JSON patch which updates the
|
||||
// conditions of the resource's status.
|
||||
// If the binding doesn't exist, the patch adds it to the status.
|
||||
// If the binding is already up-to-date, the return value is empty.
|
||||
func (crs *ConfigResourceSyncer) updateBindingPatch(bindings []monitoringv1.WorkloadBinding, conditions []monitoringv1.ConfigResourceCondition) ([]byte, error) {
|
||||
i := crs.GetBindingIndex(bindings)
|
||||
if i < 0 {
|
||||
binding := crs.newBinding(conditions)
|
||||
if len(bindings) == 0 {
|
||||
// Initialize the workload bindings.
|
||||
return json.Marshal(patch{
|
||||
patchOperation{
|
||||
Op: "add",
|
||||
Path: "/status",
|
||||
Value: monitoringv1.ConfigResourceStatus{
|
||||
Bindings: []monitoringv1.WorkloadBinding{binding},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Append the workload binding.
|
||||
return json.Marshal(patch{
|
||||
patchOperation{
|
||||
Op: "add",
|
||||
Path: "/status/bindings/-",
|
||||
Value: binding,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// No need to update the binding if the conditions haven't changed
|
||||
if equalConfigResourceConditions(bindings[i].Conditions, conditions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return json.Marshal(
|
||||
append(
|
||||
crs.testBindingExists(i),
|
||||
patchOperation{
|
||||
Op: "replace",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/conditions", i),
|
||||
Value: conditions,
|
||||
},
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// removeBindingPatch returns a RFC-6902 JSON patch which removes the
|
||||
// workload binding from the resource's status.
|
||||
// If the binding doesn't exist, the return value is empty.
|
||||
func (crs *ConfigResourceSyncer) removeBindingPatch(bindings []monitoringv1.WorkloadBinding) ([]byte, error) {
|
||||
i := crs.GetBindingIndex(bindings)
|
||||
if i < 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return json.Marshal(
|
||||
append(
|
||||
crs.testBindingExists(i),
|
||||
patchOperation{
|
||||
Op: "remove",
|
||||
Path: fmt.Sprintf("/status/bindings/%d", i),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
func (crs *ConfigResourceSyncer) testBindingExists(i int) patch {
|
||||
return []patchOperation{
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/name", i),
|
||||
Value: crs.workload.GetName(),
|
||||
},
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/namespace", i),
|
||||
Value: crs.workload.GetNamespace(),
|
||||
},
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/resource", i),
|
||||
Value: crs.gvr.Resource,
|
||||
},
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/group", i),
|
||||
Value: crs.gvr.Group,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// equalConfigResourceConditions returns true when both slices are equal semantically.
|
||||
func equalConfigResourceConditions(a, b []monitoringv1.ConfigResourceCondition) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
|
||||
ac, bc := slices.Clone(a), slices.Clone(b)
|
||||
|
||||
slices.SortFunc(ac, func(a, b monitoringv1.ConfigResourceCondition) int {
|
||||
return cmp.Compare(a.Type, b.Type)
|
||||
})
|
||||
slices.SortFunc(bc, func(a, b monitoringv1.ConfigResourceCondition) int {
|
||||
return cmp.Compare(a.Type, b.Type)
|
||||
})
|
||||
|
||||
return slices.EqualFunc(ac, bc, func(a, b monitoringv1.ConfigResourceCondition) bool {
|
||||
return a.Type == b.Type &&
|
||||
a.Status == b.Status &&
|
||||
a.Reason == b.Reason &&
|
||||
a.Message == b.Message &&
|
||||
a.ObservedGeneration == b.ObservedGeneration
|
||||
})
|
||||
}
|
||||
@@ -15,12 +15,9 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -30,12 +27,9 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
|
||||
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/informers"
|
||||
"github.com/prometheus-operator/prometheus-operator/pkg/operator"
|
||||
)
|
||||
@@ -61,145 +55,6 @@ type StatusReporter struct {
|
||||
Rr *operator.ResourceReconciler
|
||||
}
|
||||
|
||||
type ConfigResourceSyncer struct {
|
||||
gvr schema.GroupVersionResource
|
||||
mclient monitoringclient.Interface
|
||||
workload metav1.Object // Workload resource (Prometheus and PrometheusAgent) selecting the configuration resources.
|
||||
}
|
||||
|
||||
func NewConfigResourceSyncer(gvr schema.GroupVersionResource, mclient monitoringclient.Interface, workload metav1.Object) *ConfigResourceSyncer {
|
||||
return &ConfigResourceSyncer{
|
||||
gvr: gvr,
|
||||
mclient: mclient,
|
||||
workload: workload,
|
||||
}
|
||||
}
|
||||
|
||||
type patch []patchOperation
|
||||
|
||||
type patchOperation struct {
|
||||
Op string `json:"op"`
|
||||
Path string `json:"path"`
|
||||
Value any `json:"value,omitempty"`
|
||||
}
|
||||
|
||||
// GetBindingIndex returns the index of the workload binding in the slice.
|
||||
// The return value is negative if there's no binding for the workload.
|
||||
func (crs *ConfigResourceSyncer) GetBindingIndex(bindings []monitoringv1.WorkloadBinding) int {
|
||||
for i, binding := range bindings {
|
||||
if binding.Namespace == crs.workload.GetNamespace() &&
|
||||
binding.Name == crs.workload.GetName() &&
|
||||
binding.Group == crs.gvr.Group &&
|
||||
binding.Resource == crs.gvr.Resource {
|
||||
return i
|
||||
}
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
|
||||
func (crs *ConfigResourceSyncer) newBinding(conditions []monitoringv1.ConfigResourceCondition) monitoringv1.WorkloadBinding {
|
||||
return monitoringv1.WorkloadBinding{
|
||||
Namespace: crs.workload.GetNamespace(),
|
||||
Name: crs.workload.GetName(),
|
||||
Resource: crs.gvr.Resource,
|
||||
Group: crs.gvr.Group,
|
||||
Conditions: conditions,
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateBindingPatch returns a RFC-6902 JSON patch which updates the
|
||||
// conditions of the resource's status.
|
||||
// If the binding doesn't exist, the patch adds it to the status.
|
||||
// If the binding is already up-to-date, the return value is empty.
|
||||
func (crs *ConfigResourceSyncer) UpdateBindingPatch(bindings []monitoringv1.WorkloadBinding, conditions []monitoringv1.ConfigResourceCondition) ([]byte, error) {
|
||||
i := crs.GetBindingIndex(bindings)
|
||||
if i < 0 {
|
||||
binding := crs.newBinding(conditions)
|
||||
if len(bindings) == 0 {
|
||||
// Initialize the workload bindings.
|
||||
return json.Marshal(patch{
|
||||
patchOperation{
|
||||
Op: "add",
|
||||
Path: "/status",
|
||||
Value: monitoringv1.ConfigResourceStatus{
|
||||
Bindings: []monitoringv1.WorkloadBinding{binding},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Append the workload binding.
|
||||
return json.Marshal(patch{
|
||||
patchOperation{
|
||||
Op: "add",
|
||||
Path: "/status/bindings/-",
|
||||
Value: binding,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// No need to update the binding if the conditions haven't changed
|
||||
if equalConfigResourceConditions(bindings[i].Conditions, conditions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return json.Marshal(
|
||||
append(
|
||||
crs.testBindingExists(i),
|
||||
patchOperation{
|
||||
Op: "replace",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/conditions", i),
|
||||
Value: conditions,
|
||||
},
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// RemoveBindingPatch returns a RFC-6902 JSON patch which removes the
|
||||
// workload binding from the resource's status.
|
||||
// If the binding doesn't exist, the return value is empty.
|
||||
func (crs *ConfigResourceSyncer) RemoveBindingPatch(bindings []monitoringv1.WorkloadBinding) ([]byte, error) {
|
||||
i := crs.GetBindingIndex(bindings)
|
||||
if i < 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return json.Marshal(
|
||||
append(
|
||||
crs.testBindingExists(i),
|
||||
patchOperation{
|
||||
Op: "remove",
|
||||
Path: fmt.Sprintf("/status/bindings/%d", i),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
func (crs *ConfigResourceSyncer) testBindingExists(i int) patch {
|
||||
return []patchOperation{
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/name", i),
|
||||
Value: crs.workload.GetName(),
|
||||
},
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/namespace", i),
|
||||
Value: crs.workload.GetNamespace(),
|
||||
},
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/resource", i),
|
||||
Value: crs.gvr.Resource,
|
||||
},
|
||||
{
|
||||
Op: "test",
|
||||
Path: fmt.Sprintf("/status/bindings/%d/group", i),
|
||||
Value: crs.gvr.Group,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func KeyToStatefulSetKey(p monitoringv1.PrometheusInterface, key string, shard int) string {
|
||||
keyParts := strings.Split(key, "/")
|
||||
return fmt.Sprintf("%s/%s", keyParts[0], statefulSetNameFromPrometheusName(p, keyParts[1], shard))
|
||||
@@ -394,89 +249,3 @@ func (sr *StatusReporter) Process(ctx context.Context, p monitoringv1.Prometheus
|
||||
|
||||
return &pStatus, nil
|
||||
}
|
||||
|
||||
// UpdateServiceMonitorStatus updates the status binding of the serviceMonitor
|
||||
// for the given workload.
|
||||
func UpdateServiceMonitorStatus(
|
||||
ctx context.Context,
|
||||
c *ConfigResourceSyncer,
|
||||
res TypedConfigurationResource[*monitoringv1.ServiceMonitor]) error {
|
||||
smon := res.resource
|
||||
|
||||
p, err := c.UpdateBindingPatch(smon.Status.Bindings, res.conditions(smon.Generation))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(p) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = c.mclient.MonitoringV1().ServiceMonitors(smon.Namespace).Patch(
|
||||
ctx,
|
||||
smon.Name,
|
||||
types.JSONPatchType,
|
||||
p,
|
||||
metav1.PatchOptions{
|
||||
FieldManager: operator.PrometheusOperatorFieldManager,
|
||||
FieldValidation: metav1.FieldValidationStrict,
|
||||
},
|
||||
statusSubResource,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// equalConfigResourceConditions returns true when both slices are equal semantically.
|
||||
func equalConfigResourceConditions(a, b []monitoringv1.ConfigResourceCondition) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
|
||||
ac, bc := slices.Clone(a), slices.Clone(b)
|
||||
|
||||
slices.SortFunc(ac, func(a, b monitoringv1.ConfigResourceCondition) int {
|
||||
return cmp.Compare(a.Type, b.Type)
|
||||
})
|
||||
slices.SortFunc(bc, func(a, b monitoringv1.ConfigResourceCondition) int {
|
||||
return cmp.Compare(a.Type, b.Type)
|
||||
})
|
||||
|
||||
return slices.EqualFunc(ac, bc, func(a, b monitoringv1.ConfigResourceCondition) bool {
|
||||
return a.Type == b.Type &&
|
||||
a.Status == b.Status &&
|
||||
a.Reason == b.Reason &&
|
||||
a.Message == b.Message &&
|
||||
a.ObservedGeneration == b.ObservedGeneration
|
||||
})
|
||||
}
|
||||
|
||||
// RemoveServiceMonitorBinding removes the Prometheus or PrometheusAgent
|
||||
// binding from the ServiceMonitor's status subresource.
|
||||
// If the workload has no binding, this a no-operation.
|
||||
func RemoveServiceMonitorBinding(
|
||||
ctx context.Context,
|
||||
c *ConfigResourceSyncer,
|
||||
smon *monitoringv1.ServiceMonitor) error {
|
||||
p, err := c.RemoveBindingPatch(smon.Status.Bindings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(p) == 0 {
|
||||
// Binding not found.
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = c.mclient.MonitoringV1().ServiceMonitors(smon.Namespace).Patch(
|
||||
ctx,
|
||||
smon.Name,
|
||||
types.JSONPatchType,
|
||||
p,
|
||||
metav1.PatchOptions{
|
||||
FieldManager: operator.PrometheusOperatorFieldManager,
|
||||
FieldValidation: metav1.FieldValidationStrict,
|
||||
},
|
||||
statusSubResource,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -80,7 +80,12 @@ type TypedConfigurationResource[T ConfigurationResource] struct {
|
||||
reason string // Reason for rejection; empty if accepted.
|
||||
}
|
||||
|
||||
func (r *TypedConfigurationResource[T]) conditions(observedGeneration int64) []monitoringv1.ConfigResourceCondition {
|
||||
func (r *TypedConfigurationResource[T]) Resource() T {
|
||||
return r.resource
|
||||
}
|
||||
|
||||
// Conditions returns a list of conditions based on the validation status of the configuration resource.
|
||||
func (r *TypedConfigurationResource[T]) Conditions(observedGeneration int64) []monitoringv1.ConfigResourceCondition {
|
||||
condition := monitoringv1.ConfigResourceCondition{
|
||||
Type: monitoringv1.Accepted,
|
||||
Status: monitoringv1.ConditionTrue,
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/metadata"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -62,6 +63,7 @@ const (
|
||||
// monitoring configurations.
|
||||
type Operator struct {
|
||||
kclient kubernetes.Interface
|
||||
dclient dynamic.Interface
|
||||
mdClient metadata.Interface
|
||||
mclient monitoringclient.Interface
|
||||
|
||||
@@ -151,6 +153,11 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
|
||||
return nil, fmt.Errorf("instantiating kubernetes client failed: %w", err)
|
||||
}
|
||||
|
||||
dclient, err := dynamic.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("instantiating dynamic client failed: %w", err)
|
||||
}
|
||||
|
||||
mdClient, err := metadata.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("instantiating metadata client failed: %w", err)
|
||||
@@ -166,6 +173,7 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
|
||||
|
||||
o := &Operator{
|
||||
kclient: client,
|
||||
dclient: dclient,
|
||||
mdClient: mdClient,
|
||||
mclient: mclient,
|
||||
logger: logger,
|
||||
@@ -1047,9 +1055,11 @@ func (c *Operator) updateConfigResourcesStatus(ctx context.Context, p *monitorin
|
||||
return nil
|
||||
}
|
||||
|
||||
configResourceSyncer := prompkg.NewConfigResourceSyncer(monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusName), c.mclient, p)
|
||||
for key, sm := range resources.sMons {
|
||||
if err := prompkg.UpdateServiceMonitorStatus(ctx, configResourceSyncer, sm); err != nil {
|
||||
configResourceSyncer := prompkg.NewConfigResourceSyncer(p, c.dclient)
|
||||
for key, configResource := range resources.sMons {
|
||||
smon := configResource.Resource()
|
||||
conditions := configResource.Conditions(smon.Generation)
|
||||
if err := configResourceSyncer.UpdateBinding(ctx, smon, smon.Status.Bindings, conditions); err != nil {
|
||||
return fmt.Errorf("failed to update ServiceMonitor %s status: %w", key, err)
|
||||
}
|
||||
}
|
||||
@@ -1068,13 +1078,21 @@ func (c *Operator) updateConfigResourcesStatus(ctx context.Context, p *monitorin
|
||||
return
|
||||
}
|
||||
|
||||
_, ok = resources.sMons[k]
|
||||
if ok {
|
||||
if _, ok = resources.sMons[k]; ok {
|
||||
return
|
||||
}
|
||||
|
||||
s := obj.(*monitoringv1.ServiceMonitor)
|
||||
if err := prompkg.RemoveServiceMonitorBinding(ctx, configResourceSyncer, s); err != nil {
|
||||
s, ok := obj.(*monitoringv1.ServiceMonitor)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if err := k8sutil.AddTypeInformationToObject(s); err != nil {
|
||||
getErr = fmt.Errorf("failed to add type information to ServiceMonitor %s: %w", k, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := configResourceSyncer.RemoveBinding(ctx, s, s.Status.Bindings); err != nil {
|
||||
getErr = fmt.Errorf("failed to remove Prometheus binding from ServiceMonitor %s status: %w", k, err)
|
||||
}
|
||||
}); err != nil {
|
||||
@@ -1088,20 +1106,32 @@ func (c *Operator) configResStatusCleanup(ctx context.Context, p *monitoringv1.P
|
||||
if !c.configResourcesStatusEnabled {
|
||||
return nil
|
||||
}
|
||||
configResourceSyncer := prompkg.NewConfigResourceSyncer(monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PrometheusName), c.mclient, p)
|
||||
|
||||
var getErr error
|
||||
var (
|
||||
configResourceSyncer = prompkg.NewConfigResourceSyncer(p, c.dclient)
|
||||
getErr error
|
||||
)
|
||||
if err := c.smonInfs.ListAll(labels.Everything(), func(obj any) {
|
||||
if getErr != nil {
|
||||
// Skip all subsequent updates after the first error.
|
||||
return
|
||||
}
|
||||
|
||||
s := obj.(*monitoringv1.ServiceMonitor)
|
||||
getErr = prompkg.RemoveServiceMonitorBinding(ctx, configResourceSyncer, s)
|
||||
s, ok := obj.(*monitoringv1.ServiceMonitor)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if err := k8sutil.AddTypeInformationToObject(s); err != nil {
|
||||
getErr = fmt.Errorf("failed to add type information to ServiceMonitor: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
getErr = configResourceSyncer.RemoveBinding(ctx, s, s.Status.Bindings)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("listing all ServiceMonitors from cache failed: %w", err)
|
||||
}
|
||||
|
||||
return getErr
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user