Mirror of https://github.com/prometheus/alertmanager.git, synced 2026-02-05 06:45:45 +01:00
feat(provider): implement per-alert limits (#4819)
* feat(limit): add new limit package with bucket

  Add a new limit package with a generic bucket implementation. This can be used, for example, to limit the number of alerts held in memory.

  Benchmarks:

  ```go
  goos: darwin
  goarch: arm64
  pkg: github.com/prometheus/alertmanager/limit
  cpu: Apple M3 Pro
  BenchmarkBucketUpsert/EmptyBucket-12                        8816954    122.4 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsert/AddToFullBucketWithExpiredItems-12    9861010    123.0 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsert/AddToFullBucketWithActiveItems-12     8343778    143.6 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsert/UpdateExistingAlert-12               10107787    118.9 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsert/MixedWorkload-12                      9436174    126.0 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsertScaling/BucketSize_10-12              10255278    115.4 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsertScaling/BucketSize_50-12              10166518    117.1 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsertScaling/BucketSize_100-12             10457394    115.0 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsertScaling/BucketSize_500-12              9644079    115.2 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsertScaling/BucketSize_1000-12            10426184    116.6 ns/op    56 B/op    2 allocs/op
  BenchmarkBucketUpsertConcurrent-12                          5796210    216.3 ns/op   406 B/op    5 allocs/op
  PASS
  ok      github.com/prometheus/alertmanager/limit    15.497s
  ```

  Signed-off-by: Siavash Safi <siavash@cloudflare.com>

* feat(provider): implement per-alert limits

  Use the new limit module to add optional per-alertname limits. The `alertname` label on the `alertmanager_alerts_limited_total` metric can be enabled using the `alert-names-in-metrics` feature flag.

  Signed-off-by: Siavash Safi <siavash@cloudflare.com>

---------

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
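For reviewers, a minimal sketch of how the new `limit.Bucket` could be exercised. `NewBucket`, `Upsert`, and `IsStale` are taken from the diff below; the surrounding program is illustrative only:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/limit"
)

func main() {
	// A bucket with capacity 2 tracks at most two distinct fingerprints.
	b := limit.NewBucket[model.Fingerprint](2)
	now := time.Now()

	fmt.Println(b.Upsert(model.Fingerprint(1), now.Add(time.Hour)))   // true: bucket has room
	fmt.Println(b.Upsert(model.Fingerprint(2), now.Add(time.Hour)))   // true: bucket is now full
	fmt.Println(b.Upsert(model.Fingerprint(3), now.Add(time.Hour)))   // false: full and nothing has expired
	fmt.Println(b.Upsert(model.Fingerprint(1), now.Add(2*time.Hour))) // true: refresh of a known fingerprint

	// IsStale reports whether even the newest entry has expired; the store
	// uses this to garbage-collect whole buckets.
	fmt.Println(b.IsStale()) // false: entries are still active
}
```

The bucket pairs an expiry-ordered min-heap with a fingerprint index, so an upsert stays cheap regardless of capacity, which is consistent with the roughly flat ns/op numbers in the scaling benchmarks above.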
@@ -1,4 +1,4 @@
|
||||
// Copyright 2015 Prometheus Team
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -143,6 +143,7 @@ func run() int {
|
||||
maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int()
|
||||
maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
|
||||
alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
|
||||
perAlertNameLimit = kingpin.Flag("alerts.per-alertname-limit", "Maximum number of alerts per alertname. If negative or zero, no limit is set.").Default("0").Int()
|
||||
dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
|
||||
DispatchStartDelay = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()
|
||||
|
||||
@@ -345,7 +346,16 @@ func run() int {
|
||||
go peer.Settle(ctx, *gossipInterval*10)
|
||||
}
|
||||
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer)
|
||||
alerts, err := mem.NewAlerts(
|
||||
context.Background(),
|
||||
marker,
|
||||
*alertGCInterval,
|
||||
*perAlertNameLimit,
|
||||
nil,
|
||||
logger,
|
||||
prometheus.DefaultRegisterer,
|
||||
ff,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error("error creating memory provider", "err", err)
|
||||
return 1
|
||||
|
||||
@@ -387,7 +387,7 @@ route:
|
||||
route := NewRoute(conf.Route, nil)
|
||||
reg := prometheus.NewRegistry()
|
||||
marker := types.NewMarker(reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -538,7 +538,7 @@ route:
|
||||
route := NewRoute(conf.Route, nil)
|
||||
reg := prometheus.NewRegistry()
|
||||
marker := types.NewMarker(reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -660,7 +660,7 @@ func TestDispatcherRace(t *testing.T) {
|
||||
logger := promslog.NewNopLogger()
|
||||
reg := prometheus.NewRegistry()
|
||||
marker := types.NewMarker(reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -678,7 +678,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
|
||||
logger := promslog.NewNopLogger()
|
||||
reg := prometheus.NewRegistry()
|
||||
marker := types.NewMarker(reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -732,7 +732,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
|
||||
r := prometheus.NewRegistry()
|
||||
marker := types.NewMarker(r)
|
||||
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, promslog.NewNopLogger(), r)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, 0, nil, promslog.NewNopLogger(), r, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -971,7 +971,7 @@ func TestDispatchOnStartup(t *testing.T) {
|
||||
logger := promslog.NewNopLogger()
|
||||
reg := prometheus.NewRegistry()
|
||||
marker := types.NewMarker(reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
|
||||
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, 0, nil, logger, reg, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -72,3 +72,17 @@ Alertmanager supports configuration to create a cluster for high availability.
This can be configured using the [--cluster-*](https://github.com/prometheus/alertmanager#high-availability) flags.

It's important not to load balance traffic between Prometheus and its Alertmanagers, but instead, point Prometheus to a list of all Alertmanagers.

## Alert limits (optional)

Alertmanager supports configuration to limit the number of active alerts per alertname.
This can be configured using the `--alerts.per-alertname-limit` flag.

When the limit is reached, any new alerts are dropped, while updates (heartbeats) for already known alerts are still processed.
Known alerts (fingerprints) expire automatically to make room for new alerts.

This feature is useful when an unexpectedly high number of instances of the same alert is sent to Alertmanager.
Limiting the number of alerts per alertname can prevent reliability issues and keep alert receivers from being flooded.

The `alertmanager_alerts_limited_total` metric shows the total number of alerts that were dropped due to the per-alertname limit.
Enabling the `alert-names-in-metrics` feature flag adds the `alertname` label to the metric.
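To illustrate the documented behaviour, here is a small sketch, not part of this change, that exercises the per-alertname limit through the store API from this PR (`store.NewAlerts().WithPerAlertLimit` and `store.ErrLimited` appear further down in the diff; the alert labels are made up):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
)

func main() {
	// Allow at most one active alert per alertname.
	s := store.NewAlerts().WithPerAlertLimit(1)

	mkAlert := func(instance string) *types.Alert {
		return &types.Alert{Alert: model.Alert{
			Labels: model.LabelSet{"alertname": "HighLatency", "instance": model.LabelValue(instance)},
			EndsAt: time.Now().Add(time.Hour),
		}}
	}

	fmt.Println(s.Set(mkAlert("a"))) // <nil>: first alert for this alertname is stored

	// A second, distinct fingerprint for the same alertname exceeds the limit
	// while the first alert is still active, so it is dropped.
	err := s.Set(mkAlert("b"))
	fmt.Println(errors.Is(err, store.ErrLimited)) // true
}
```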
@@ -21,6 +21,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
FeatureAlertNamesInMetrics = "alert-names-in-metrics"
|
||||
FeatureReceiverNameInMetrics = "receiver-name-in-metrics"
|
||||
FeatureClassicMode = "classic-mode"
|
||||
FeatureUTF8StrictMode = "utf8-strict-mode"
|
||||
@@ -29,6 +30,7 @@ const (
|
||||
)
|
||||
|
||||
var AllowedFlags = []string{
|
||||
FeatureAlertNamesInMetrics,
|
||||
FeatureReceiverNameInMetrics,
|
||||
FeatureClassicMode,
|
||||
FeatureUTF8StrictMode,
|
||||
@@ -37,6 +39,7 @@ var AllowedFlags = []string{
|
||||
}
|
||||
|
||||
type Flagger interface {
|
||||
EnableAlertNamesInMetrics() bool
|
||||
EnableReceiverNamesInMetrics() bool
|
||||
ClassicMode() bool
|
||||
UTF8StrictMode() bool
|
||||
@@ -46,6 +49,7 @@ type Flagger interface {
|
||||
|
||||
type Flags struct {
|
||||
logger *slog.Logger
|
||||
enableAlertNamesInMetrics bool
|
||||
enableReceiverNamesInMetrics bool
|
||||
classicMode bool
|
||||
utf8StrictMode bool
|
||||
@@ -53,6 +57,10 @@ type Flags struct {
|
||||
enableAutoGOMAXPROCS bool
|
||||
}
|
||||
|
||||
func (f *Flags) EnableAlertNamesInMetrics() bool {
|
||||
return f.enableAlertNamesInMetrics
|
||||
}
|
||||
|
||||
func (f *Flags) EnableReceiverNamesInMetrics() bool {
|
||||
return f.enableReceiverNamesInMetrics
|
||||
}
|
||||
@@ -105,6 +113,12 @@ func enableAutoGOMAXPROCS() flagOption {
|
||||
}
|
||||
}
|
||||
|
||||
func enableAlertNamesInMetrics() flagOption {
|
||||
return func(configs *Flags) {
|
||||
configs.enableAlertNamesInMetrics = true
|
||||
}
|
||||
}
|
||||
|
||||
func NewFlags(logger *slog.Logger, features string) (Flagger, error) {
|
||||
fc := &Flags{logger: logger}
|
||||
opts := []flagOption{}
|
||||
@@ -115,6 +129,9 @@ func NewFlags(logger *slog.Logger, features string) (Flagger, error) {
|
||||
|
||||
for feature := range strings.SplitSeq(features, ",") {
|
||||
switch feature {
|
||||
case FeatureAlertNamesInMetrics:
|
||||
opts = append(opts, enableAlertNamesInMetrics())
|
||||
logger.Warn("Alert names in metrics enabled")
|
||||
case FeatureReceiverNameInMetrics:
|
||||
opts = append(opts, enableReceiverNameInMetrics())
|
||||
logger.Warn("Experimental receiver name in metrics enabled")
|
||||
@@ -148,6 +165,8 @@ func NewFlags(logger *slog.Logger, features string) (Flagger, error) {
|
||||
|
||||
type NoopFlags struct{}
|
||||
|
||||
func (n NoopFlags) EnableAlertNamesInMetrics() bool { return false }
|
||||
|
||||
func (n NoopFlags) EnableReceiverNamesInMetrics() bool { return false }
|
||||
|
||||
func (n NoopFlags) ClassicMode() bool { return false }
|
||||
|
||||
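For context, a brief illustrative snippet, not part of this diff, showing how the feature flag above could be enabled via `featurecontrol.NewFlags` (only identifiers visible in this hunk are assumed):

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/promslog"

	"github.com/prometheus/alertmanager/featurecontrol"
)

func main() {
	logger := promslog.NewNopLogger()

	// Enable the flag that adds the "alertname" label to the limited-alerts metric.
	ff, err := featurecontrol.NewFlags(logger, featurecontrol.FeatureAlertNamesInMetrics)
	if err != nil {
		panic(err)
	}

	fmt.Println(ff.EnableAlertNamesInMetrics()) // true
}
```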
@@ -184,7 +184,7 @@ func lastRuleMatchesBenchmark(b *testing.B, n int) benchmarkOptions {
|
||||
func benchmarkMutes(b *testing.B, opts benchmarkOptions) {
|
||||
r := prometheus.NewRegistry()
|
||||
m := types.NewMarker(r)
|
||||
s, err := mem.NewAlerts(context.TODO(), m, time.Minute, nil, promslog.NewNopLogger(), r)
|
||||
s, err := mem.NewAlerts(context.TODO(), m, time.Minute, 0, nil, promslog.NewNopLogger(), r, nil)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
limit/bucket.go (new file, 160 lines)
@@ -0,0 +1,160 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package limit
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// item represents a value and its priority based on time.
|
||||
type item[V any] struct {
|
||||
value V
|
||||
priority time.Time
|
||||
index int
|
||||
}
|
||||
|
||||
// expired returns true if the item is expired (priority is before the given time).
|
||||
func (i *item[V]) expired(at time.Time) bool {
|
||||
return i.priority.Before(at)
|
||||
}
|
||||
|
||||
// sortedItems is a heap of items.
|
||||
type sortedItems[V any] []*item[V]
|
||||
|
||||
// Len returns the number of items in the heap.
|
||||
func (s sortedItems[V]) Len() int { return len(s) }
|
||||
|
||||
// Less reports whether the element with index i should sort before the element with index j.
|
||||
func (s sortedItems[V]) Less(i, j int) bool { return s[i].priority.Before(s[j].priority) }
|
||||
|
||||
// Swap swaps the elements with indexes i and j.
|
||||
func (s sortedItems[V]) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
s[i].index = i
|
||||
s[j].index = j
|
||||
}
|
||||
|
||||
// Push adds an item to the heap.
|
||||
func (s *sortedItems[V]) Push(x any) {
|
||||
n := len(*s)
|
||||
item := x.(*item[V])
|
||||
item.index = n
|
||||
*s = append(*s, item)
|
||||
}
|
||||
|
||||
// Pop removes and returns the minimum element (according to Less).
|
||||
func (s *sortedItems[V]) Pop() any {
|
||||
old := *s
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
old[n-1] = nil // don't stop the GC from reclaiming the item eventually
|
||||
item.index = -1 // for safety
|
||||
*s = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// update modifies the priority and value of an item in the heap.
|
||||
func (s *sortedItems[V]) update(item *item[V], priority time.Time) {
|
||||
item.priority = priority
|
||||
heap.Fix(s, item.index)
|
||||
}
|
||||
|
||||
// Bucket is a simple cache for values with priority(expiry).
|
||||
// It has:
|
||||
// - configurable capacity.
|
||||
// - a mutex for thread safety.
|
||||
// - a sorted heap of items for priority/expiry based eviction.
|
||||
// - an index of items for fast updates.
|
||||
type Bucket[V comparable] struct {
|
||||
mtx sync.Mutex
|
||||
index map[V]*item[V]
|
||||
items sortedItems[V]
|
||||
capacity int
|
||||
}
|
||||
|
||||
// NewBucket creates a new bucket with the given capacity.
|
||||
// All internal data structures are initialized to the given capacity to avoid allocations during runtime.
|
||||
func NewBucket[V comparable](capacity int) *Bucket[V] {
|
||||
items := make(sortedItems[V], 0, capacity)
|
||||
heap.Init(&items)
|
||||
return &Bucket[V]{
|
||||
index: make(map[V]*item[V], capacity),
|
||||
items: items,
|
||||
capacity: capacity,
|
||||
}
|
||||
}
|
||||
|
||||
// IsStale returns true if the latest item in the bucket is expired.
|
||||
func (b *Bucket[V]) IsStale() (stale bool) {
|
||||
b.mtx.Lock()
|
||||
defer b.mtx.Unlock()
|
||||
if b.items.Len() == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
latest := b.items[b.items.Len()-1]
|
||||
return latest.expired(time.Now())
|
||||
}
|
||||
|
||||
// Upsert tries to add a new value and its priority to the bucket.
|
||||
// If the value is already in the bucket, its priority is updated.
|
||||
// If the bucket is not full, the new value is added.
|
||||
// If the bucket is full, the oldest expired item is evicted based on priority and the new value is added.
|
||||
// Otherwise the new value is ignored and the method returns false.
|
||||
func (b *Bucket[V]) Upsert(value V, priority time.Time) (ok bool) {
|
||||
if b.capacity < 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
b.mtx.Lock()
|
||||
defer b.mtx.Unlock()
|
||||
|
||||
// If the value is already in the index, update it.
|
||||
if item, exists := b.index[value]; exists {
|
||||
b.items.update(item, priority)
|
||||
return true
|
||||
}
|
||||
|
||||
// If the bucket is not full, add the new value to the heap and index.
|
||||
if b.items.Len() < b.capacity {
|
||||
item := &item[V]{
|
||||
value: value,
|
||||
priority: priority,
|
||||
}
|
||||
b.index[value] = item
|
||||
heap.Push(&b.items, item)
|
||||
return true
|
||||
}
|
||||
|
||||
// If the bucket is full, check the oldest item (at heap root) and evict it if expired
|
||||
oldest := b.items[0]
|
||||
if oldest.expired(time.Now()) {
|
||||
// Remove the expired item from both the heap and the index
|
||||
heap.Pop(&b.items)
|
||||
delete(b.index, oldest.value)
|
||||
|
||||
// Add the new item
|
||||
item := &item[V]{
|
||||
value: value,
|
||||
priority: priority,
|
||||
}
|
||||
b.index[value] = item
|
||||
heap.Push(&b.items, item)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
limit/bucket_test.go (new file, 468 lines)
@@ -0,0 +1,468 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package limit
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBucketUpsert(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
bucketCapacity int
|
||||
alerts []model.Alert
|
||||
alertTimings []time.Time // When each alert is added relative to now
|
||||
expectedResult []bool // Expected return value for each Add() call
|
||||
description string
|
||||
}{
|
||||
{
|
||||
name: "Bucket with zero capacity should reject all alerts",
|
||||
bucketCapacity: 0,
|
||||
alerts: []model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
},
|
||||
alertTimings: []time.Time{time.Now(), time.Now(), time.Now()},
|
||||
expectedResult: []bool{false, false, false}, // All should be rejected
|
||||
description: "Adding 3 alerts to a bucket with capacity 0 should fail",
|
||||
},
|
||||
{
|
||||
name: "Empty bucket should add items while not full",
|
||||
bucketCapacity: 3,
|
||||
alerts: []model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
},
|
||||
alertTimings: []time.Time{time.Now(), time.Now(), time.Now()},
|
||||
expectedResult: []bool{true, true, true}, // All should be added successfully
|
||||
description: "Adding 3 alerts to a bucket with capacity 3 should succeed",
|
||||
},
|
||||
{
|
||||
name: "Full bucket must not add items if old items are not expired yet",
|
||||
bucketCapacity: 2,
|
||||
alerts: []model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
},
|
||||
alertTimings: []time.Time{time.Now(), time.Now(), time.Now()},
|
||||
expectedResult: []bool{true, true, false}, // First two succeed, third fails
|
||||
description: "Adding third alert to full bucket with non-expired items should fail",
|
||||
},
|
||||
{
|
||||
name: "Full bucket must add items if old items are expired",
|
||||
bucketCapacity: 2,
|
||||
alerts: []model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(-1 * time.Hour)}, // Expired 1 hour ago
|
||||
{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(-30 * time.Minute)}, // Expired 30 minutes ago
|
||||
{Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, // Will expire in 1 hour
|
||||
},
|
||||
alertTimings: []time.Time{time.Now(), time.Now(), time.Now()},
|
||||
expectedResult: []bool{true, true, true}, // All should succeed because older items get evicted
|
||||
description: "Adding new alerts when bucket is full but oldest items are expired should succeed",
|
||||
},
|
||||
{
|
||||
name: "Update existing alert in bucket should not increase size",
|
||||
bucketCapacity: 2,
|
||||
alerts: []model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(2 * time.Hour)}, // Same fingerprint, different EndsAt
|
||||
{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
},
|
||||
alertTimings: []time.Time{time.Now(), time.Now(), time.Now()},
|
||||
expectedResult: []bool{true, true, true}, // All should succeed - second is an update, not a new entry
|
||||
description: "Updating existing alert should not consume additional bucket space",
|
||||
},
|
||||
{
|
||||
name: "Mixed scenario with expiration and updates",
|
||||
bucketCapacity: 2,
|
||||
alerts: []model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(-1 * time.Hour)}, // Expired
|
||||
{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}, // Active
|
||||
{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(2 * time.Hour)}, // Update of first alert
|
||||
{Labels: model.LabelSet{"alertname": "Alert3", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)}, // New alert, bucket full but Alert2 not expired
|
||||
},
|
||||
alertTimings: []time.Time{time.Now(), time.Now(), time.Now(), time.Now()},
|
||||
expectedResult: []bool{true, true, true, false}, // Last one should fail because bucket is full with non-expired items
|
||||
description: "Complex scenario with expiration, updates, and eviction should work correctly",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](tc.bucketCapacity)
|
||||
|
||||
for i, alert := range tc.alerts {
|
||||
result := bucket.Upsert(alert.Fingerprint(), alert.EndsAt)
|
||||
require.Equal(t, tc.expectedResult[i], result,
|
||||
"Alert %d: expected %v, got %v. %s", i+1, tc.expectedResult[i], result, tc.description)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBucketAddConcurrency(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](2)
|
||||
|
||||
// Test that concurrent access to bucket is safe
|
||||
alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Alert1", "instance": "server1"}, EndsAt: time.Now().Add(1 * time.Hour)}
|
||||
alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Alert2", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)}
|
||||
|
||||
done := make(chan bool, 2)
|
||||
|
||||
// Add alerts concurrently
|
||||
go func() {
|
||||
bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt)
|
||||
done <- true
|
||||
}()
|
||||
|
||||
go func() {
|
||||
bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt)
|
||||
done <- true
|
||||
}()
|
||||
|
||||
// Wait for both goroutines to complete
|
||||
<-done
|
||||
<-done
|
||||
|
||||
// Verify that both alerts were added (bucket should contain 2 items)
|
||||
require.Len(t, bucket.index, 2, "Expected 2 alerts in bucket after concurrent adds")
|
||||
require.Len(t, bucket.items, 2, "Expected 2 items in bucket map after concurrent adds")
|
||||
}
|
||||
|
||||
func TestBucketAddExpiredEviction(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](2)
|
||||
|
||||
// Add two alerts that are already expired
|
||||
expiredAlert1 := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "ExpiredAlert1", "instance": "server1"},
|
||||
EndsAt: time.Now().Add(-1 * time.Hour),
|
||||
}
|
||||
expiredFingerprint1 := expiredAlert1.Fingerprint()
|
||||
expiredAlert2 := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "ExpiredAlert2", "instance": "server2"},
|
||||
EndsAt: time.Now().Add(-30 * time.Minute),
|
||||
}
|
||||
expiredFingerprint2 := expiredAlert2.Fingerprint()
|
||||
|
||||
// Fill the bucket with expired alerts
|
||||
result1 := bucket.Upsert(expiredFingerprint1, expiredAlert1.EndsAt)
|
||||
require.True(t, result1, "First expired alert should be added successfully")
|
||||
|
||||
result2 := bucket.Upsert(expiredFingerprint2, expiredAlert2.EndsAt)
|
||||
require.True(t, result2, "Second expired alert should be added successfully")
|
||||
|
||||
// Now add a fresh alert - it should evict the first expired alert
|
||||
freshAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "FreshAlert", "instance": "server3"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
freshFingerprint := freshAlert.Fingerprint()
|
||||
|
||||
result3 := bucket.Upsert(freshFingerprint, freshAlert.EndsAt)
|
||||
require.True(t, result3, "Fresh alert should be added successfully, evicting expired alert")
|
||||
|
||||
// Verify the bucket state
|
||||
require.Len(t, bucket.index, 2, "Bucket should still contain 2 items after eviction")
|
||||
require.Len(t, bucket.items, 2, "Bucket map should still contain 2 items after eviction")
|
||||
|
||||
// The fresh alert should be in the bucket
|
||||
_, exists := bucket.index[freshFingerprint]
|
||||
require.True(t, exists, "Fresh alert should exist in bucket after eviction")
|
||||
|
||||
// The first expired alert should have been evicted
|
||||
_, exists = bucket.index[expiredFingerprint1]
|
||||
require.False(t, exists, "First expired alert should have been evicted from bucket, fingerprint: %d", expiredFingerprint1)
|
||||
}
|
||||
|
||||
func TestBucketAddEdgeCases(t *testing.T) {
|
||||
t.Run("Single capacity bucket with replacement", func(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](1)
|
||||
|
||||
// Add expired alert
|
||||
expiredAlert := model.Alert{Labels: model.LabelSet{"alertname": "Expired"}, EndsAt: time.Now().Add(-1 * time.Hour)}
|
||||
result1 := bucket.Upsert(expiredAlert.Fingerprint(), expiredAlert.EndsAt)
|
||||
require.True(t, result1, "Adding expired alert to single-capacity bucket should succeed")
|
||||
|
||||
// Add fresh alert (should replace expired one)
|
||||
freshAlert := model.Alert{Labels: model.LabelSet{"alertname": "Fresh"}, EndsAt: time.Now().Add(1 * time.Hour)}
|
||||
result2 := bucket.Upsert(freshAlert.Fingerprint(), freshAlert.EndsAt)
|
||||
require.True(t, result2, "Adding fresh alert should succeed by replacing expired one")
|
||||
|
||||
// Verify only the fresh alert remains
|
||||
require.Len(t, bucket.index, 1, "Bucket should contain exactly 1 item")
|
||||
freshFingerprint := freshAlert.Fingerprint()
|
||||
_, exists := bucket.index[freshFingerprint]
|
||||
require.True(t, exists, "Fresh alert should exist in bucket")
|
||||
})
|
||||
|
||||
t.Run("Alert with same fingerprint but different EndsAt", func(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](2)
|
||||
|
||||
// Add initial alert
|
||||
originalTime := time.Now().Add(1 * time.Hour)
|
||||
alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Test"}, EndsAt: originalTime}
|
||||
result1 := bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt)
|
||||
require.True(t, result1, "Initial alert should be added successfully")
|
||||
|
||||
// Add same alert with different EndsAt (should update, not add new)
|
||||
updatedTime := time.Now().Add(2 * time.Hour)
|
||||
alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Test"}, EndsAt: updatedTime}
|
||||
result2 := bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt)
|
||||
require.True(t, result2, "Updated alert should not fill bucket")
|
||||
|
||||
// Verify bucket still has only one entry with updated time
|
||||
require.Len(t, bucket.index, 1, "Bucket should contain exactly 1 item after update")
|
||||
fingerprint := alert1.Fingerprint()
|
||||
storedTime, exists := bucket.index[fingerprint]
|
||||
require.True(t, exists, "Alert should exist in bucket")
|
||||
require.Equal(t, updatedTime, storedTime.priority, "Alert should have updated EndsAt time")
|
||||
})
|
||||
}
|
||||
|
||||
// Benchmark tests for Bucket.Upsert() performance.
|
||||
func BenchmarkBucketUpsert(b *testing.B) {
|
||||
b.Run("EmptyBucket", func(b *testing.B) {
|
||||
bucket := NewBucket[model.Fingerprint](1000)
|
||||
alert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "TestAlert", "instance": "server1"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bucket.Upsert(alert.Fingerprint(), alert.EndsAt)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("AddToFullBucketWithExpiredItems", func(b *testing.B) {
|
||||
bucketSize := 100
|
||||
bucket := NewBucket[model.Fingerprint](bucketSize)
|
||||
|
||||
// Fill bucket with expired alerts
|
||||
for i := range bucketSize {
|
||||
expiredAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": model.LabelValue("ExpiredAlert" + string(rune(i))), "instance": "server1"},
|
||||
EndsAt: time.Now().Add(-1 * time.Hour), // Expired 1 hour ago
|
||||
}
|
||||
bucket.Upsert(expiredAlert.Fingerprint(), expiredAlert.EndsAt)
|
||||
}
|
||||
|
||||
newAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "NewAlert", "instance": "server2"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bucket.Upsert(newAlert.Fingerprint(), newAlert.EndsAt)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("AddToFullBucketWithActiveItems", func(b *testing.B) {
|
||||
bucketSize := 100
|
||||
bucket := NewBucket[model.Fingerprint](bucketSize)
|
||||
|
||||
// Fill bucket with active alerts
|
||||
for i := range bucketSize {
|
||||
activeAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": model.LabelValue("ActiveAlert" + string(rune(i))), "instance": "server1"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour), // Active for 1 hour
|
||||
}
|
||||
bucket.Upsert(activeAlert.Fingerprint(), activeAlert.EndsAt)
|
||||
}
|
||||
|
||||
newAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "NewAlert", "instance": "server2"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bucket.Upsert(newAlert.Fingerprint(), newAlert.EndsAt)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("UpdateExistingItem", func(b *testing.B) {
|
||||
bucket := NewBucket[model.Fingerprint](100)
|
||||
|
||||
// Add initial alert
|
||||
alert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "TestAlert", "instance": "server1"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
bucket.Upsert(alert.Fingerprint(), alert.EndsAt)
|
||||
|
||||
// Create update with same fingerprint but different EndsAt
|
||||
updatedAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "TestAlert", "instance": "server1"},
|
||||
EndsAt: time.Now().Add(2 * time.Hour),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bucket.Upsert(updatedAlert.Fingerprint(), updatedAlert.EndsAt)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("MixedWorkload", func(b *testing.B) {
|
||||
bucketSize := 50
|
||||
bucket := NewBucket[model.Fingerprint](bucketSize)
|
||||
|
||||
// Pre-populate with mix of expired and active alerts
|
||||
for i := 0; i < bucketSize/2; i++ {
|
||||
expiredAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": model.LabelValue("ExpiredAlert" + string(rune(i))), "instance": "server1"},
|
||||
EndsAt: time.Now().Add(-1 * time.Hour),
|
||||
}
|
||||
bucket.Upsert(expiredAlert.Fingerprint(), expiredAlert.EndsAt)
|
||||
}
|
||||
for i := 0; i < bucketSize/2; i++ {
|
||||
activeAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": model.LabelValue("ActiveAlert" + string(rune(i))), "instance": "server1"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
bucket.Upsert(activeAlert.Fingerprint(), activeAlert.EndsAt)
|
||||
}
|
||||
|
||||
// Create different types of alerts for the benchmark
|
||||
alerts := []*model.Alert{
|
||||
{Labels: model.LabelSet{"alertname": "NewAlert1", "instance": "server2"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
{Labels: model.LabelSet{"alertname": "ExpiredAlert0", "instance": "server1"}, EndsAt: time.Now().Add(2 * time.Hour)}, // Update existing
|
||||
{Labels: model.LabelSet{"alertname": "NewAlert2", "instance": "server3"}, EndsAt: time.Now().Add(1 * time.Hour)},
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
alertIndex := i % len(alerts)
|
||||
bucket.Upsert(alerts[alertIndex].Fingerprint(), alerts[alertIndex].EndsAt)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Benchmark different bucket sizes to understand scaling behavior.
|
||||
func BenchmarkBucketUpsertScaling(b *testing.B) {
|
||||
sizes := []int{10, 50, 100, 500, 1000}
|
||||
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("BucketSize_%d", size), func(b *testing.B) {
|
||||
bucket := NewBucket[model.Fingerprint](size)
|
||||
|
||||
// Fill bucket to capacity with expired items
|
||||
for i := range size {
|
||||
alert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert%d", i)), "instance": "server1"},
|
||||
EndsAt: time.Now().Add(-1 * time.Hour),
|
||||
}
|
||||
bucket.Upsert(alert.Fingerprint(), alert.EndsAt)
|
||||
}
|
||||
|
||||
newAlert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": "NewAlert", "instance": "server2"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for range b.N {
|
||||
bucket.Upsert(newAlert.Fingerprint(), newAlert.EndsAt)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBucketIsStale(t *testing.T) {
|
||||
t.Run("IsStale on empty bucket should return true", func(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](5)
|
||||
|
||||
// Should not panic when bucket is empty and return true
|
||||
require.NotPanics(t, func() {
|
||||
stale := bucket.IsStale()
|
||||
require.True(t, stale, "IsStale on empty bucket should return true")
|
||||
}, "IsStale on empty bucket should not panic")
|
||||
})
|
||||
|
||||
t.Run("IsStale returns true when latest item is expired", func(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](3)
|
||||
|
||||
// Add three alerts that are all expired
|
||||
expiredTime := time.Now().Add(-1 * time.Hour)
|
||||
alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Alert1"}, EndsAt: expiredTime}
|
||||
alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Alert2"}, EndsAt: expiredTime.Add(-10 * time.Minute)}
|
||||
alert3 := model.Alert{Labels: model.LabelSet{"alertname": "Alert3"}, EndsAt: expiredTime.Add(-20 * time.Minute)}
|
||||
|
||||
bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt)
|
||||
bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt)
|
||||
bucket.Upsert(alert3.Fingerprint(), alert3.EndsAt)
|
||||
|
||||
require.Len(t, bucket.items, 3, "Bucket should have 3 items before IsStale check")
|
||||
require.Len(t, bucket.index, 3, "Bucket index should have 3 items before IsStale check")
|
||||
|
||||
// IsStale should return true when all items are expired
|
||||
stale := bucket.IsStale()
|
||||
|
||||
require.True(t, stale, "IsStale should return true when all items are expired")
|
||||
// IsStale doesn't remove items, so bucket should still contain them
|
||||
require.Len(t, bucket.items, 3, "Bucket should still have 3 items after IsStale check")
|
||||
require.Len(t, bucket.index, 3, "Bucket index should still have 3 items after IsStale check")
|
||||
})
|
||||
|
||||
t.Run("IsStale returns false when latest item is not expired", func(t *testing.T) {
|
||||
bucket := NewBucket[model.Fingerprint](3)
|
||||
|
||||
// Add mix of expired and non-expired alerts
|
||||
expiredTime := time.Now().Add(-1 * time.Hour)
|
||||
futureTime := time.Now().Add(1 * time.Hour)
|
||||
|
||||
alert1 := model.Alert{Labels: model.LabelSet{"alertname": "Expired1"}, EndsAt: expiredTime}
|
||||
alert2 := model.Alert{Labels: model.LabelSet{"alertname": "Expired2"}, EndsAt: expiredTime.Add(-10 * time.Minute)}
|
||||
alert3 := model.Alert{Labels: model.LabelSet{"alertname": "Active"}, EndsAt: futureTime}
|
||||
|
||||
bucket.Upsert(alert1.Fingerprint(), alert1.EndsAt)
|
||||
bucket.Upsert(alert2.Fingerprint(), alert2.EndsAt)
|
||||
bucket.Upsert(alert3.Fingerprint(), alert3.EndsAt)
|
||||
|
||||
require.Len(t, bucket.items, 3, "Bucket should have 3 items before IsStale check")
|
||||
|
||||
// IsStale should return false since the latest item (alert3) is not expired
|
||||
stale := bucket.IsStale()
|
||||
|
||||
require.False(t, stale, "IsStale should return false when latest item is not expired")
|
||||
require.Len(t, bucket.items, 3, "Bucket should still have 3 items after IsStale check")
|
||||
require.Len(t, bucket.index, 3, "Bucket index should still have 3 items after IsStale check")
|
||||
})
|
||||
}
|
||||
|
||||
// Benchmark concurrent access to Bucket.Upsert().
|
||||
func BenchmarkBucketUpsertConcurrent(b *testing.B) {
|
||||
bucket := NewBucket[model.Fingerprint](100)
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
alertCounter := 0
|
||||
for pb.Next() {
|
||||
alert := model.Alert{
|
||||
Labels: model.LabelSet{"alertname": model.LabelValue("Alert" + string(rune(alertCounter))), "instance": "server1"},
|
||||
EndsAt: time.Now().Add(1 * time.Hour),
|
||||
}
|
||||
bucket.Upsert(alert.Fingerprint(), alert.EndsAt)
|
||||
alertCounter++
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -15,6 +15,7 @@ package mem
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -27,6 +28,7 @@ import (
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/prometheus/alertmanager/featurecontrol"
|
||||
"github.com/prometheus/alertmanager/provider"
|
||||
"github.com/prometheus/alertmanager/store"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
@@ -52,7 +54,10 @@ type Alerts struct {
|
||||
|
||||
logger *slog.Logger
|
||||
propagator propagation.TextMapPropagator
|
||||
flagger featurecontrol.Flagger
|
||||
|
||||
alertsLimit prometheus.Gauge
|
||||
alertsLimitedTotal *prometheus.CounterVec
|
||||
subscriberChannelWrites *prometheus.CounterVec
|
||||
}
|
||||
|
||||
@@ -79,6 +84,23 @@ type listeningAlerts struct {
|
||||
func (a *Alerts) registerMetrics(r prometheus.Registerer) {
|
||||
r.MustRegister(&alertsCollector{alerts: a})
|
||||
|
||||
a.alertsLimit = promauto.With(r).NewGauge(prometheus.GaugeOpts{
|
||||
Name: "alertmanager_alerts_per_alert_limit",
|
||||
Help: "Current limit on number of alerts per alert name",
|
||||
})
|
||||
|
||||
labels := []string{}
|
||||
if a.flagger.EnableAlertNamesInMetrics() {
|
||||
labels = append(labels, "alertname")
|
||||
}
|
||||
a.alertsLimitedTotal = promauto.With(r).NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "alertmanager_alerts_limited_total",
|
||||
Help: "Total number of alerts that were dropped due to per alert name limit",
|
||||
},
|
||||
labels,
|
||||
)
|
||||
|
||||
a.subscriberChannelWrites = promauto.With(r).NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "alertmanager_alerts_subscriber_channel_writes_total",
|
||||
@@ -89,25 +111,44 @@ func (a *Alerts) registerMetrics(r prometheus.Registerer) {
|
||||
}
|
||||
|
||||
// NewAlerts returns a new alert provider.
|
||||
func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duration, alertCallback AlertStoreCallback, l *slog.Logger, r prometheus.Registerer) (*Alerts, error) {
|
||||
func NewAlerts(
|
||||
ctx context.Context,
|
||||
m types.AlertMarker,
|
||||
intervalGC time.Duration,
|
||||
perAlertNameLimit int,
|
||||
alertCallback AlertStoreCallback,
|
||||
l *slog.Logger,
|
||||
r prometheus.Registerer,
|
||||
flagger featurecontrol.Flagger,
|
||||
) (*Alerts, error) {
|
||||
if alertCallback == nil {
|
||||
alertCallback = noopCallback{}
|
||||
}
|
||||
|
||||
if perAlertNameLimit > 0 {
|
||||
l.Info("per alert name limit enabled", "limit", perAlertNameLimit)
|
||||
}
|
||||
|
||||
if flagger == nil {
|
||||
flagger = featurecontrol.NoopFlags{}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
a := &Alerts{
|
||||
marker: m,
|
||||
alerts: store.NewAlerts(),
|
||||
alerts: store.NewAlerts().WithPerAlertLimit(perAlertNameLimit),
|
||||
cancel: cancel,
|
||||
listeners: map[int]listeningAlerts{},
|
||||
next: 0,
|
||||
logger: l.With("component", "provider"),
|
||||
propagator: otel.GetTextMapPropagator(),
|
||||
callback: alertCallback,
|
||||
flagger: flagger,
|
||||
}
|
||||
|
||||
if r != nil {
|
||||
a.registerMetrics(r)
|
||||
a.alertsLimit.Set(float64(perAlertNameLimit))
|
||||
}
|
||||
|
||||
go a.gcLoop(ctx, intervalGC)
|
||||
@@ -271,7 +312,14 @@ func (a *Alerts) Put(ctx context.Context, alerts ...*types.Alert) error {
|
||||
}
|
||||
|
||||
if err := a.alerts.Set(alert); err != nil {
|
||||
a.logger.Error("error on set alert", "err", err)
|
||||
a.logger.Warn("error on set alert", "alertname", alert.Name(), "err", err)
|
||||
if errors.Is(err, store.ErrLimited) {
|
||||
labels := []string{}
|
||||
if a.flagger.EnableAlertNamesInMetrics() {
|
||||
labels = append(labels, alert.Name())
|
||||
}
|
||||
a.alertsLimitedTotal.WithLabelValues(labels...).Inc()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ var (
|
||||
// a listener can not unsubscribe as the lock is hold by `alerts.Lock`.
|
||||
func TestAlertsSubscribePutStarvation(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -137,7 +137,7 @@ func TestDeadLock(t *testing.T) {
|
||||
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
// Run gc every 5 milliseconds to increase the possibility of a deadlock with Subscribe()
|
||||
alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 5*time.Millisecond, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -190,7 +190,7 @@ func TestDeadLock(t *testing.T) {
|
||||
|
||||
func TestAlertsPut(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -214,7 +214,7 @@ func TestAlertsSubscribe(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
|
||||
ctx := t.Context()
|
||||
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(ctx, marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), prometheus.NewRegistry(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -291,7 +291,7 @@ func TestAlertsSubscribe(t *testing.T) {
|
||||
|
||||
func TestAlertsGetPending(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), nil)
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -329,7 +329,7 @@ func TestAlertsGetPending(t *testing.T) {
|
||||
|
||||
func TestAlertsGC(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, promslog.NewNopLogger(), nil)
|
||||
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, 0, noopCallback{}, promslog.NewNopLogger(), nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -366,7 +366,7 @@ func TestAlertsStoreCallback(t *testing.T) {
|
||||
cb := &limitCountCallback{limit: 3}
|
||||
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, promslog.NewNopLogger(), nil)
|
||||
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, 0, cb, promslog.NewNopLogger(), nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -427,7 +427,7 @@ func TestAlertsStoreCallback(t *testing.T) {
|
||||
|
||||
func TestAlerts_CountByState(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, nil, promslog.NewNopLogger(), nil)
|
||||
alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, 0, nil, promslog.NewNopLogger(), nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
countTotal := func() int {
|
||||
@@ -546,7 +546,7 @@ func (l *limitCountCallback) PostDelete(_ *types.Alert) {
|
||||
|
||||
func TestAlertsConcurrently(t *testing.T) {
|
||||
callback := &limitCountCallback{limit: 100}
|
||||
a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, callback, promslog.NewNopLogger(), nil)
|
||||
a, err := NewAlerts(context.Background(), types.NewMarker(prometheus.NewRegistry()), time.Millisecond, 0, callback, promslog.NewNopLogger(), nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
stopc := make(chan struct{})
|
||||
@@ -607,7 +607,7 @@ func TestAlertsConcurrently(t *testing.T) {
|
||||
func TestSubscriberChannelMetrics(t *testing.T) {
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
reg := prometheus.NewRegistry()
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, promslog.NewNopLogger(), reg)
|
||||
alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, 0, noopCallback{}, promslog.NewNopLogger(), reg, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
subscriberName := "test_subscriber"
|
||||
|
||||
@@ -21,9 +21,13 @@ import (
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/alertmanager/limit"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
)
|
||||
|
||||
// ErrLimited is returned if a Store has reached the per-alert limit.
|
||||
var ErrLimited = errors.New("alert limited")
|
||||
|
||||
// ErrNotFound is returned if a Store cannot find the Alert.
|
||||
var ErrNotFound = errors.New("alert not found")
|
||||
|
||||
@@ -33,26 +37,40 @@ var ErrNotFound = errors.New("alert not found")
|
||||
// resolved alerts that have been removed.
|
||||
type Alerts struct {
|
||||
sync.Mutex
|
||||
c map[model.Fingerprint]*types.Alert
|
||||
cb func([]types.Alert)
|
||||
alerts map[model.Fingerprint]*types.Alert
|
||||
gcCallback func([]types.Alert)
|
||||
limits map[string]*limit.Bucket[model.Fingerprint]
|
||||
perAlertLimit int
|
||||
}
|
||||
|
||||
// NewAlerts returns a new Alerts struct.
|
||||
func NewAlerts() *Alerts {
|
||||
a := &Alerts{
|
||||
c: make(map[model.Fingerprint]*types.Alert),
|
||||
cb: func(_ []types.Alert) {},
|
||||
alerts: make(map[model.Fingerprint]*types.Alert),
|
||||
gcCallback: func(_ []types.Alert) {},
|
||||
perAlertLimit: 0,
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// WithPerAlertLimit sets the per-alert limit for the Alerts struct.
|
||||
func (a *Alerts) WithPerAlertLimit(lim int) *Alerts {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
a.limits = make(map[string]*limit.Bucket[model.Fingerprint])
|
||||
a.perAlertLimit = lim
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// SetGCCallback sets a GC callback to be executed after each GC.
|
||||
func (a *Alerts) SetGCCallback(cb func([]types.Alert)) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
a.cb = cb
|
||||
a.gcCallback = cb
|
||||
}
|
||||
|
||||
// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic.
|
||||
@@ -73,9 +91,9 @@ func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
|
||||
func (a *Alerts) GC() []types.Alert {
|
||||
a.Lock()
|
||||
var resolved []types.Alert
|
||||
for fp, alert := range a.c {
|
||||
for fp, alert := range a.alerts {
|
||||
if alert.Resolved() {
|
||||
delete(a.c, fp)
|
||||
delete(a.alerts, fp)
|
||||
resolved = append(resolved, types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: alert.Labels.Clone(),
|
||||
@@ -89,8 +107,16 @@ func (a *Alerts) GC() []types.Alert {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Remove stale alert limit buckets
|
||||
for alertName, bucket := range a.limits {
|
||||
if bucket.IsStale() {
|
||||
delete(a.limits, alertName)
|
||||
}
|
||||
}
|
||||
|
||||
a.Unlock()
|
||||
a.cb(resolved)
|
||||
a.gcCallback(resolved)
|
||||
return resolved
|
||||
}
|
||||
|
||||
@@ -100,7 +126,7 @@ func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
alert, prs := a.c[fp]
|
||||
alert, prs := a.alerts[fp]
|
||||
if !prs {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
@@ -112,7 +138,22 @@ func (a *Alerts) Set(alert *types.Alert) error {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
a.c[alert.Fingerprint()] = alert
|
||||
fp := alert.Fingerprint()
|
||||
name := alert.Name()
|
||||
|
||||
// Apply per alert limits if necessary
|
||||
if a.perAlertLimit > 0 {
|
||||
bucket, ok := a.limits[name]
|
||||
if !ok {
|
||||
bucket = limit.NewBucket[model.Fingerprint](a.perAlertLimit)
|
||||
a.limits[name] = bucket
|
||||
}
|
||||
if !bucket.Upsert(fp, alert.EndsAt) {
|
||||
return ErrLimited
|
||||
}
|
||||
}
|
||||
|
||||
a.alerts[fp] = alert
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -123,8 +164,8 @@ func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
|
||||
defer a.Unlock()
|
||||
for _, alert := range alerts {
|
||||
fp := alert.Fingerprint()
|
||||
if other, ok := a.c[fp]; ok && alert.UpdatedAt.Equal(other.UpdatedAt) {
|
||||
delete(a.c, fp)
|
||||
if other, ok := a.alerts[fp]; ok && alert.UpdatedAt.Equal(other.UpdatedAt) {
|
||||
delete(a.alerts, fp)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -135,8 +176,8 @@ func (a *Alerts) List() []*types.Alert {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
alerts := make([]*types.Alert, 0, len(a.c))
|
||||
for _, alert := range a.c {
|
||||
alerts := make([]*types.Alert, 0, len(a.alerts))
|
||||
for _, alert := range a.alerts {
|
||||
alerts = append(alerts, alert)
|
||||
}
|
||||
|
||||
@@ -148,7 +189,7 @@ func (a *Alerts) Empty() bool {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
return len(a.c) == 0
|
||||
return len(a.alerts) == 0
|
||||
}
|
||||
|
||||
// Len returns the number of alerts in the store.
|
||||
@@ -156,5 +197,5 @@ func (a *Alerts) Len() int {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
||||
return len(a.c)
|
||||
return len(a.alerts)
|
||||
}