1
0
mirror of https://github.com/coreos/prometheus-operator.git synced 2026-02-05 15:46:31 +01:00

chore: simplify owner lookup (#6938)

* chore: simplify owner lookup

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

---------

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier
2024-09-17 08:52:44 +02:00
committed by GitHub
parent ac6c7399a0
commit bb244317df
7 changed files with 81 additions and 311 deletions

View File

@@ -21,7 +21,6 @@ import (
"fmt"
"log/slog"
"path"
"regexp"
"strings"
"time"
@@ -165,19 +164,20 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
opt(o)
}
if err := o.bootstrap(ctx, c); err != nil {
return nil, err
}
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.alrtInfs,
o.metrics,
monitoringv1.AlertmanagersKind,
r,
o.controllerID,
)
if err := o.bootstrap(ctx, c); err != nil {
return nil, err
}
return o, nil
}
@@ -455,47 +455,6 @@ func (c *Operator) RefreshStatusFor(o metav1.Object) {
c.rr.EnqueueForStatus(o)
}
// Resolve implements the operator.Syncer interface.
func (c *Operator) Resolve(obj interface{}) metav1.Object {
ss := obj.(*appsv1.StatefulSet)
key, ok := c.accessor.MetaNamespaceKey(ss)
if !ok {
return nil
}
match, aKey := statefulSetKeyToAlertmanagerKey(key)
if !match {
c.logger.Debug("StatefulSet key did not match an Alertmanager key format", "key", key)
return nil
}
a, err := c.alrtInfs.Get(aKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
c.logger.Error("Alertmanager lookup failed", "err", err)
return nil
}
return a.(*monitoringv1.Alertmanager)
}
func statefulSetKeyToAlertmanagerKey(key string) (bool, string) {
r := regexp.MustCompile("^(.+)/alertmanager-(.+)$")
matches := r.FindAllStringSubmatch(key, 2)
if len(matches) != 1 {
return false, ""
}
if len(matches[0]) != 3 {
return false, ""
}
return true, matches[0][1] + "/" + matches[0][2]
}
func alertmanagerKeyToStatefulSetKey(key string) string {
keyParts := strings.Split(key, "/")
return keyParts[0] + "/alertmanager-" + keyParts[1]

View File

@@ -26,12 +26,16 @@ import (
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
)
// Syncer knows how to synchronize statefulset-based or daemonset-based resources.
@@ -40,8 +44,11 @@ type Syncer interface {
Sync(context.Context, string) error
// UpdateStatus updates the status of the object identified by its key.
UpdateStatus(context.Context, string) error
// Resolve returns the resource owning the workload object (either StatefulSet or DaemonSet).
Resolve(obj interface{}) metav1.Object
}
// ResourceGetter returns an object from its "<namespace>/<name>" key.
type ResourceGetter interface {
Get(string) (runtime.Object, error)
}
// ReconcilerMetrics tracks reconciler metrics.
@@ -69,6 +76,7 @@ type ResourceReconciler struct {
resourceKind string
syncer Syncer
getter ResourceGetter
reconcileTotal prometheus.Counter
reconcileErrors prometheus.Counter
@@ -100,6 +108,7 @@ const (
func NewResourceReconciler(
l *slog.Logger,
syncer Syncer,
getter ResourceGetter,
metrics ReconcilerMetrics,
kind string,
reg prometheus.Registerer,
@@ -146,6 +155,7 @@ func NewResourceReconciler(
logger: l,
resourceKind: kind,
syncer: syncer,
getter: getter,
reconcileTotal: reconcileTotal,
reconcileErrors: reconcileErrors,
@@ -232,6 +242,34 @@ func (rr *ResourceReconciler) objectKey(obj interface{}) (string, bool) {
return k, true
}
func (rr *ResourceReconciler) resolve(obj metav1.Object) metav1.Object {
for _, or := range obj.GetOwnerReferences() {
if !ptr.Deref(or.Controller, false) {
continue
}
owner, err := rr.getter.Get(types.NamespacedName{Namespace: obj.GetNamespace(), Name: or.Name}.String())
if err != nil {
if !apierrors.IsNotFound(err) {
rr.logger.Error("can't resolve owner", "err", err)
}
return nil
}
owner = owner.DeepCopyObject()
o, err := meta.Accessor(owner)
if err != nil {
rr.logger.Error("can't get owner meta", "gvk", owner.GetObjectKind().GroupVersionKind().String(), "err", err)
}
return o
}
rr.logger.Error("can't find controller owner")
return nil
}
// OnAdd implements the cache.ResourceEventHandler interface.
func (rr *ResourceReconciler) OnAdd(obj interface{}, _ bool) {
@@ -340,7 +378,7 @@ func (rr *ResourceReconciler) OnDelete(obj interface{}) {
}
func (rr *ResourceReconciler) onStatefulSetAdd(ss *appsv1.StatefulSet) {
obj := rr.syncer.Resolve(ss)
obj := rr.resolve(ss)
if obj == nil {
return
}
@@ -352,7 +390,7 @@ func (rr *ResourceReconciler) onStatefulSetAdd(ss *appsv1.StatefulSet) {
}
func (rr *ResourceReconciler) onDaemonSetAdd(ds *appsv1.DaemonSet) {
obj := rr.syncer.Resolve(ds)
obj := rr.resolve(ds)
if obj == nil {
return
}
@@ -373,7 +411,7 @@ func (rr *ResourceReconciler) onStatefulSetUpdate(old, cur *appsv1.StatefulSet)
return
}
obj := rr.syncer.Resolve(cur)
obj := rr.resolve(cur)
if obj == nil {
return
}
@@ -403,7 +441,7 @@ func (rr *ResourceReconciler) onDaemonSetUpdate(old, cur *appsv1.DaemonSet) {
return
}
obj := rr.syncer.Resolve(cur)
obj := rr.resolve(cur)
if obj == nil {
return
}
@@ -422,7 +460,7 @@ func (rr *ResourceReconciler) onDaemonSetUpdate(old, cur *appsv1.DaemonSet) {
}
func (rr *ResourceReconciler) onStatefulSetDelete(ss *appsv1.StatefulSet) {
obj := rr.syncer.Resolve(ss)
obj := rr.resolve(ss)
if obj == nil {
return
}
@@ -434,7 +472,7 @@ func (rr *ResourceReconciler) onStatefulSetDelete(ss *appsv1.StatefulSet) {
}
func (rr *ResourceReconciler) onDaemonSetDelete(ds *appsv1.DaemonSet) {
obj := rr.syncer.Resolve(ds)
obj := rr.resolve(ds)
if obj == nil {
return
}

View File

@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"log/slog"
"regexp"
"strings"
"time"
@@ -54,9 +53,6 @@ const (
controllerName = "prometheusagent-controller"
)
var prometheusAgentKeyInShardStatefulSet = regexp.MustCompile("^(.+)/prom-agent-(.+)-shard-[1-9][0-9]*$")
var prometheusAgentKey = regexp.MustCompile("^(.+)/prom-agent-(.+)$")
// Operator manages life cycle of Prometheus agent deployments and
// monitoring configurations.
type Operator struct {
@@ -172,15 +168,6 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
opt(o)
}
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.metrics,
monitoringv1alpha1.PrometheusAgentsKind,
r,
o.controllerID,
)
o.promInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.Namespaces.PrometheusAllowList,
@@ -201,9 +188,18 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
for _, informer := range o.promInfs.GetInformers() {
promStores = append(promStores, informer.Informer().GetStore())
}
o.metrics.MustRegister(prompkg.NewCollectorForStores(promStores...))
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.promInfs,
o.metrics,
monitoringv1alpha1.PrometheusAgentsKind,
r,
o.controllerID,
)
o.smonInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.Namespaces.AllowList,
@@ -545,74 +541,6 @@ func (c *Operator) addHandlers() {
})
}
// Resolve implements the operator.Syncer interface.
func (c *Operator) Resolve(obj interface{}) metav1.Object {
key, ok := c.accessor.MetaNamespaceKey(obj)
if !ok {
return nil
}
var match bool
var promKey string
switch obj.(type) {
case *appsv1.DaemonSet:
match, promKey = daemonSetKeyToPrometheusAgentKey(key)
case *appsv1.StatefulSet:
match, promKey = statefulSetKeyToPrometheusAgentKey(key)
}
if !match {
c.logger.Debug("StatefulSet key did not match a Prometheus Agent key format", "key", key)
return nil
}
p, err := c.promInfs.Get(promKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
c.logger.Error("Prometheus lookup failed", "err", err)
return nil
}
return p.(*monitoringv1alpha1.PrometheusAgent)
}
// statefulSetKeyToPrometheusAgentKey checks if StatefulSet key can be converted to Prometheus Agent key
// and do the conversion if it's true. The case of Prometheus Agent key in sharding is also handled.
func statefulSetKeyToPrometheusAgentKey(key string) (bool, string) {
r := prometheusAgentKey
if prometheusAgentKeyInShardStatefulSet.MatchString(key) {
r = prometheusAgentKeyInShardStatefulSet
}
matches := r.FindAllStringSubmatch(key, 2)
if len(matches) != 1 {
return false, ""
}
if len(matches[0]) != 3 {
return false, ""
}
return true, matches[0][1] + "/" + matches[0][2]
}
// daemonSetKeyToPrometheusAgentKey checks if DaemonSet key can be converted to Prometheus Agent key
// and do the conversion if it's true.
func daemonSetKeyToPrometheusAgentKey(key string) (bool, string) {
r := prometheusAgentKey
matches := r.FindAllStringSubmatch(key, 2)
if len(matches) != 1 {
return false, ""
}
if len(matches[0]) != 3 {
return false, ""
}
return true, matches[0][1] + "/" + matches[0][2]
}
// Sync implements the operator.Syncer interface.
// TODO: Consider refactoring the common code between syncDaemonSet() and syncStatefulSet().
func (c *Operator) Sync(ctx context.Context, key string) error {

View File

@@ -1,51 +0,0 @@
// Copyright 2024 The prometheus-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheusagent
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestStatefulSetKeyToPrometheusAgentKey(t *testing.T) {
cases := []struct {
input string
expectedKey string
expectedMatch bool
}{
{
input: "namespace/prom-agent-test",
expectedKey: "namespace/test",
expectedMatch: true,
},
{
input: "namespace/prom-agent-test-shard-1",
expectedKey: "namespace/test",
expectedMatch: true,
},
{
input: "allns-z-thanosrulercreatedeletecluster-qcwdmj-0/thanos-ruler-test",
expectedKey: "",
expectedMatch: false,
},
}
for _, c := range cases {
match, key := statefulSetKeyToPrometheusAgentKey(c.input)
require.Equal(t, c.expectedKey, key)
require.Equal(t, c.expectedMatch, match)
}
}

View File

@@ -19,7 +19,6 @@ import (
"fmt"
"log/slog"
"reflect"
"regexp"
"strings"
"time"
@@ -55,9 +54,6 @@ const (
controllerName = "prometheus-controller"
)
var prometheusKeyInShardStatefulSet = regexp.MustCompile("^(.+)/prometheus-(.+)-shard-[1-9][0-9]*$")
var prometheusKeyInStatefulSet = regexp.MustCompile("^(.+)/prometheus-(.+)$")
// Operator manages life cycle of Prometheus deployments and
// monitoring configurations.
type Operator struct {
@@ -170,15 +166,6 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
o.metrics.MustRegister(o.reconciliations)
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.metrics,
monitoringv1.PrometheusesKind,
r,
o.controllerID,
)
o.promInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.Namespaces.PrometheusAllowList,
@@ -201,6 +188,16 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
}
o.metrics.MustRegister(prompkg.NewCollectorForStores(promStores...))
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.promInfs,
o.metrics,
monitoringv1.PrometheusesKind,
r,
o.controllerID,
)
o.smonInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.Namespaces.AllowList,
@@ -660,50 +657,6 @@ func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
}
// Resolve implements the operator.Syncer interface.
func (c *Operator) Resolve(obj interface{}) metav1.Object {
ss := obj.(*appsv1.StatefulSet)
key, ok := c.accessor.MetaNamespaceKey(ss)
if !ok {
return nil
}
match, promKey := statefulSetKeyToPrometheusKey(key)
if !match {
c.logger.Debug("StatefulSet key did not match a Prometheus key format", "key", key)
return nil
}
p, err := c.promInfs.Get(promKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
c.logger.Error("Prometheus lookup failed", "err", err)
return nil
}
return p.(*monitoringv1.Prometheus)
}
func statefulSetKeyToPrometheusKey(key string) (bool, string) {
r := prometheusKeyInStatefulSet
if prometheusKeyInShardStatefulSet.MatchString(key) {
r = prometheusKeyInShardStatefulSet
}
matches := r.FindAllStringSubmatch(key, 2)
if len(matches) != 1 {
return false, ""
}
if len(matches[0]) != 3 {
return false, ""
}
return true, matches[0][1] + "/" + matches[0][2]
}
func (c *Operator) handleMonitorNamespaceUpdate(oldo, curo interface{}) {
old := oldo.(*v1.Namespace)
cur := curo.(*v1.Namespace)

View File

@@ -227,33 +227,3 @@ func TestCreateStatefulSetInputHash(t *testing.T) {
})
}
}
func TestStatefulSetKeyToPrometheusKey(t *testing.T) {
cases := []struct {
input string
expectedKey string
expectedMatch bool
}{
{
input: "namespace/prometheus-test",
expectedKey: "namespace/test",
expectedMatch: true,
},
{
input: "namespace/prometheus-test-shard-1",
expectedKey: "namespace/test",
expectedMatch: true,
},
{
input: "allns-z-thanosrulercreatedeletecluster-qcwdmj-0/thanos-ruler-test",
expectedKey: "",
expectedMatch: false,
},
}
for _, c := range cases {
match, key := statefulSetKeyToPrometheusKey(c.input)
require.Equal(t, c.expectedKey, key)
require.Equal(t, c.expectedMatch, match)
}
}

View File

@@ -148,15 +148,6 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
opt(o)
}
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.metrics,
monitoringv1.ThanosRulerKind,
r,
o.controllerID,
)
o.cmapInfs, err = informers.NewInformersForResource(
informers.NewMetadataInformerFactory(
c.Namespaces.ThanosRulerAllowList,
@@ -195,6 +186,16 @@ func New(ctx context.Context, restConfig *rest.Config, c operator.Config, logger
}
o.metrics.MustRegister(newThanosRulerCollectorForStores(thanosStores...))
o.rr = operator.NewResourceReconciler(
o.logger,
o,
o.thanosRulerInfs,
o.metrics,
monitoringv1.ThanosRulerKind,
r,
o.controllerID,
)
o.ruleInfs, err = informers.NewInformersForResource(
informers.NewMonitoringInformerFactories(
c.Namespaces.AllowList,
@@ -374,34 +375,6 @@ func (o *Operator) RefreshStatusFor(obj metav1.Object) {
o.rr.EnqueueForStatus(obj)
}
// Resolve implements the operator.Syncer interface.
func (o *Operator) Resolve(obj interface{}) metav1.Object {
ss := obj.(*appsv1.StatefulSet)
key, ok := o.accessor.MetaNamespaceKey(ss)
if !ok {
return nil
}
thanosKey := statefulSetKeyToThanosKey(key)
tr, err := o.thanosRulerInfs.Get(thanosKey)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
o.logger.Error("ThanosRuler lookup failed", "err", err)
return nil
}
return tr.(*monitoringv1.ThanosRuler)
}
func statefulSetKeyToThanosKey(key string) string {
keyParts := strings.Split(key, "/")
return keyParts[0] + "/" + strings.TrimPrefix(keyParts[1], "thanos-ruler-")
}
func thanosKeyToStatefulSetKey(key string) string {
keyParts := strings.Split(key, "/")
return keyParts[0] + "/thanos-ruler-" + keyParts[1]