diff --git a/pkg/informers/informers.go b/pkg/informers/informers.go index 80b8dd23e..a44a08dd7 100644 --- a/pkg/informers/informers.go +++ b/pkg/informers/informers.go @@ -89,10 +89,18 @@ func NewInformersForResourceWithTransform(ifs FactoriesForNamespaces, resource s // * ManagedFields // * Finalizers // * OwnerReferences. +// +// If the passed object isn't of type *v1.PartialObjectMetadata, it is returned unmodified. +// +// It matches the cache.TransformFunc type and can be used by informers +// watching PartialObjectMetadata objects to reduce memory consumption. +// See https://pkg.go.dev/k8s.io/client-go@v0.29.1/tools/cache#TransformFunc for details. func PartialObjectMetadataStrip(obj interface{}) (interface{}, error) { partialMeta, ok := obj.(*v1.PartialObjectMetadata) if !ok { - return nil, fmt.Errorf("internal error: cannot cast object %#+v to PartialObjectMetadata", obj) + // Don't do anything if the cast isn't successful. + // The object might be of type "cache.DeletedFinalStateUnknown". + return obj, nil } partialMeta.Annotations = nil diff --git a/pkg/informers/informers_test.go b/pkg/informers/informers_test.go index efd6330df..edb32cd7a 100644 --- a/pkg/informers/informers_test.go +++ b/pkg/informers/informers_test.go @@ -15,18 +15,25 @@ package informers import ( + "context" "reflect" "sort" "strings" + "sync/atomic" "testing" + "time" - "k8s.io/apimachinery/pkg/api/errors" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/metadata/fake" + kubetesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" @@ -46,7 +53,7 @@ func (m *mockFactory) Get(name string) (runtime.Object, error) { return obj, nil } - return nil, errors.NewNotFound(schema.GroupResource{}, name) + return nil, apierrors.NewNotFound(schema.GroupResource{}, name) } func (m *mockFactory) ByNamespace(_ string) cache.GenericNamespaceLister { @@ -96,7 +103,7 @@ func TestInformers(t *testing.T) { } _, err = ifs.Get("bar") - if !errors.IsNotFound(err) { + if !apierrors.IsNotFound(err) { t.Errorf("expected IsNotFound error, got %v", err) return } @@ -107,14 +114,14 @@ func TestNewInformerOptions(t *testing.T) { for _, tc := range []struct { name string allowedNamespaces, deniedNamespaces map[string]struct{} - tweaks func(*v1.ListOptions) + tweaks func(*metav1.ListOptions) - expectedOptions v1.ListOptions + expectedOptions metav1.ListOptions expectedNamespaces []string }{ { name: "all unset", - expectedOptions: v1.ListOptions{}, + expectedOptions: metav1.ListOptions{}, expectedNamespaces: nil, }, { @@ -123,7 +130,7 @@ func TestNewInformerOptions(t *testing.T) { "foo": {}, "bar": {}, }, - expectedOptions: v1.ListOptions{}, + expectedOptions: metav1.ListOptions{}, expectedNamespaces: []string{ "foo", "bar", @@ -135,11 +142,11 @@ func TestNewInformerOptions(t *testing.T) { "foo": {}, "bar": {}, }, - tweaks: func(options *v1.ListOptions) { + tweaks: func(options *metav1.ListOptions) { options.FieldSelector = "metadata.name=foo" }, - expectedOptions: v1.ListOptions{ + expectedOptions: metav1.ListOptions{ FieldSelector: "metadata.name=foo", }, expectedNamespaces: []string{ @@ -158,7 +165,7 @@ func TestNewInformerOptions(t *testing.T) { "denied2": {}, }, - expectedOptions: v1.ListOptions{}, + expectedOptions: metav1.ListOptions{}, expectedNamespaces: []string{ "foo", "bar", @@ -174,7 +181,7 @@ func TestNewInformerOptions(t *testing.T) { "denied2": {}, }, - expectedOptions: v1.ListOptions{}, + expectedOptions: metav1.ListOptions{}, expectedNamespaces: []string{ "foo", }, @@ -182,7 +189,7 @@ func TestNewInformerOptions(t *testing.T) { { name: "all allowed namespaces denying namespaces", allowedNamespaces: map[string]struct{}{ - v1.NamespaceAll: {}, + metav1.NamespaceAll: {}, }, deniedNamespaces: map[string]struct{}{ "denied2": {}, @@ -190,40 +197,40 @@ func TestNewInformerOptions(t *testing.T) { }, expectedNamespaces: []string{ - v1.NamespaceAll, + metav1.NamespaceAll, }, - expectedOptions: v1.ListOptions{ + expectedOptions: metav1.ListOptions{ FieldSelector: "metadata.namespace!=denied1,metadata.namespace!=denied2", }, }, { name: "denied namespaces with tweak", allowedNamespaces: map[string]struct{}{ - v1.NamespaceAll: {}, + metav1.NamespaceAll: {}, }, deniedNamespaces: map[string]struct{}{ "denied2": {}, "denied1": {}, }, - tweaks: func(options *v1.ListOptions) { + tweaks: func(options *metav1.ListOptions) { options.FieldSelector = "metadata.name=foo" }, expectedNamespaces: []string{ - v1.NamespaceAll, + metav1.NamespaceAll, }, - expectedOptions: v1.ListOptions{ + expectedOptions: metav1.ListOptions{ FieldSelector: "metadata.name=foo,metadata.namespace!=denied1,metadata.namespace!=denied2", }, }, } { t.Run(tc.name, func(t *testing.T) { tweaks, namespaces := newInformerOptions(tc.allowedNamespaces, tc.deniedNamespaces, tc.tweaks) - opts := v1.ListOptions{} + opts := metav1.ListOptions{} tweaks(&opts) // sort the field selector as entries are in non-deterministic order - sortFieldSelector := func(opts *v1.ListOptions) { + sortFieldSelector := func(opts *metav1.ListOptions) { fs := strings.Split(opts.FieldSelector, ",") sort.Strings(fs) opts.FieldSelector = strings.Join(fs, ",") @@ -245,3 +252,104 @@ func TestNewInformerOptions(t *testing.T) { }) } } + +// TestPartialObjectMetadataStripOnDeletedFinalStateUnknown makes sure +// that PartialObjectMetadataStrip doesn't fail on DeletedFinalStateUnknown. +func TestPartialObjectMetadataStripOnDeletedFinalStateUnknown(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + // Mock the following scenario: + // 1. the informer lists the secrets and the API returns 1 item. + // 2. the informer watches the secrets resource and the API returns a watch error. + // 3. the informer lists again the secrets and the API returns no item. + // + // After the third step, the informer should send a delete event with a + // "cache.DeletedFinalStateUnknown" object. + fakeClient := fake.NewSimpleMetadataClient(fake.NewTestScheme()) + listCalls, watchCalls := &atomic.Uint64{}, &atomic.Uint64{} + fakeClient.PrependReactor("list", "secrets", func(action kubetesting.Action) (bool, runtime.Object, error) { + objects := &metav1.List{ + Items: []runtime.RawExtension{}, + } + + // The first call to list returns 1 item. Subsequent calls returns an empty list. + if listCalls.Load() == 0 { + objects.Items = []runtime.RawExtension{ + {Object: &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "777", + }, + }}, + } + } + listCalls.Add(1) + + return true, objects, nil + }) + + fakeClient.PrependWatchReactor("secrets", func(action kubetesting.Action) (handled bool, ret watch.Interface, err error) { + w := watch.NewRaceFreeFake() + + // Trigger a watch error after the first list operation. + if listCalls.Load() == 1 { + w.Error(&apierrors.NewResourceExpired("expired").ErrStatus) + } + + watchCalls.Add(1) + return true, w, nil + }) + + infs, err := NewInformersForResourceWithTransform( + NewMetadataInformerFactory( + map[string]struct{}{"bar": {}}, + map[string]struct{}{}, + fakeClient, + time.Second, + nil, + ), + appsv1.SchemeGroupVersion.WithResource("secrets"), + PartialObjectMetadataStrip, + ) + require.NoError(t, err) + + var ( + addCount = &atomic.Uint64{} + delReceived = make(chan struct{}) + ) + infs.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + t.Logf("added object %T", obj) + addCount.Add(1) + }, + DeleteFunc: func(obj interface{}) { + t.Logf("deleted object %T", obj) + close(delReceived) + }, + }) + + errCh := make(chan error, 1) + for _, inf := range infs.informers { + inf.Informer().SetWatchErrorHandler(func(r *cache.Reflector, err error) { + errCh <- err + }) + } + + go infs.Start(ctx.Done()) + + select { + case <-delReceived: + case err = <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + require.FailNow(t, "timeout waiting for the delete event") + } + + // List should be called twice. + require.Equal(t, uint64(2), listCalls.Load()) + + // Watch should be called at least once. + require.GreaterOrEqual(t, watchCalls.Load(), uint64(1)) + // 1 object should have been added. + require.Equal(t, uint64(1), addCount.Load()) +}