alertmanager/provider/mem/mem.go
Siavash Safi c90d8707d5 feat(provider): implement per-alert limits (#4819)
* feat(limit): add new limit package with bucket

Add a new limit package with a generic bucket implementation.
This can be used, for example, to limit the number of alerts held in memory.

Benchmarks:
```text
goos: darwin
goarch: arm64
pkg: github.com/prometheus/alertmanager/limit
cpu: Apple M3 Pro
BenchmarkBucketUpsert/EmptyBucket-12  	 8816954	       122.4 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsert/AddToFullBucketWithExpiredItems-12         	 9861010	       123.0 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsert/AddToFullBucketWithActiveItems-12          	 8343778	       143.6 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsert/UpdateExistingAlert-12                     	10107787	       118.9 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsert/MixedWorkload-12                           	 9436174	       126.0 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsertScaling/BucketSize_10-12                    	10255278	       115.4 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsertScaling/BucketSize_50-12                    	10166518	       117.1 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsertScaling/BucketSize_100-12                   	10457394	       115.0 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsertScaling/BucketSize_500-12                   	 9644079	       115.2 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsertScaling/BucketSize_1000-12                  	10426184	       116.6 ns/op	      56 B/op	       2 allocs/op
BenchmarkBucketUpsertConcurrent-12                               	 5796210	       216.3 ns/op	     406 B/op	       5 allocs/op
PASS
ok  	github.com/prometheus/alertmanager/limit	15.497s
```
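
For orientation, here is a rough sketch of the idea these benchmarks exercise: a capacity-bounded bucket whose `Upsert` accepts an entry when the key already exists, when there is free space, or when an expired entry can be evicted. The type names, generics, and expiry rule below are illustrative assumptions, not the limit package's actual API.

```go
package limitsketch

import "time"

type entry[V any] struct {
	value     V
	expiresAt time.Time
}

// Bucket holds at most capacity entries keyed by K.
type Bucket[K comparable, V any] struct {
	capacity int
	items    map[K]entry[V]
}

func NewBucket[K comparable, V any](capacity int) *Bucket[K, V] {
	return &Bucket[K, V]{capacity: capacity, items: make(map[K]entry[V], capacity)}
}

// Upsert stores value under key and reports whether it was accepted.
// A new key is rejected when the bucket is full of non-expired entries.
func (b *Bucket[K, V]) Upsert(key K, value V, expiresAt time.Time) bool {
	if _, ok := b.items[key]; !ok && len(b.items) >= b.capacity {
		// Try to reclaim space from a single expired entry before refusing.
		now := time.Now()
		reclaimed := false
		for k, e := range b.items {
			if e.expiresAt.Before(now) {
				delete(b.items, k)
				reclaimed = true
				break
			}
		}
		if !reclaimed {
			return false
		}
	}
	b.items[key] = entry[V]{value: value, expiresAt: expiresAt}
	return true
}
```

In the provider itself, a rejected upsert surfaces as `store.ErrLimited`, which `Put` in the file below turns into an increment of `alertmanager_alerts_limited_total`.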

Signed-off-by: Siavash Safi <siavash@cloudflare.com>

* feat(provider): implement per-alert limits

Use the new limit package to add optional per-alert-name limits.
Alert-name labels on the metrics for limited alerts can be enabled
using the `alerts-limited-metric` feature flag.
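
Based on the `NewAlerts` signature in the file below, wiring the limit in looks roughly like this. The `types.NewMarker` constructor, the GC interval, and the limit value are assumptions for illustration, not part of this commit:

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/alertmanager/featurecontrol"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/types"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	reg := prometheus.NewRegistry()
	marker := types.NewMarker(reg) // assumed constructor for an AlertMarker

	alerts, err := mem.NewAlerts(
		context.Background(),
		marker,
		30*time.Minute, // garbage-collection interval
		100,            // per-alert-name limit (0 appears to leave it disabled)
		nil,            // no AlertStoreCallback
		logger,
		reg,
		featurecontrol.NoopFlags{}, // or a flagger with alerts-limited-metric enabled
	)
	if err != nil {
		logger.Error("creating in-memory provider", "err", err)
		os.Exit(1)
	}
	defer alerts.Close()
}
```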

Signed-off-by: Siavash Safi <siavash@cloudflare.com>

---------

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
2026-02-01 13:21:20 +01:00


// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mem

import (
	"context"
	"errors"
	"log/slog"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"

	"github.com/prometheus/alertmanager/featurecontrol"
	"github.com/prometheus/alertmanager/provider"
	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
)
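
// alertChannelLength is the default buffer size for subscriber alert channels.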
const alertChannelLength = 200

var tracer = otel.Tracer("github.com/prometheus/alertmanager/provider/mem")

// Alerts gives access to a set of alerts. All methods are goroutine-safe.
type Alerts struct {
	cancel context.CancelFunc
	mtx sync.Mutex
	alerts *store.Alerts
	marker types.AlertMarker
	listeners map[int]listeningAlerts
	next int
	callback AlertStoreCallback
	logger *slog.Logger
	propagator propagation.TextMapPropagator
	flagger featurecontrol.Flagger
	alertsLimit prometheus.Gauge
	alertsLimitedTotal *prometheus.CounterVec
	subscriberChannelWrites *prometheus.CounterVec
}
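
// AlertStoreCallback receives hooks around alert store operations.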
type AlertStoreCallback interface {
	// PreStore is called before alert is stored into the store. If this method returns error,
	// alert is not stored.
	// Existing flag indicates whether alert has existed before (and is only updated) or not.
	// If alert has existed before, then alert passed to PreStore is result of merging existing alert with new alert.
	PreStore(alert *types.Alert, existing bool) error

	// PostStore is called after alert has been put into store.
	PostStore(alert *types.Alert, existing bool)

	// PostDelete is called after alert has been removed from the store due to alert garbage collection.
	PostDelete(alert *types.Alert)
}
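
// listeningAlerts pairs a subscriber name with its alert channel and a done
// channel that is closed when the subscriber stops listening.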
type listeningAlerts struct {
	name string
	alerts chan *provider.Alert
	done chan struct{}
}
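
// registerMetrics registers the provider's collector and metrics with r,
// adding an alertname label to the limited-alerts counter when the
// corresponding feature flag is enabled.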
func (a *Alerts) registerMetrics(r prometheus.Registerer) {
	r.MustRegister(&alertsCollector{alerts: a})
	a.alertsLimit = promauto.With(r).NewGauge(prometheus.GaugeOpts{
		Name: "alertmanager_alerts_per_alert_limit",
		Help: "Current limit on number of alerts per alert name",
	})
	labels := []string{}
	if a.flagger.EnableAlertNamesInMetrics() {
		labels = append(labels, "alertname")
	}
	a.alertsLimitedTotal = promauto.With(r).NewCounterVec(
		prometheus.CounterOpts{
			Name: "alertmanager_alerts_limited_total",
			Help: "Total number of alerts that were dropped due to per alert name limit",
		},
		labels,
	)
	a.subscriberChannelWrites = promauto.With(r).NewCounterVec(
		prometheus.CounterOpts{
			Name: "alertmanager_alerts_subscriber_channel_writes_total",
			Help: "Number of times alerts were written to subscriber channels",
		},
		[]string{"subscriber"},
	)
}

// NewAlerts returns a new alert provider.
func NewAlerts(
	ctx context.Context,
	m types.AlertMarker,
	intervalGC time.Duration,
	perAlertNameLimit int,
	alertCallback AlertStoreCallback,
	l *slog.Logger,
	r prometheus.Registerer,
	flagger featurecontrol.Flagger,
) (*Alerts, error) {
	if alertCallback == nil {
		alertCallback = noopCallback{}
	}
	if perAlertNameLimit > 0 {
		l.Info("per alert name limit enabled", "limit", perAlertNameLimit)
	}
	if flagger == nil {
		flagger = featurecontrol.NoopFlags{}
	}
	ctx, cancel := context.WithCancel(ctx)
	a := &Alerts{
		marker: m,
		alerts: store.NewAlerts().WithPerAlertLimit(perAlertNameLimit),
		cancel: cancel,
		listeners: map[int]listeningAlerts{},
		next: 0,
		logger: l.With("component", "provider"),
		propagator: otel.GetTextMapPropagator(),
		callback: alertCallback,
		flagger: flagger,
	}
	if r != nil {
		a.registerMetrics(r)
		a.alertsLimit.Set(float64(perAlertNameLimit))
	}
	go a.gcLoop(ctx, intervalGC)
	return a, nil
}
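
// gcLoop runs garbage collection every interval until ctx is canceled.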
func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			a.gc()
		}
	}
}
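
// gc drops resolved alerts from the store, clears their markers, and removes
// listeners whose done channel has been closed.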
func (a *Alerts) gc() {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	deleted := a.alerts.GC()
	for _, alert := range deleted {
		// As we don't persist alerts, we no longer consider them after
		// they are resolved. Alerts waiting for resolved notifications are
		// held in memory in aggregation groups redundantly.
		a.marker.Delete(alert.Fingerprint())
		a.callback.PostDelete(&alert)
	}
	for i, l := range a.listeners {
		select {
		case <-l.done:
			delete(a.listeners, i)
			close(l.alerts)
		default:
			// listener is not closed yet, hence proceed.
		}
	}
}

// Close the alert provider.
func (a *Alerts) Close() {
	if a.cancel != nil {
		a.cancel()
	}
}

// Subscribe returns an iterator over active alerts that have not been
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
func (a *Alerts) Subscribe(name string) provider.AlertIterator {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	var (
		done = make(chan struct{})
		alerts = a.alerts.List()
		ch = make(chan *provider.Alert, max(len(alerts), alertChannelLength))
	)
	for _, a := range alerts {
		ch <- &provider.Alert{
			Header: map[string]string{},
			Data: a,
		}
	}
	a.listeners[a.next] = listeningAlerts{name: name, alerts: ch, done: done}
	a.next++
	return provider.NewAlertIterator(ch, done, nil)
}
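
// SlurpAndSubscribe returns the current list of alerts together with an
// iterator over alerts added afterwards. Unlike Subscribe, existing alerts
// are returned as a slice rather than written to the subscriber channel.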
func (a *Alerts) SlurpAndSubscribe(name string) ([]*types.Alert, provider.AlertIterator) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	var (
		done = make(chan struct{})
		alerts = a.alerts.List()
		ch = make(chan *provider.Alert, alertChannelLength)
	)
	a.listeners[a.next] = listeningAlerts{name: name, alerts: ch, done: done}
	a.next++
	return alerts, provider.NewAlertIterator(ch, done, nil)
}

// GetPending returns an iterator over all the alerts that have
// pending notifications.
func (a *Alerts) GetPending() provider.AlertIterator {
	var (
		ch = make(chan *provider.Alert, alertChannelLength)
		done = make(chan struct{})
	)
	a.mtx.Lock()
	defer a.mtx.Unlock()
	alerts := a.alerts.List()
	go func() {
		defer close(ch)
		for _, a := range alerts {
			select {
			case ch <- &provider.Alert{
				Header: map[string]string{},
				Data: a,
			}:
			case <-done:
				return
			}
		}
	}()
	return provider.NewAlertIterator(ch, done, nil)
}

// Get returns the alert for a given fingerprint.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	return a.alerts.Get(fp)
}

// Put adds the given alert to the set.
func (a *Alerts) Put(ctx context.Context, alerts ...*types.Alert) error {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	ctx, span := tracer.Start(ctx, "provider.mem.Put",
		trace.WithAttributes(
			attribute.Int("alerting.alerts.count", len(alerts)),
		),
		trace.WithSpanKind(trace.SpanKindProducer),
	)
	defer span.End()
	for _, alert := range alerts {
		fp := alert.Fingerprint()
		existing := false
		// Check that there's an alert existing within the store before
		// trying to merge.
		if old, err := a.alerts.Get(fp); err == nil {
			existing = true
			// Merge alerts if there is an overlap in activity range.
			if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
				(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
				alert = old.Merge(alert)
			}
		}
		if err := a.callback.PreStore(alert, existing); err != nil {
			a.logger.Error("pre-store callback returned error on set alert", "err", err)
			continue
		}
		if err := a.alerts.Set(alert); err != nil {
			a.logger.Warn("error on set alert", "alertname", alert.Name(), "err", err)
			if errors.Is(err, store.ErrLimited) {
				labels := []string{}
				if a.flagger.EnableAlertNamesInMetrics() {
					labels = append(labels, alert.Name())
				}
				a.alertsLimitedTotal.WithLabelValues(labels...).Inc()
			}
			continue
		}
		a.callback.PostStore(alert, existing)
		metadata := map[string]string{}
		a.propagator.Inject(ctx, propagation.MapCarrier(metadata))
		msg := &provider.Alert{
			Data: alert,
			Header: metadata,
		}
		for _, l := range a.listeners {
			select {
			case l.alerts <- msg:
				a.subscriberChannelWrites.WithLabelValues(l.name).Inc()
			case <-l.done:
			}
		}
	}
	return nil
}

// countByState returns the number of non-resolved alerts by state.
func (a *Alerts) countByState() (active, suppressed, unprocessed int) {
	for _, alert := range a.alerts.List() {
		if alert.Resolved() {
			continue
		}
		switch a.marker.Status(alert.Fingerprint()).State {
		case types.AlertStateActive:
			active++
		case types.AlertStateSuppressed:
			suppressed++
		case types.AlertStateUnprocessed:
			unprocessed++
		}
	}
	return active, suppressed, unprocessed
}

// alertsCollector implements prometheus.Collector to collect all alert count metrics in a single pass.
type alertsCollector struct {
	alerts *Alerts
}

var alertsDesc = prometheus.NewDesc(
	"alertmanager_alerts",
	"How many alerts by state.",
	[]string{"state"}, nil,
)

func (c *alertsCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- alertsDesc
}

func (c *alertsCollector) Collect(ch chan<- prometheus.Metric) {
	active, suppressed, unprocessed := c.alerts.countByState()
	ch <- prometheus.MustNewConstMetric(alertsDesc, prometheus.GaugeValue, float64(active), string(types.AlertStateActive))
	ch <- prometheus.MustNewConstMetric(alertsDesc, prometheus.GaugeValue, float64(suppressed), string(types.AlertStateSuppressed))
	ch <- prometheus.MustNewConstMetric(alertsDesc, prometheus.GaugeValue, float64(unprocessed), string(types.AlertStateUnprocessed))
}
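
// noopCallback is the default AlertStoreCallback; it does nothing.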
type noopCallback struct{}

func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
func (n noopCallback) PostStore(_ *types.Alert, _ bool) {}
func (n noopCallback) PostDelete(_ *types.Alert) {}