alertmanager/dispatch/dispatch.go
Siavash Safi 2e0970e8d8 feat(dispatch): add start delay (#4704)
This change adds a new command-line flag `--dispatch.start-delay`, which
corresponds to the `--rules.alert.resend-delay` flag in Prometheus.
The Prometheus flag controls the minimum amount of time that Prometheus
waits before resending an alert to Alertmanager.

By adding this value to Alertmanager's start time, we delay the
aggregation groups' first flush until we are confident that all alerts
have been resent by the Prometheus instances.

This should help avoid race conditions in inhibitions after a (re)start.
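
For example (values are illustrative, not defaults): if every Prometheus
instance runs with `--rules.alert.resend-delay=1m`, then starting
Alertmanager with `--dispatch.start-delay=1m` holds the first flush until
each instance has had a chance to resend its active alerts.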

Other improvements:
- remove hasFlushed flag from aggrGroup
- remove mutex locking from aggrGroup

Signed-off-by: Alexander Rickardsson <alxric@aiven.io>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Co-authored-by: Alexander Rickardsson <alxric@aiven.io>
2025-11-15 15:35:59 +01:00


// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatch

import (
"context"
"errors"
"fmt"
"log/slog"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"

"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/store"
"github.com/prometheus/alertmanager/types"
)
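
// A Dispatcher starts in DispatcherStateUnknown, moves to
// DispatcherStateWaitingToStart when Run is called, and becomes
// DispatcherStateRunning once its start timer fires.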
const (
DispatcherStateUnknown = iota
DispatcherStateWaitingToStart
DispatcherStateRunning
)
// DispatcherMetrics represents metrics associated with a dispatcher.
type DispatcherMetrics struct {
aggrGroups prometheus.Gauge
processingDuration prometheus.Summary
aggrGroupLimitReached prometheus.Counter
}
// NewDispatcherMetrics returns a new registered DispatcherMetrics; it returns nil if r is nil.
func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics {
if r == nil {
return nil
}
m := DispatcherMetrics{
aggrGroups: promauto.With(r).NewGauge(
prometheus.GaugeOpts{
Name: "alertmanager_dispatcher_aggregation_groups",
Help: "Number of active aggregation groups",
},
),
processingDuration: promauto.With(r).NewSummary(
prometheus.SummaryOpts{
Name: "alertmanager_dispatcher_alert_processing_duration_seconds",
Help: "Summary of latencies for the processing of alerts.",
},
),
aggrGroupLimitReached: promauto.With(r).NewCounter(
prometheus.CounterOpts{
Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total",
Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
},
),
}
return &m
}
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
route *Route
alerts provider.Alerts
stage notify.Stage
marker types.GroupMarker
metrics *DispatcherMetrics
limits Limits
timeout func(time.Duration) time.Duration
mtx sync.RWMutex
aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
aggrGroupsNum int
maintenanceInterval time.Duration
done chan struct{}
ctx context.Context
cancel func()
logger *slog.Logger
startTimer *time.Timer
state int
}
// Limits describes limits used by Dispatcher.
type Limits interface {
// MaxNumberOfAggregationGroups returns the maximum number of aggregation groups that the dispatcher can have.
// A zero or negative value means unlimited.
// If the dispatcher hits this limit, it will not create additional groups but will log an error instead.
MaxNumberOfAggregationGroups() int
}
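// An illustrative fixed-cap implementation (a sketch, not part of this
// package) might look like:
//
//	type staticLimits struct{ max int }
//
//	func (l staticLimits) MaxNumberOfAggregationGroups() int { return l.max }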
// NewDispatcher returns a new Dispatcher.
func NewDispatcher(
alerts provider.Alerts,
route *Route,
stage notify.Stage,
marker types.GroupMarker,
timeout func(time.Duration) time.Duration,
maintenanceInterval time.Duration,
limits Limits,
logger *slog.Logger,
metrics *DispatcherMetrics,
) *Dispatcher {
if limits == nil {
limits = nilLimits{}
}
disp := &Dispatcher{
alerts: alerts,
stage: stage,
route: route,
marker: marker,
timeout: timeout,
maintenanceInterval: maintenanceInterval,
logger: logger.With("component", "dispatcher"),
metrics: metrics,
limits: limits,
state: DispatcherStateUnknown,
}
return disp
}
// Run starts dispatching alerts incoming via the updates channel.
func (d *Dispatcher) Run(dispatchStartTime time.Time) {
d.done = make(chan struct{})
d.mtx.Lock()
d.logger.Debug("preparing to start", "startTime", dispatchStartTime)
d.startTimer = time.NewTimer(time.Until(dispatchStartTime))
d.state = DispatcherStateWaitingToStart
d.logger.Debug("setting state", "state", "waiting_to_start")
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe("dispatcher"))
close(d.done)
}
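// run is the dispatcher's main loop: it routes incoming alerts into
// aggregation groups, starts all waiting groups once the start timer fires,
// and periodically cleans up empty groups.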
func (d *Dispatcher) run(it provider.AlertIterator) {
maintenance := time.NewTicker(d.maintenanceInterval)
defer maintenance.Stop()
defer it.Close()
for {
select {
case alert, ok := <-it.Next():
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
}
return
}
d.logger.Debug("Received alert", "alert", alert)
// Log errors but keep trying.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
continue
}
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
case <-d.startTimer.C:
if d.state == DispatcherStateWaitingToStart {
d.state = DispatcherStateRunning
d.logger.Debug("started", "state", "running")
d.logger.Debug("Starting all existing aggregation groups")
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
d.runAG(ag)
}
}
}
case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}
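// doMaintenance stops aggregation groups that no longer contain alerts,
// deletes their group markers, and removes them from the dispatcher.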
func (d *Dispatcher) doMaintenance() {
d.mtx.Lock()
defer d.mtx.Unlock()
for _, groups := range d.aggrGroupsPerRoute {
for _, ag := range groups {
if ag.empty() {
ag.stop()
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
delete(groups, ag.fingerprint())
d.aggrGroupsNum--
d.metrics.aggrGroups.Dec()
}
}
}
}
// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
Labels model.LabelSet
Receiver string
GroupKey string
RouteID string
}
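// AlertGroups implements sort.Interface, ordering groups by their labels
// and then by receiver.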
type AlertGroups []*AlertGroup
func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
func (ag AlertGroups) Less(i, j int) bool {
if ag[i].Labels.Equal(ag[j].Labels) {
return ag[i].Receiver < ag[j].Receiver
}
return ag[i].Labels.Before(ag[j].Labels)
}
func (ag AlertGroups) Len() int { return len(ag) }
// Groups returns a slice of AlertGroups from the dispatcher's internal state.
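// An illustrative call that keeps every route and drops alerts already
// resolved at the given time (the filter choices are examples) might be:
//
//	groups, receivers := d.Groups(
//		func(*Route) bool { return true },
//		func(a *types.Alert, now time.Time) bool { return !a.ResolvedAt(now) },
//	)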
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
groups := AlertGroups{}
d.mtx.RLock()
defer d.mtx.RUnlock()
// Keep a list of receivers for an alert to prevent checking each alert
// again against all routes. The alert has already matched against this
// route on ingestion.
receivers := map[model.Fingerprint][]string{}
now := time.Now()
for route, ags := range d.aggrGroupsPerRoute {
if !routeFilter(route) {
continue
}
for _, ag := range ags {
receiver := route.RouteOpts.Receiver
alertGroup := &AlertGroup{
Labels: ag.labels,
Receiver: receiver,
GroupKey: ag.GroupKey(),
RouteID: ag.routeID,
}
alerts := ag.alerts.List()
filteredAlerts := make([]*types.Alert, 0, len(alerts))
for _, a := range alerts {
if !alertFilter(a, now) {
continue
}
fp := a.Fingerprint()
if r, ok := receivers[fp]; ok {
// Receivers slice already exists. Add
// the current receiver to the slice.
receivers[fp] = append(r, receiver)
} else {
// First time we've seen this alert fingerprint.
// Initialize a new receivers slice.
receivers[fp] = []string{receiver}
}
filteredAlerts = append(filteredAlerts, a)
}
if len(filteredAlerts) == 0 {
continue
}
alertGroup.Alerts = filteredAlerts
groups = append(groups, alertGroup)
}
}
sort.Sort(groups)
for i := range groups {
sort.Sort(groups[i].Alerts)
}
for i := range receivers {
sort.Strings(receivers[i])
}
return groups, receivers
}
// Stop the dispatcher and block until it has shut down.
func (d *Dispatcher) Stop() {
if d == nil {
return
}
d.mtx.Lock()
if d.cancel == nil {
d.mtx.Unlock()
return
}
d.cancel()
d.cancel = nil
d.mtx.Unlock()
<-d.done
}
// notifyFunc is a function that performs notification for the alert
// with the given fingerprint. It aborts on context cancelation.
// Returns false iff notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool
// processAlert determines in which aggregation group the alert falls
// and inserts it.
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
now := time.Now()
groupLabels := getGroupLabels(alert, route)
fp := groupLabels.Fingerprint()
d.mtx.Lock()
defer d.mtx.Unlock()
routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}
ag, ok := routeGroups[fp]
if ok {
ag.insert(alert)
return
}
// If the group does not exist, create it. But check the limit first.
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
d.metrics.aggrGroupLimitReached.Inc()
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
return
}
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
// Insert the first alert in the group before starting the group's run()
// function, to make sure that the alert is already there when run() executes.
ag.insert(alert)
if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
ag.logger.Debug(
"Alert is old enough for immediate flush, resetting timer to zero",
"alert", alert.Name(),
"fingerprint", alert.Fingerprint(),
"startsAt", alert.StartsAt,
)
ag.resetTimer(0)
}
// Check dispatcher and alert state to determine if we should run the AG now.
switch d.state {
case DispatcherStateWaitingToStart:
d.logger.Debug("Dispatcher still waiting to start")
case DispatcherStateRunning:
d.runAG(ag)
default:
d.logger.Warn("unknown state detected", "state", "unknown")
}
}
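// runAG starts the aggregation group's run loop in its own goroutine,
// wiring each flush into the notification pipeline via the dispatcher's
// stage. It is a no-op if the group is already running.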
func (d *Dispatcher) runAG(ag *aggrGroup) {
if ag.running.Load() {
return
}
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With("aggrGroup", ag.GroupKey(), "num_alerts", len(alerts), "err", err)
if errors.Is(ctx.Err(), context.Canceled) {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
logger.Debug("Notify for alerts failed")
} else {
logger.Error("Notify for alerts failed")
}
}
return err == nil
})
}
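// getGroupLabels returns the subset of the alert's labels that identifies
// the alert's aggregation group: the labels listed in the route's group_by,
// or all labels when group_by_all is set.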
func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {
groupLabels := model.LabelSet{}
for ln, lv := range alert.Labels {
if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll {
groupLabels[ln] = lv
}
}
return groupLabels
}
// aggrGroup aggregates alert fingerprints into groups to which a
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
logger *slog.Logger
routeID string
routeKey string
alerts *store.Alerts
marker types.AlertMarker
ctx context.Context
cancel func()
done chan struct{}
next *time.Timer
timeout func(time.Duration) time.Duration
running atomic.Bool
}
// newAggrGroup returns a new aggregation group.
func newAggrGroup(
ctx context.Context,
labels model.LabelSet,
r *Route,
to func(time.Duration) time.Duration,
marker types.AlertMarker,
logger *slog.Logger,
) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
labels: labels,
routeID: r.ID(),
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
marker: marker,
done: make(chan struct{}),
}
ag.ctx, ag.cancel = context.WithCancel(ctx)
ag.logger = logger.With("aggrGroup", ag)
// Set an initial one-time wait before flushing
// the first batch of notifications.
ag.next = time.NewTimer(ag.opts.GroupWait)
return ag
}
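// fingerprint returns the fingerprint of the group's label set, which
// identifies the group within its route.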
func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}
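// GroupKey returns the group's key: the route key combined with the
// group's label set.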
func (ag *aggrGroup) GroupKey() string {
return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}
func (ag *aggrGroup) String() string {
return ag.GroupKey()
}
func (ag *aggrGroup) run(nf notifyFunc) {
ag.running.Store(true)
defer close(ag.done)
defer ag.next.Stop()
for {
select {
case now := <-ag.next.C:
// Give the notifications time until the next flush to
// finish before terminating them.
ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
// The now time we retrieve from the ticker is the only reliable
// point of time reference for the subsequent notification pipeline.
// Calculating the current time directly is prone to flaky behavior,
// which usually only becomes apparent in tests.
ctx = notify.WithNow(ctx, now)
// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)
// Wait the configured interval before calling flush again.
ag.resetTimer(ag.opts.GroupInterval)
ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})
cancel()
case <-ag.ctx.Done():
return
}
}
}
func (ag *aggrGroup) stop() {
// Calling cancel will terminate all in-process notifications
// and the run() loop.
ag.cancel()
<-ag.done
}
// resetTimer resets the timer for the AG.
func (ag *aggrGroup) resetTimer(t time.Duration) {
ag.next.Reset(t)
}
// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
ag.logger.Error("error on set alert", "err", err)
}
}
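// empty reports whether the aggregation group currently holds no alerts.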
func (ag *aggrGroup) empty() bool {
return ag.alerts.Empty()
}
// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
if ag.empty() {
return
}
var (
alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts))
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
)
for _, alert := range alerts {
a := *alert
// Ensure that alerts don't resolve as time moves forward.
if a.ResolvedAt(now) {
resolvedSlice = append(resolvedSlice, &a)
} else {
a.EndsAt = time.Time{}
}
alertsSlice = append(alertsSlice, &a)
}
sort.Stable(alertsSlice)
ag.logger.Debug("flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
if notify(alertsSlice...) {
// Delete all resolved alerts as we just sent a notification for them,
// and we don't want to send another one. However, we need to make sure
// that each resolved alert has not fired again during the flush as then
// we would delete an active alert thinking it was resolved.
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
ag.logger.Error("error on delete alerts", "err", err)
} else {
// Delete markers for resolved alerts that are not in the store.
for _, alert := range resolvedSlice {
_, err := ag.alerts.Get(alert.Fingerprint())
if errors.Is(err, store.ErrNotFound) {
ag.marker.Delete(alert.Fingerprint())
}
}
}
}
}
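// nilLimits is the Limits implementation used when NewDispatcher is given
// a nil Limits; it places no cap on the number of aggregation groups.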
type nilLimits struct{}
func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }