diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 2338d791b..cd0f8fd60 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -1,4 +1,4 @@ -// Copyright 2015 Prometheus Team +// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -143,6 +143,7 @@ func run() int { maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int() maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int() alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration() + perAlertNameLimit = kingpin.Flag("alerts.per-alertname-limit", "Maximum number of alerts per alertname. If negative or zero, no limit is set.").Default("0").Int() dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration() DispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration() @@ -345,7 +346,16 @@ func run() int { go peer.Settle(ctx, *gossipInterval*10) } - alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer) + alerts, err := mem.NewAlerts( + context.Background(), + marker, + *alertGCInterval, + *perAlertNameLimit, + nil, + logger, + prometheus.DefaultRegisterer, + ff, + ) if err != nil { logger.Error("error creating memory provider", "err", err) return 1 diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 6b24687b2..b3c770ce4 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -387,7 +387,7 @@ route: route := NewRoute(conf.Route, nil) reg := prometheus.NewRegistry() marker := types.NewMarker(reg) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil) if err != nil { t.Fatal(err) } @@ -538,7 +538,7 @@ route: route := NewRoute(conf.Route, nil) reg := prometheus.NewRegistry() marker := types.NewMarker(reg) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil) if err != nil { t.Fatal(err) } @@ -660,7 +660,7 @@ func TestDispatcherRace(t *testing.T) { logger := promslog.NewNopLogger() reg := prometheus.NewRegistry() marker := types.NewMarker(reg) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil) if err != nil { t.Fatal(err) } @@ -678,7 +678,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) logger := promslog.NewNopLogger() reg := prometheus.NewRegistry() marker := types.NewMarker(reg) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil) if err != nil { 
t.Fatal(err) } @@ -732,7 +732,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) { r := prometheus.NewRegistry() marker := types.NewMarker(r) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, promslog.NewNopLogger(), r) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, 0, nil, promslog.NewNopLogger(), r, nil) if err != nil { t.Fatal(err) } @@ -971,7 +971,7 @@ func TestDispatchOnStartup(t *testing.T) { logger := promslog.NewNopLogger() reg := prometheus.NewRegistry() marker := types.NewMarker(reg) - alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil) if err != nil { t.Fatal(err) } diff --git a/docs/alertmanager.md b/docs/alertmanager.md index 10dfe1c07..9da4ed4e4 100644 --- a/docs/alertmanager.md +++ b/docs/alertmanager.md @@ -72,3 +72,17 @@ Alertmanager supports configuration to create a cluster for high availability. This can be configured using the [--cluster-*](https://github.com/prometheus/alertmanager#high-availability) flags. It's important not to load balance traffic between Prometheus and its Alertmanagers, but instead, point Prometheus to a list of all Alertmanagers. + +## Alert limits (optional) + +Alertmanager supports configuration to limit the number of active alerts per alertname. +This can be configured using the `--alerts.per-alertname-limit` flag. + +When the limit is reached, any new alerts are dropped, while updates for already known alerts are still processed. +Known alerts (fingerprints) automatically expire to make room for new alerts. + +This feature is useful when an unexpectedly high number of instances of the same alert is sent to Alertmanager. +Limiting the number of alerts per alertname can prevent reliability issues and keep alert receivers from being flooded. + +The `alertmanager_alerts_limited_total` metric shows the total number of alerts that were dropped due to the per-alertname limit. +Enabling the `alert-names-in-metrics` feature flag will add the `alertname` label to the metric. 
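As an aside, the sketch below mirrors how cmd/alertmanager/main.go in this diff wires the new flag and the feature flag into the memory provider; the limit of 100, the nop logger, and the fresh registry are assumptions made for the example, not values from the change.

package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/alertmanager/featurecontrol"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/types"
)

func main() {
	logger := promslog.NewNopLogger()
	reg := prometheus.NewRegistry()
	marker := types.NewMarker(reg)

	// Equivalent to --enable-feature=alert-names-in-metrics.
	ff, err := featurecontrol.NewFlags(logger, featurecontrol.FeatureAlertNamesInMetrics)
	if err != nil {
		panic(err)
	}

	// Equivalent to --alerts.per-alertname-limit=100: at most 100 active
	// fingerprints per alertname; alerts beyond that are rejected by the
	// store and counted in alertmanager_alerts_limited_total.
	alerts, err := mem.NewAlerts(context.Background(), marker, 30*time.Minute, 100, nil, logger, reg, ff)
	if err != nil {
		panic(err)
	}
	defer alerts.Close()
}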
\ No newline at end of file diff --git a/featurecontrol/featurecontrol.go b/featurecontrol/featurecontrol.go index b4b7cf473..63e1056a2 100644 --- a/featurecontrol/featurecontrol.go +++ b/featurecontrol/featurecontrol.go @@ -21,6 +21,7 @@ import ( ) const ( + FeatureAlertNamesInMetrics = "alert-names-in-metrics" FeatureReceiverNameInMetrics = "receiver-name-in-metrics" FeatureClassicMode = "classic-mode" FeatureUTF8StrictMode = "utf8-strict-mode" @@ -29,6 +30,7 @@ const ( ) var AllowedFlags = []string{ + FeatureAlertNamesInMetrics, FeatureReceiverNameInMetrics, FeatureClassicMode, FeatureUTF8StrictMode, @@ -37,6 +39,7 @@ var AllowedFlags = []string{ } type Flagger interface { + EnableAlertNamesInMetrics() bool EnableReceiverNamesInMetrics() bool ClassicMode() bool UTF8StrictMode() bool @@ -46,6 +49,7 @@ type Flagger interface { type Flags struct { logger *slog.Logger + enableAlertNamesInMetrics bool enableReceiverNamesInMetrics bool classicMode bool utf8StrictMode bool @@ -53,6 +57,10 @@ type Flags struct { enableAutoGOMAXPROCS bool } +func (f *Flags) EnableAlertNamesInMetrics() bool { + return f.enableAlertNamesInMetrics +} + func (f *Flags) EnableReceiverNamesInMetrics() bool { return f.enableReceiverNamesInMetrics } @@ -105,6 +113,12 @@ func enableAutoGOMAXPROCS() flagOption { } } +func enableAlertNamesInMetrics() flagOption { + return func(configs *Flags) { + configs.enableAlertNamesInMetrics = true + } +} + func NewFlags(logger *slog.Logger, features string) (Flagger, error) { fc := &Flags{logger: logger} opts := []flagOption{} @@ -115,6 +129,9 @@ func NewFlags(logger *slog.Logger, features string) (Flagger, error) { for feature := range strings.SplitSeq(features, ",") { switch feature { + case FeatureAlertNamesInMetrics: + opts = append(opts, enableAlertNamesInMetrics()) + logger.Warn("Alert names in metrics enabled") case FeatureReceiverNameInMetrics: opts = append(opts, enableReceiverNameInMetrics()) logger.Warn("Experimental receiver name in metrics enabled") @@ -148,6 +165,8 @@ func NewFlags(logger *slog.Logger, features string) (Flagger, error) { type NoopFlags struct{} +func (n NoopFlags) EnableAlertNamesInMetrics() bool { return false } + func (n NoopFlags) EnableReceiverNamesInMetrics() bool { return false } func (n NoopFlags) ClassicMode() bool { return false } diff --git a/inhibit/inhibit_bench_test.go b/inhibit/inhibit_bench_test.go index 8f34443f7..f5fa536f4 100644 --- a/inhibit/inhibit_bench_test.go +++ b/inhibit/inhibit_bench_test.go @@ -184,7 +184,7 @@ func lastRuleMatchesBenchmark(b *testing.B, n int) benchmarkOptions { func benchmarkMutes(b *testing.B, opts benchmarkOptions) { r := prometheus.NewRegistry() m := types.NewMarker(r) - s, err := mem.NewAlerts(context.TODO(), m, time.Minute, nil, promslog.NewNopLogger(), r) + s, err := mem.NewAlerts(context.TODO(), m, time.Minute, 0, nil, promslog.NewNopLogger(), r, nil) if err != nil { b.Fatal(err) } diff --git a/limit/bucket.go b/limit/bucket.go new file mode 100644 index 000000000..bccc91bcc --- /dev/null +++ b/limit/bucket.go @@ -0,0 +1,160 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package limit + +import ( + "container/heap" + "sync" + "time" +) + +// item represents a value and its priority based on time. +type item[V any] struct { + value V + priority time.Time + index int +} + +// expired returns true if the item is expired (priority is before the given time). +func (i *item[V]) expired(at time.Time) bool { + return i.priority.Before(at) +} + +// sortedItems is a heap of items. +type sortedItems[V any] []*item[V] + +// Len returns the number of items in the heap. +func (s sortedItems[V]) Len() int { return len(s) } + +// Less reports whether the element with index i should sort before the element with index j. +func (s sortedItems[V]) Less(i, j int) bool { return s[i].priority.Before(s[j].priority) } + +// Swap swaps the elements with indexes i and j. +func (s sortedItems[V]) Swap(i, j int) { + s[i], s[j] = s[j], s[i] + s[i].index = i + s[j].index = j +} + +// Push adds an item to the heap. +func (s *sortedItems[V]) Push(x any) { + n := len(*s) + item := x.(*item[V]) + item.index = n + *s = append(*s, item) +} + +// Pop removes and returns the minimum element (according to Less). +func (s *sortedItems[V]) Pop() any { + old := *s + n := len(old) + item := old[n-1] + old[n-1] = nil // don't stop the GC from reclaiming the item eventually + item.index = -1 // for safety + *s = old[0 : n-1] + return item +} + +// update modifies the priority and value of an item in the heap. +func (s *sortedItems[V]) update(item *item[V], priority time.Time) { + item.priority = priority + heap.Fix(s, item.index) +} + +// Bucket is a simple cache for values with priority(expiry). +// It has: +// - configurable capacity. +// - a mutex for thread safety. +// - a sorted heap of items for priority/expiry based eviction. +// - an index of items for fast updates. +type Bucket[V comparable] struct { + mtx sync.Mutex + index map[V]*item[V] + items sortedItems[V] + capacity int +} + +// NewBucket creates a new bucket with the given capacity. +// All internal data structures are initialized to the given capacity to avoid allocations during runtime. +func NewBucket[V comparable](capacity int) *Bucket[V] { + items := make(sortedItems[V], 0, capacity) + heap.Init(&items) + return &Bucket[V]{ + index: make(map[V]*item[V], capacity), + items: items, + capacity: capacity, + } +} + +// IsStale returns true if the latest item in the bucket is expired. +func (b *Bucket[V]) IsStale() (stale bool) { + b.mtx.Lock() + defer b.mtx.Unlock() + if b.items.Len() == 0 { + return true + } + + latest := b.items[b.items.Len()-1] + return latest.expired(time.Now()) +} + +// Upsert tries to add a new value and its priority to the bucket. +// If the value is already in the bucket, its priority is updated. +// If the bucket is not full, the new value is added. +// If the bucket is full, oldest expired item is evicted based on priority and the new value is added. +// Otherwise the new value is ignored and the method returns false. 
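An illustrative sketch (not part of the diff) of the rules described in the comment above, using the Bucket API added in this file; the label sets are made up for the example.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/limit"
)

func main() {
	// Capacity 2: at most two distinct fingerprints at a time.
	b := limit.NewBucket[model.Fingerprint](2)

	now := time.Now()
	a1 := model.Alert{Labels: model.LabelSet{"alertname": "HighCPU", "instance": "a"}, EndsAt: now.Add(time.Hour)}
	a2 := model.Alert{Labels: model.LabelSet{"alertname": "HighCPU", "instance": "b"}, EndsAt: now.Add(time.Hour)}
	a3 := model.Alert{Labels: model.LabelSet{"alertname": "HighCPU", "instance": "c"}, EndsAt: now.Add(time.Hour)}

	fmt.Println(b.Upsert(a1.Fingerprint(), a1.EndsAt))            // true: bucket not full yet
	fmt.Println(b.Upsert(a2.Fingerprint(), a2.EndsAt))            // true: bucket not full yet
	fmt.Println(b.Upsert(a1.Fingerprint(), now.Add(2*time.Hour))) // true: known fingerprint, expiry updated in place
	fmt.Println(b.Upsert(a3.Fingerprint(), a3.EndsAt))            // false: bucket full and nothing has expired yet
}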
+func (b *Bucket[V]) Upsert(value V, priority time.Time) (ok bool) { + if b.capacity < 1 { + return false + } + + b.mtx.Lock() + defer b.mtx.Unlock() + + // If the value is already in the index, update it. + if item, exists := b.index[value]; exists { + b.items.update(item, priority) + return true + } + + // If the bucket is not full, add the new value to the heap and index. + if b.items.Len() < b.capacity { + item := &item[V]{ + value: value, + priority: priority, + } + b.index[value] = item + heap.Push(&b.items, item) + return true + } + + // If the bucket is full, check the oldest item (at heap root) and evict it if expired + oldest := b.items[0] + if oldest.expired(time.Now()) { + // Remove the expired item from both the heap and the index + heap.Pop(&b.items) + delete(b.index, oldest.value) + + // Add the new item + item := &item[V]{ + value: value, + priority: priority, + } + b.index[value] = item + heap.Push(&b.items, item) + return true + } + + return false +} diff --git a/limit/bucket_test.go b/limit/bucket_test.go new file mode 100644 index 000000000..93901aef2 --- /dev/null +++ b/limit/bucket_test.go @@ -0,0 +1,468 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package limit + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestBucketUpsert(t *testing.T) { + testCases := []struct { + name string + bucketCapacity int + alerts []model.Alert + alertTimings []time.Time // When each alert is added relative to now + expectedResult []bool // Expected return value for each Add() call + description string + }{ + { + name: "Bucket with zero capacity should reject all alerts", + bucketCapacity: 0, + alerts: []model.Alert{ + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, + }, + alertTimings: []time.Time{time.Now(), time.Now(), time.Now()}, + expectedResult: []bool{false, false, false}, // All should be rejected + description: "Adding 3 alerts to a bucket with capacity 0 should fail", + }, + { + name: "Empty bucket should add items while not full", + bucketCapacity: 3, + alerts: []model.Alert{ + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, + }, + alertTimings: []time.Time{time.Now(), time.Now(), time.Now()}, + expectedResult: []bool{true, true, true}, // All should be added successfully + description: "Adding 3 alerts to a bucket with capacity 3 should succeed", + }, + { + name: "Full bucket 
must not add items if old items are not expired yet", + bucketCapacity: 2, + alerts: []model.Alert{ + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, + }, + alertTimings: []time.Time{time.Now(), time.Now(), time.Now()}, + expectedResult: []bool{true, true, false}, // First two succeed, third fails + description: "Adding third alert to full bucket with non-expired items should fail", + }, + { + name: "Full bucket must add items if old items are expired", + bucketCapacity: 2, + alerts: []model.Alert{ + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(-1 * time.Hour)}, // Expired 1 hour ago + {Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(-30 * time.Minute)}, // Expired 30 minutes ago + {Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, // Will expire in 1 hour + }, + alertTimings: []time.Time{time.Now(), time.Now(), time.Now()}, + expectedResult: []bool{true, true, true}, // All should succeed because older items get evicted + description: "Adding new alerts when bucket is full but oldest items are expired should succeed", + }, + { + name: "Update existing alert in bucket should not increase size", + bucketCapacity: 2, + alerts: []model.Alert{ + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(2 * time.Hour)}, // Same fingerprint, different EndsAt + {Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, + }, + alertTimings: []time.Time{time.Now(), time.Now(), time.Now()}, + expectedResult: []bool{true, true, true}, // All should succeed - second is an update, not a new entry + description: "Updating existing alert should not consume additional bucket space", + }, + { + name: "Mixed scenario with expiration and updates", + bucketCapacity: 2, + alerts: []model.Alert{ + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(-1 * time.Hour)}, // Expired + {Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, // Active + {Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(2 * time.Hour)}, // Update of first alert + {Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, // New alert, bucket full but Alert2 not expired + }, + alertTimings: []time.Time{time.Now(), time.Now(), time.Now(), time.Now()}, + expectedResult: []bool{true, true, true, false}, // Last one should fail because bucket is full with non-expired items + description: "Complex scenario with expiration, updates, and eviction should work correctly", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + bucket := NewBucket[model.Fingerprint](tc.bucketCapacity) + + for i, alert := range tc.alerts { + result := bucket.Upsert(alert.Fingerprint(), alert.EndsAt) + require.Equal(t, tc.expectedResult[i], result, + "Alert %d: expected %v, got %v. 
%s", i+1, tc.expectedResult[i], result, tc.description) + } + }) + } +} + +func TestBucketAddConcurrency(t *testing.T) { + bucket := NewBucket[model.Fingerprint](2) + + // Test that concurrent access to bucket is safe + alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)} + alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)} + + done := make(chan bool, 2) + + // Add alerts concurrently + go func() { + bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt) + done <- true + }() + + go func() { + bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt) + done <- true + }() + + // Wait for both goroutines to complete + <-done + <-done + + // Verify that both alerts were added (bucket should contain 2 items) + require.Len(t, bucket.index, 2, "Expected 2 alerts in bucket after concurrent adds") + require.Len(t, bucket.items, 2, "Expected 2 items in bucket map after concurrent adds") +} + +func TestBucketAddExpiredEviction(t *testing.T) { + bucket := NewBucket[model.Fingerprint](2) + + // Add two alerts that are already expired + expiredAlert1 := model.Alert{ + Labels: model.LabelSet{"alertname": "ExpiredAlert1", "instance": "server1"}, + EndsAt: time.Now().Add(-1 * time.Hour), + } + expiredFingerprint1 := expiredAlert1.Fingerprint() + expiredAlert2 := model.Alert{ + Labels: model.LabelSet{"alertname": "ExpiredAlert2", "instance": "server2"}, + EndsAt: time.Now().Add(-30 * time.Minute), + } + expiredFingerprint2 := expiredAlert2.Fingerprint() + + // Fill the bucket with expired alerts + result1 := bucket.Upsert(expiredFingerprint1, expiredAlert1.EndsAt) + require.True(t, result1, "First expired alert should be added successfully") + + result2 := bucket.Upsert(expiredFingerprint2, expiredAlert2.EndsAt) + require.True(t, result2, "Second expired alert should be added successfully") + + // Now add a fresh alert - it should evict the first expired alert + freshAlert := model.Alert{ + Labels: model.LabelSet{"alertname": "FreshAlert", "instance": "server3"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + freshFingerprint := freshAlert.Fingerprint() + + result3 := bucket.Upsert(freshFingerprint, freshAlert.EndsAt) + require.True(t, result3, "Fresh alert should be added successfully, evicting expired alert") + + // Verify the bucket state + require.Len(t, bucket.index, 2, "Bucket should still contain 2 items after eviction") + require.Len(t, bucket.items, 2, "Bucket map should still contain 2 items after eviction") + + // The fresh alert should be in the bucket + _, exists := bucket.index[freshFingerprint] + require.True(t, exists, "Fresh alert should exist in bucket after eviction") + + // The first expired alert should have been evicted + _, exists = bucket.index[expiredFingerprint1] + require.False(t, exists, "First expired alert should have been evicted from bucket, fingerprint: %d", expiredFingerprint1) +} + +func TestBucketAddEdgeCases(t *testing.T) { + t.Run("Single capacity bucket with replacement", func(t *testing.T) { + bucket := NewBucket[model.Fingerprint](1) + + // Add expired alert + expiredAlert := model.Alert{Labels: model.LabelSet{"alertname": "Expired"}, EndsAt: time.Now().Add(-1 * time.Hour)} + result1 := bucket.Upsert(expiredAlert.Fingerprint(), expiredAlert.EndsAt) + require.True(t, result1, "Adding expired alert to single-capacity bucket should succeed") + + // Add fresh alert (should replace expired one) + freshAlert := 
model.Alert{Labels: model.LabelSet{"alertname": "Fresh"}, EndsAt: time.Now().Add(1 * time.Hour)} + result2 := bucket.Upsert(freshAlert.Fingerprint(), freshAlert.EndsAt) + require.True(t, result2, "Adding fresh alert should succeed by replacing expired one") + + // Verify only the fresh alert remains + require.Len(t, bucket.index, 1, "Bucket should contain exactly 1 item") + freshFingerprint := freshAlert.Fingerprint() + _, exists := bucket.index[freshFingerprint] + require.True(t, exists, "Fresh alert should exist in bucket") + }) + + t.Run("Alert with same fingerprint but different EndsAt", func(t *testing.T) { + bucket := NewBucket[model.Fingerprint](2) + + // Add initial alert + originalTime := time.Now().Add(1 * time.Hour) + alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Test"}, EndsAt: originalTime} + result1 := bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt) + require.True(t, result1, "Initial alert should be added successfully") + + // Add same alert with different EndsAt (should update, not add new) + updatedTime := time.Now().Add(2 * time.Hour) + alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Test"}, EndsAt: updatedTime} + result2 := bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt) + require.True(t, result2, "Updated alert should not fill bucket") + + // Verify bucket still has only one entry with updated time + require.Len(t, bucket.index, 1, "Bucket should contain exactly 1 item after update") + fingerprint := alert1.Fingerprint() + storedTime, exists := bucket.index[fingerprint] + require.True(t, exists, "Alert should exist in bucket") + require.Equal(t, updatedTime, storedTime.priority, "Alert should have updated EndsAt time") + }) +} + +// Benchmark tests for Bucket.Upsert() performance. +func BenchmarkBucketUpsert(b *testing.B) { + b.Run("EmptyBucket", func(b *testing.B) { + bucket := NewBucket[model.Fingerprint](1000) + alert := model.Alert{ + Labels: model.LabelSet{"alertname": "TestAlert", "instance": "server1"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bucket.Upsert(alert.Fingerprint(), alert.EndsAt) + } + }) + + b.Run("AddToFullBucketWithExpiredItems", func(b *testing.B) { + bucketSize := 100 + bucket := NewBucket[model.Fingerprint](bucketSize) + + // Fill bucket with expired alerts + for i := range bucketSize { + expiredAlert := model.Alert{ + Labels: model.LabelSet{"alertname": model.LabelValue("ExpiredAlert" + string(rune(i))), "instance": "server1"}, + EndsAt: time.Now().Add(-1 * time.Hour), // Expired 1 hour ago + } + bucket.Upsert(expiredAlert.Fingerprint(), expiredAlert.EndsAt) + } + + newAlert := model.Alert{ + Labels: model.LabelSet{"alertname": "NewAlert", "instance": "server2"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bucket.Upsert(newAlert.Fingerprint(), newAlert.EndsAt) + } + }) + + b.Run("AddToFullBucketWithActiveItems", func(b *testing.B) { + bucketSize := 100 + bucket := NewBucket[model.Fingerprint](bucketSize) + + // Fill bucket with active alerts + for i := range bucketSize { + activeAlert := model.Alert{ + Labels: model.LabelSet{"alertname": model.LabelValue("ActiveAlert" + string(rune(i))), "instance": "server1"}, + EndsAt: time.Now().Add(1 * time.Hour), // Active for 1 hour + } + bucket.Upsert(activeAlert.Fingerprint(), activeAlert.EndsAt) + } + + newAlert := model.Alert{ + Labels: model.LabelSet{"alertname": "NewAlert", "instance": "server2"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + + b.ResetTimer() + for 
i := 0; i < b.N; i++ { + bucket.Upsert(newAlert.Fingerprint(), newAlert.EndsAt) + } + }) + + b.Run("UpdateExistingItem", func(b *testing.B) { + bucket := NewBucket[model.Fingerprint](100) + + // Add initial alert + alert := model.Alert{ + Labels: model.LabelSet{"alertname": "TestAlert", "instance": "server1"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + bucket.Upsert(alert.Fingerprint(), alert.EndsAt) + + // Create update with same fingerprint but different EndsAt + updatedAlert := model.Alert{ + Labels: model.LabelSet{"alertname": "TestAlert", "instance": "server1"}, + EndsAt: time.Now().Add(2 * time.Hour), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bucket.Upsert(updatedAlert.Fingerprint(), updatedAlert.EndsAt) + } + }) + + b.Run("MixedWorkload", func(b *testing.B) { + bucketSize := 50 + bucket := NewBucket[model.Fingerprint](bucketSize) + + // Pre-populate with mix of expired and active alerts + for i := 0; i < bucketSize/2; i++ { + expiredAlert := model.Alert{ + Labels: model.LabelSet{"alertname": model.LabelValue("ExpiredAlert" + string(rune(i))), "instance": "server1"}, + EndsAt: time.Now().Add(-1 * time.Hour), + } + bucket.Upsert(expiredAlert.Fingerprint(), expiredAlert.EndsAt) + } + for i := 0; i < bucketSize/2; i++ { + activeAlert := model.Alert{ + Labels: model.LabelSet{"alertname": model.LabelValue("ActiveAlert" + string(rune(i))), "instance": "server1"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + bucket.Upsert(activeAlert.Fingerprint(), activeAlert.EndsAt) + } + + // Create different types of alerts for the benchmark + alerts := []*model.Alert{ + {Labels: model.LabelSet{"alertname": "NewAlert1", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, + {Labels: model.LabelSet{"alertname": "ExpiredAlert0", "instance": "server1"}, EndsAt: time.Now().Add(2 * time.Hour)}, // Update existing + {Labels: model.LabelSet{"alertname": "NewAlert2", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + alertIndex := i % len(alerts) + bucket.Upsert(alerts[alertIndex].Fingerprint(), alerts[alertIndex].EndsAt) + } + }) +} + +// Benchmark different bucket sizes to understand scaling behavior. 
+func BenchmarkBucketUpsertScaling(b *testing.B) { + sizes := []int{10, 50, 100, 500, 1000} + + for _, size := range sizes { + b.Run(fmt.Sprintf("BucketSize_%d", size), func(b *testing.B) { + bucket := NewBucket[model.Fingerprint](size) + + // Fill bucket to capacity with expired items + for i := range size { + alert := model.Alert{ + Labels: model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert%d", i)), "instance": "server1"}, + EndsAt: time.Now().Add(-1 * time.Hour), + } + bucket.Upsert(alert.Fingerprint(), alert.EndsAt) + } + + newAlert := model.Alert{ + Labels: model.LabelSet{"alertname": "NewAlert", "instance": "server2"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + + b.ResetTimer() + for range b.N { + bucket.Upsert(newAlert.Fingerprint(), newAlert.EndsAt) + } + }) + } +} + +func TestBucketIsStale(t *testing.T) { + t.Run("IsStale on empty bucket should return true", func(t *testing.T) { + bucket := NewBucket[model.Fingerprint](5) + + // Should not panic when bucket is empty and return true + require.NotPanics(t, func() { + stale := bucket.IsStale() + require.True(t, stale, "IsStale on empty bucket should return true") + }, "IsStale on empty bucket should not panic") + }) + + t.Run("IsStale returns true when latest item is expired", func(t *testing.T) { + bucket := NewBucket[model.Fingerprint](3) + + // Add three alerts that are all expired + expiredTime := time.Now().Add(-1 * time.Hour) + alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Alert1"}, EndsAt: expiredTime} + alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Alert2"}, EndsAt: expiredTime.Add(-10 * time.Minute)} + alert3 := model.Alert{Labels: model.LabelSet{"alertname": "Alert3"}, EndsAt: expiredTime.Add(-20 * time.Minute)} + + bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt) + bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt) + bucket.Upsert(alert3.Fingerprint(), alert3.EndsAt) + + require.Len(t, bucket.items, 3, "Bucket should have 3 items before IsStale check") + require.Len(t, bucket.index, 3, "Bucket index should have 3 items before IsStale check") + + // IsStale should return true when all items are expired + stale := bucket.IsStale() + + require.True(t, stale, "IsStale should return true when all items are expired") + // IsStale doesn't remove items, so bucket should still contain them + require.Len(t, bucket.items, 3, "Bucket should still have 3 items after IsStale check") + require.Len(t, bucket.index, 3, "Bucket index should still have 3 items after IsStale check") + }) + + t.Run("IsStale returns false when latest item is not expired", func(t *testing.T) { + bucket := NewBucket[model.Fingerprint](3) + + // Add mix of expired and non-expired alerts + expiredTime := time.Now().Add(-1 * time.Hour) + futureTime := time.Now().Add(1 * time.Hour) + + alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Expired1"}, EndsAt: expiredTime} + alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Expired2"}, EndsAt: expiredTime.Add(-10 * time.Minute)} + alert3 := model.Alert{Labels: model.LabelSet{"alertname": "Active"}, EndsAt: futureTime} + + bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt) + bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt) + bucket.Upsert(alert3.Fingerprint(), alert3.EndsAt) + + require.Len(t, bucket.items, 3, "Bucket should have 3 items before IsStale check") + + // IsStale should return false since the latest item (alert3) is not expired + stale := bucket.IsStale() + + require.False(t, stale, "IsStale should return false when latest item is not expired") + 
require.Len(t, bucket.items, 3, "Bucket should still have 3 items after IsStale check") + require.Len(t, bucket.index, 3, "Bucket index should still have 3 items after IsStale check") + }) +} + +// Benchmark concurrent access to Bucket.Upsert(). +func BenchmarkBucketUpsertConcurrent(b *testing.B) { + bucket := NewBucket[model.Fingerprint](100) + + b.RunParallel(func(pb *testing.PB) { + alertCounter := 0 + for pb.Next() { + alert := model.Alert{ + Labels: model.LabelSet{"alertname": model.LabelValue("Alert" + string(rune(alertCounter))), "instance": "server1"}, + EndsAt: time.Now().Add(1 * time.Hour), + } + bucket.Upsert(alert.Fingerprint(), alert.EndsAt) + alertCounter++ + } + }) +} diff --git a/provider/mem/mem.go b/provider/mem/mem.go index 6e8087f33..fb7294c66 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -15,6 +15,7 @@ package mem import ( "context" + "errors" "log/slog" "sync" "time" @@ -27,6 +28,7 @@ import ( "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/types" @@ -52,7 +54,10 @@ type Alerts struct { logger *slog.Logger propagator propagation.TextMapPropagator + flagger featurecontrol.Flagger + alertsLimit prometheus.Gauge + alertsLimitedTotal *prometheus.CounterVec subscriberChannelWrites *prometheus.CounterVec } @@ -79,6 +84,23 @@ type listeningAlerts struct { func (a *Alerts) registerMetrics(r prometheus.Registerer) { r.MustRegister(&alertsCollector{alerts: a}) + a.alertsLimit = promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_alerts_per_alert_limit", + Help: "Current limit on number of alerts per alert name", + }) + + labels := []string{} + if a.flagger.EnableAlertNamesInMetrics() { + labels = append(labels, "alertname") + } + a.alertsLimitedTotal = promauto.With(r).NewCounterVec( + prometheus.CounterOpts{ + Name: "alertmanager_alerts_limited_total", + Help: "Total number of alerts that were dropped due to per alert name limit", + }, + labels, + ) + a.subscriberChannelWrites = promauto.With(r).NewCounterVec( prometheus.CounterOpts{ Name: "alertmanager_alerts_subscriber_channel_writes_total", @@ -89,25 +111,44 @@ func (a *Alerts) registerMetrics(r prometheus.Registerer) { } // NewAlerts returns a new alert provider. 
-func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duration, alertCallback AlertStoreCallback, l *slog.Logger, r prometheus.Registerer) (*Alerts, error) { +func NewAlerts( + ctx context.Context, + m types.AlertMarker, + intervalGC time.Duration, + perAlertNameLimit int, + alertCallback AlertStoreCallback, + l *slog.Logger, + r prometheus.Registerer, + flagger featurecontrol.Flagger, +) (*Alerts, error) { if alertCallback == nil { alertCallback = noopCallback{} } + if perAlertNameLimit > 0 { + l.Info("per alert name limit enabled", "limit", perAlertNameLimit) + } + + if flagger == nil { + flagger = featurecontrol.NoopFlags{} + } + ctx, cancel := context.WithCancel(ctx) a := &Alerts{ marker: m, - alerts: store.NewAlerts(), + alerts: store.NewAlerts().WithPerAlertLimit(perAlertNameLimit), cancel: cancel, listeners: map[int]listeningAlerts{}, next: 0, logger: l.With("component", "provider"), propagator: otel.GetTextMapPropagator(), callback: alertCallback, + flagger: flagger, } if r != nil { a.registerMetrics(r) + a.alertsLimit.Set(float64(perAlertNameLimit)) } go a.gcLoop(ctx, intervalGC) @@ -271,7 +312,14 @@ func (a *Alerts) Put(ctx context.Context, alerts ...*types.Alert) error { } if err := a.alerts.Set(alert); err != nil { - a.logger.Error("error on set alert", "err", err) + a.logger.Warn("error on set alert", "alertname", alert.Name(), "err", err) + if errors.Is(err, store.ErrLimited) { + labels := []string{} + if a.flagger.EnableAlertNamesInMetrics() { + labels = append(labels, alert.Name()) + } + a.alertsLimitedTotal.WithLabelValues(labels...).Inc() + } continue } diff --git a/provider/mem/mem_test.go b/provider/mem/mem_test.go index 3d27f67a0..7ea72584f 100644 --- a/provider/mem/mem_test.go +++ b/provider/mem/mem_test.go @@ -82,7 +82,7 @@ var ( // a listener can not unsubscribe as the lock is hold by `alerts.Lock`. 
func TestAlertsSubscribePutStarvation(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry()) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil) if err != nil { t.Fatal(err) } @@ -137,7 +137,7 @@ func TestDeadLock(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) // Run gc every 5 milliseconds to increase the possibility of a deadlock with Subscribe() - alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry()) + alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil) if err != nil { t.Fatal(err) } @@ -190,7 +190,7 @@ func TestDeadLock(t *testing.T) { func TestAlertsPut(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry()) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil) if err != nil { t.Fatal(err) } @@ -214,7 +214,7 @@ func TestAlertsSubscribe(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) ctx := t.Context() - alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry()) + alerts, err := NewAlerts(ctx, marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil) if err != nil { t.Fatal(err) } @@ -291,7 +291,7 @@ func TestAlertsSubscribe(t *testing.T) { func TestAlertsGetPending(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), nil, nil) if err != nil { t.Fatal(err) } @@ -329,7 +329,7 @@ func TestAlertsGetPending(t *testing.T) { func TestAlertsGC(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil) + alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, 0, noopCallback{}, promslog.NewNopLogger(), nil, nil) if err != nil { t.Fatal(err) } @@ -366,7 +366,7 @@ func TestAlertsStoreCallback(t *testing.T) { cb := &limitCountCallback{limit: 3} marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, promslog.NewNopLogger(), nil) + alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, 0, cb, promslog.NewNopLogger(), nil, nil) if err != nil { t.Fatal(err) } @@ -427,7 +427,7 @@ func TestAlertsStoreCallback(t *testing.T) { func TestAlerts_CountByState(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) - alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, nil, promslog.NewNopLogger(), nil) + alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, 0, nil, promslog.NewNopLogger(), nil, nil) require.NoError(t, err) countTotal := func() 
int { @@ -546,7 +546,7 @@ func (l *limitCountCallback) PostDelete(_ *types.Alert) { func TestAlertsConcurrently(t *testing.T) { callback := &limitCountCallback{limit: 100} - a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, promslog.NewNopLogger(), nil) + a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, 0, callback, promslog.NewNopLogger(), nil, nil) require.NoError(t, err) stopc := make(chan struct{}) @@ -607,7 +607,7 @@ func TestAlertsConcurrently(t *testing.T) { func TestSubscriberChannelMetrics(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) reg := prometheus.NewRegistry() - alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), reg) + alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), reg, nil) require.NoError(t, err) subscriberName := "test_subscriber" diff --git a/store/store.go b/store/store.go index bcb69b0f3..daa7c1435 100644 --- a/store/store.go +++ b/store/store.go @@ -21,9 +21,13 @@ import ( "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/limit" "github.com/prometheus/alertmanager/types" ) +// ErrLimited is returned if a Store has reached the per-alert limit. +var ErrLimited = errors.New("alert limited") + // ErrNotFound is returned if a Store cannot find the Alert. var ErrNotFound = errors.New("alert not found") @@ -33,26 +37,40 @@ var ErrNotFound = errors.New("alert not found") // resolved alerts that have been removed. type Alerts struct { sync.Mutex - c map[model.Fingerprint]*types.Alert - cb func([]types.Alert) + alerts map[model.Fingerprint]*types.Alert + gcCallback func([]types.Alert) + limits map[string]*limit.Bucket[model.Fingerprint] + perAlertLimit int } // NewAlerts returns a new Alerts struct. func NewAlerts() *Alerts { a := &Alerts{ - c: make(map[model.Fingerprint]*types.Alert), - cb: func(_ []types.Alert) {}, + alerts: make(map[model.Fingerprint]*types.Alert), + gcCallback: func(_ []types.Alert) {}, + perAlertLimit: 0, } return a } +// WithPerAlertLimit sets the per-alert limit for the Alerts struct. +func (a *Alerts) WithPerAlertLimit(lim int) *Alerts { + a.Lock() + defer a.Unlock() + + a.limits = make(map[string]*limit.Bucket[model.Fingerprint]) + a.perAlertLimit = lim + + return a +} + // SetGCCallback sets a GC callback to be executed after each GC. func (a *Alerts) SetGCCallback(cb func([]types.Alert)) { a.Lock() defer a.Unlock() - a.cb = cb + a.gcCallback = cb } // Run starts the GC loop. The interval must be greater than zero; if not, the function will panic. 
@@ -73,9 +91,9 @@ func (a *Alerts) Run(ctx context.Context, interval time.Duration) { func (a *Alerts) GC() []types.Alert { a.Lock() var resolved []types.Alert - for fp, alert := range a.c { + for fp, alert := range a.alerts { if alert.Resolved() { - delete(a.c, fp) + delete(a.alerts, fp) resolved = append(resolved, types.Alert{ Alert: model.Alert{ Labels: alert.Labels.Clone(), @@ -89,8 +107,16 @@ func (a *Alerts) GC() []types.Alert { }) } } + + // Remove stale alert limit buckets + for alertName, bucket := range a.limits { + if bucket.IsStale() { + delete(a.limits, alertName) + } + } + a.Unlock() - a.cb(resolved) + a.gcCallback(resolved) return resolved } @@ -100,7 +126,7 @@ func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { a.Lock() defer a.Unlock() - alert, prs := a.c[fp] + alert, prs := a.alerts[fp] if !prs { return nil, ErrNotFound } @@ -112,7 +138,22 @@ func (a *Alerts) Set(alert *types.Alert) error { a.Lock() defer a.Unlock() - a.c[alert.Fingerprint()] = alert + fp := alert.Fingerprint() + name := alert.Name() + + // Apply per alert limits if necessary + if a.perAlertLimit > 0 { + bucket, ok := a.limits[name] + if !ok { + bucket = limit.NewBucket[model.Fingerprint](a.perAlertLimit) + a.limits[name] = bucket + } + if !bucket.Upsert(fp, alert.EndsAt) { + return ErrLimited + } + } + + a.alerts[fp] = alert return nil } @@ -123,8 +164,8 @@ func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error { defer a.Unlock() for _, alert := range alerts { fp := alert.Fingerprint() - if other, ok := a.c[fp]; ok && alert.UpdatedAt.Equal(other.UpdatedAt) { - delete(a.c, fp) + if other, ok := a.alerts[fp]; ok && alert.UpdatedAt.Equal(other.UpdatedAt) { + delete(a.alerts, fp) } } return nil @@ -135,8 +176,8 @@ func (a *Alerts) List() []*types.Alert { a.Lock() defer a.Unlock() - alerts := make([]*types.Alert, 0, len(a.c)) - for _, alert := range a.c { + alerts := make([]*types.Alert, 0, len(a.alerts)) + for _, alert := range a.alerts { alerts = append(alerts, alert) } @@ -148,7 +189,7 @@ func (a *Alerts) Empty() bool { a.Lock() defer a.Unlock() - return len(a.c) == 0 + return len(a.alerts) == 0 } // Len returns the number of alerts in the store. @@ -156,5 +197,5 @@ func (a *Alerts) Len() int { a.Lock() defer a.Unlock() - return len(a.c) + return len(a.alerts) }
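A closing sketch of the store-level behaviour added above, assuming only the API introduced in this diff: with WithPerAlertLimit(1), a second distinct fingerprint under the same alertname is rejected with store.ErrLimited, while an update to an already-known fingerprint still succeeds. The label values are illustrative.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
)

func main() {
	s := store.NewAlerts().WithPerAlertLimit(1)

	// active builds a firing alert for the hypothetical HighCPU alertname.
	active := func(instance string) *types.Alert {
		return &types.Alert{Alert: model.Alert{
			Labels: model.LabelSet{"alertname": "HighCPU", "instance": model.LabelValue(instance)},
			EndsAt: time.Now().Add(time.Hour),
		}}
	}

	fmt.Println(s.Set(active("a")))                              // <nil>: first fingerprint fits within the limit
	fmt.Println(errors.Is(s.Set(active("b")), store.ErrLimited)) // true: a second fingerprint for HighCPU is limited
	fmt.Println(s.Set(active("a")))                              // <nil>: known fingerprint, only its expiry is refreshed
}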