From 2e0970e8d8191f6fb7df6fb625f236bad5aa759a Mon Sep 17 00:00:00 2001
From: Siavash Safi
Date: Sat, 15 Nov 2025 15:35:59 +0100
Subject: [PATCH] feat(dispatch): add start delay (#4704)

This change adds a new command-line flag `--dispatch.start-delay`, which
corresponds to the `--rules.alert.resend-delay` flag in Prometheus. The
Prometheus flag controls the minimum amount of time that Prometheus waits
before resending an alert to Alertmanager. By adding this value to the
start time of Alertmanager, we delay the aggregation groups' first flush
until we are confident all alerts have been resent by the Prometheus
instances. This should help avoid race conditions in inhibitions after a
(re)start.

Other improvements:
- remove hasFlushed flag from aggrGroup
- remove mutex locking from aggrGroup

Signed-off-by: Alexander Rickardsson
Signed-off-by: Siavash Safi
Co-authored-by: Alexander Rickardsson
---
 cmd/alertmanager/main.go                 |  17 ++-
 dispatch/dispatch.go                     | 128 ++++++++++++++++-------
 dispatch/dispatch_test.go                | 123 ++++++++++++++++++----
 test/with_api_v2/acceptance.go           |  12 +--
 test/with_api_v2/acceptance/send_test.go |  62 +++++++++++
 5 files changed, 277 insertions(+), 65 deletions(-)

diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go
index 3e0bb994e..567f856b3 100644
--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go
@@ -143,6 +143,7 @@ func run() int {
 		maxSilenceSizeBytes         = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
 		alertGCInterval             = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
 		dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
+		dispatchStartDelay          = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with the value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()
 
 		webConfig   = webflag.AddFlags(kingpin.CommandLine, ":9093")
 		externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -186,6 +187,8 @@ func run() int {
 	logger := promslog.New(&promslogConfig)
 
 	logger.Info("Starting Alertmanager", "version", version.Info())
+	startTime := time.Now()
+
 	logger.Info("Build context", "build_context", version.BuildContext())
 
 	ff, err := featurecontrol.NewFlags(logger, *featureFlags)
@@ -493,7 +496,17 @@ func run() int {
 			silencer.Mutes(labels)
 		})
 
-		disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+		disp = dispatch.NewDispatcher(
+			alerts,
+			routes,
+			pipeline,
+			marker,
+			timeoutFunc,
+			*dispatchMaintenanceInterval,
+			nil,
+			logger,
+			dispMetrics,
+		)
 		routes.Walk(func(r *dispatch.Route) {
 			if r.RouteOpts.RepeatInterval > *retention {
 				configLogger.Warn(
@@ -520,7 +533,7 @@ func run() int {
 			}
 		})
 
-		go disp.Run()
+		go disp.Run(startTime.Add(*dispatchStartDelay))
 		go inhibitor.Run()
 
 		return nil
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index d1d7a39eb..1fae2d9a0 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -1,4 +1,4 @@
-// Copyright 2018 Prometheus Team
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -20,6 +20,7 @@ import (
 	"log/slog"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -32,6 +33,12 @@ import (
 	"github.com/prometheus/alertmanager/types"
 )
 
+const (
+	DispatcherStateUnknown = iota
+	DispatcherStateWaitingToStart
+	DispatcherStateRunning
+)
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
 	aggrGroups prometheus.Gauge
@@ -90,6 +97,9 @@ type Dispatcher struct {
 	cancel func()
 
 	logger *slog.Logger
+
+	startTimer *time.Timer
+	state      int
 }
 
 // Limits describes limits used by Dispatcher.
@@ -102,39 +112,44 @@
 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
-	ap provider.Alerts,
-	r *Route,
-	s notify.Stage,
-	mk types.GroupMarker,
-	to func(time.Duration) time.Duration,
-	mi time.Duration,
-	lim Limits,
-	l *slog.Logger,
-	m *DispatcherMetrics,
+	alerts provider.Alerts,
+	route *Route,
+	stage notify.Stage,
+	marker types.GroupMarker,
+	timeout func(time.Duration) time.Duration,
+	maintenanceInterval time.Duration,
+	limits Limits,
+	logger *slog.Logger,
+	metrics *DispatcherMetrics,
 ) *Dispatcher {
-	if lim == nil {
-		lim = nilLimits{}
+	if limits == nil {
+		limits = nilLimits{}
 	}
 
 	disp := &Dispatcher{
-		alerts:              ap,
-		stage:               s,
-		route:               r,
-		marker:              mk,
-		timeout:             to,
-		maintenanceInterval: mi,
-		logger:              l.With("component", "dispatcher"),
-		metrics:             m,
-		limits:              lim,
+		alerts:              alerts,
+		stage:               stage,
+		route:               route,
+		marker:              marker,
+		timeout:             timeout,
+		maintenanceInterval: maintenanceInterval,
+		logger:              logger.With("component", "dispatcher"),
+		metrics:             metrics,
+		limits:              limits,
+		state:               DispatcherStateUnknown,
 	}
 	return disp
 }
 
 // Run starts dispatching alerts incoming via the updates channel.
-func (d *Dispatcher) Run() {
+func (d *Dispatcher) Run(dispatchStartTime time.Time) {
 	d.done = make(chan struct{})
 
 	d.mtx.Lock()
+	d.logger.Debug("preparing to start", "startTime", dispatchStartTime)
+	d.startTimer = time.NewTimer(time.Until(dispatchStartTime))
+	d.state = DispatcherStateWaitingToStart
+	d.logger.Debug("setting state", "state", "waiting_to_start")
 	d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
 	d.aggrGroupsNum = 0
 	d.metrics.aggrGroups.Set(0)
@@ -176,6 +191,18 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 			}
 			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
 
+		case <-d.startTimer.C:
+			if d.state == DispatcherStateWaitingToStart {
+				d.state = DispatcherStateRunning
+				d.logger.Debug("started", "state", "running")
+				d.logger.Debug("Starting all existing aggregation groups")
+				for _, groups := range d.aggrGroupsPerRoute {
+					for _, ag := range groups {
+						d.runAG(ag)
+					}
+				}
+			}
+
 		case <-maintenance.C:
 			d.doMaintenance()
 		case <-d.ctx.Done():
@@ -311,6 +338,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
 // processAlert determines in which aggregation group the alert falls
 // and inserts it.
 func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
+	now := time.Now()
 	groupLabels := getGroupLabels(alert, route)
 
 	fp := groupLabels.Fingerprint()
@@ -347,6 +375,30 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	// alert is already there.
 	ag.insert(alert)
 
+	if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
+		ag.logger.Debug(
+			"Alert is old enough for immediate flush, resetting timer to zero",
+			"alert", alert.Name(),
+			"fingerprint", alert.Fingerprint(),
+			"startsAt", alert.StartsAt,
+		)
+		ag.resetTimer(0)
+	}
+	// Check the dispatcher state to determine if we should run the AG now.
+	switch d.state {
+	case DispatcherStateWaitingToStart:
+		d.logger.Debug("Dispatcher still waiting to start")
+	case DispatcherStateRunning:
+		d.runAG(ag)
+	default:
+		d.logger.Warn("unknown state detected", "state", "unknown")
+	}
+}
+
+func (d *Dispatcher) runAG(ag *aggrGroup) {
+	if ag.running.Load() {
+		return
+	}
 	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
 		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
 		if err != nil {
@@ -392,13 +444,18 @@ type aggrGroup struct {
 	done    chan struct{}
 	next    *time.Timer
 	timeout func(time.Duration) time.Duration
-
-	mtx        sync.RWMutex
-	hasFlushed bool
+	running atomic.Bool
 }
 
 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
+func newAggrGroup(
+	ctx context.Context,
+	labels model.LabelSet,
+	r *Route,
+	to func(time.Duration) time.Duration,
+	marker types.AlertMarker,
+	logger *slog.Logger,
+) *aggrGroup {
 	if to == nil {
 		to = func(d time.Duration) time.Duration { return d }
 	}
@@ -436,6 +493,7 @@ func (ag *aggrGroup) String() string {
 }
 
 func (ag *aggrGroup) run(nf notifyFunc) {
+	ag.running.Store(true)
 	defer close(ag.done)
 	defer ag.next.Stop()
 
@@ -462,10 +520,7 @@
 			ctx = notify.WithRouteID(ctx, ag.routeID)
 
 			// Wait the configured interval before calling flush again.
-			ag.mtx.Lock()
-			ag.next.Reset(ag.opts.GroupInterval)
-			ag.hasFlushed = true
-			ag.mtx.Unlock()
+			ag.resetTimer(ag.opts.GroupInterval)
 
 			ag.flush(func(alerts ...*types.Alert) bool {
 				return nf(ctx, alerts...)
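
Note (illustrative, not part of the patch): the hunks above carry the core mechanism. Run() arms a one-shot timer for the requested dispatch start time, processAlert() only records new aggregation groups while the dispatcher is still in DispatcherStateWaitingToStart, and runAG() launches a group's run loop at most once, either when the timer fires or, after that point, as soon as an alert arrives for the group. Alerts whose StartsAt plus GroupWait is already in the past reset the group timer to zero so they flush immediately once dispatching begins. The following self-contained Go sketch shows the same gating pattern in isolation; every name in it is invented for the example and none of it is taken from Alertmanager.

package main

import (
	"fmt"
	"time"
)

const (
	stateWaitingToStart = iota
	stateRunning
)

// group stands in for an aggregation group; starting it is the analogue
// of launching ag.run() via runAG().
type group struct{ name string }

func main() {
	const startDelay = 200 * time.Millisecond

	startTimer := time.NewTimer(startDelay) // analogue of d.startTimer
	state := stateWaitingToStart

	incoming := make(chan group)
	var pending []group // groups created before the start time

	go func() {
		incoming <- group{name: "instance=1"} // arrives while still waiting
		time.Sleep(2 * startDelay)
		incoming <- group{name: "instance=2"} // arrives after the timer fired
	}()

	start := func(g group) { fmt.Println("starting aggregation group", g.name) }

	for seen := 0; seen < 2; {
		select {
		case g := <-incoming:
			seen++
			if state == stateWaitingToStart {
				// Too early: remember the group and start it when the timer fires.
				pending = append(pending, g)
				continue
			}
			start(g)
		case <-startTimer.C:
			state = stateRunning
			for _, g := range pending {
				start(g)
			}
			pending = nil
		}
	}
}
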
@@ -486,19 +541,16 @@ func (ag *aggrGroup) stop() {
 	<-ag.done
 }
 
+// resetTimer resets the timer for the AG.
+func (ag *aggrGroup) resetTimer(t time.Duration) {
+	ag.next.Reset(t)
+}
+
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
 	if err := ag.alerts.Set(alert); err != nil {
 		ag.logger.Error("error on set alert", "err", err)
 	}
-
-	// Immediately trigger a flush if the wait duration for this
-	// alert is already over.
-	ag.mtx.Lock()
-	defer ag.mtx.Unlock()
-	if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
-		ag.next.Reset(0)
-	}
 }
 
 func (ag *aggrGroup) empty() bool {
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index fe4df6d2e..c14b9eba4 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -1,4 +1,4 @@
-// Copyright 2018 Prometheus Team
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -191,8 +191,6 @@ func TestAggrGroup(t *testing.T) {
 
 	ag.stop()
 
-	// Add an alert that started more than group_interval in the past. We expect
-	// immediate flushing.
 	// Finally, set all alerts to be resolved. After successful notify the aggregation group
 	// should empty itself.
 	ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
@@ -201,18 +199,12 @@ func TestAggrGroup(t *testing.T) {
 	ag.insert(a1)
 	ag.insert(a2)
 
-	// a2 lies way in the past so the initial group_wait should be skipped.
-	select {
-	case <-time.After(opts.GroupWait / 2):
-		t.Fatalf("expected immediate alert but received none")
+	batch := <-alertsCh
+	exp := removeEndsAt(types.AlertSlice{a1, a2})
+	sort.Sort(batch)
 
-	case batch := <-alertsCh:
-		exp := removeEndsAt(types.AlertSlice{a1, a2})
-		sort.Sort(batch)
-
-		if !reflect.DeepEqual(batch, exp) {
-			t.Fatalf("expected alerts %v but got %v", exp, batch)
-		}
+	if !reflect.DeepEqual(batch, exp) {
+		t.Fatalf("expected alerts %v but got %v", exp, batch)
 	}
 
 	for i := 0; i < 3; i++ {
@@ -243,7 +235,7 @@ func TestAggrGroup(t *testing.T) {
 	a1r := *a1
 	a1r.EndsAt = time.Now()
 	ag.insert(&a1r)
-	exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
+	exp = append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
 
 	select {
 	case <-time.After(2 * opts.GroupInterval):
@@ -403,7 +395,7 @@ route:
 	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
 	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	defer dispatcher.Stop()
 
 	// Create alerts. the dispatcher will automatically create the groups.
@@ -556,7 +548,7 @@ route:
 	lim := limits{groups: 6}
 	m := NewDispatcherMetrics(true, reg)
 	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, lim, logger, m)
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	defer dispatcher.Stop()
 
 	// Create alerts. the dispatcher will automatically create the groups.
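
Note (illustrative, not part of the patch): the aggrGroup changes above replace the hasFlushed flag and its mutex with an atomic running bool; runAG() checks it with a plain Load() before launching run(), which is enough here because, as far as this diff shows, runAG() is only ever called from the dispatcher's single run goroutine. A caller-agnostic variant of the same at-most-once guard would use CompareAndSwap, as in this small sketch with invented names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker stands in for an aggregation group whose run loop must be
// launched at most once, no matter how many times startOnce is called.
type worker struct {
	running atomic.Bool
}

func (w *worker) run() {
	fmt.Println("run loop started")
}

// startOnce launches w.run in a goroutine unless it is already running.
// CompareAndSwap makes the guard safe even with concurrent callers.
func startOnce(w *worker, wg *sync.WaitGroup) {
	if !w.running.CompareAndSwap(false, true) {
		return
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		w.run()
	}()
}

func main() {
	var (
		w  worker
		wg sync.WaitGroup
	)
	for i := 0; i < 5; i++ {
		startOnce(&w, &wg) // only the first call actually starts the loop
	}
	wg.Wait()
}
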
@@ -675,7 +667,7 @@ func TestDispatcherRace(t *testing.T) {
 
 	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
 	dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	dispatcher.Stop()
 }
 
@@ -704,7 +696,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	timeout := func(d time.Duration) time.Duration { return d }
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
 	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	defer dispatcher.Stop()
 
 	// Push all alerts.
@@ -973,3 +965,96 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
 		require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
 	})
 }
+
+func TestDispatchOnStartup(t *testing.T) {
+	logger := promslog.NewNopLogger()
+	reg := prometheus.NewRegistry()
+	marker := types.NewMarker(reg)
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	// Set up a route with GroupBy to separate alerts into different aggregation groups.
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{"instance": {}},
+			GroupWait:      1 * time.Second,
+			GroupInterval:  3 * time.Minute,
+			RepeatInterval: 1 * time.Hour,
+		},
+	}
+
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	timeout := func(d time.Duration) time.Duration { return d }
+
+	// Set the start time to 2 seconds in the future.
+	now := time.Now()
+	startDelay := 2 * time.Second
+	startTime := time.Now().Add(startDelay)
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
+	go dispatcher.Run(startTime)
+	defer dispatcher.Stop()
+
+	// Create 2 similar alerts with start times in the past.
+	alert1 := &types.Alert{
+		Alert: model.Alert{
+			Labels:       model.LabelSet{"alertname": "TestAlert1", "instance": "1"},
+			Annotations:  model.LabelSet{"foo": "bar"},
+			StartsAt:     now.Add(-1 * time.Hour),
+			EndsAt:       now.Add(time.Hour),
+			GeneratorURL: "http://example.com/prometheus",
+		},
+		UpdatedAt: now,
+		Timeout:   false,
+	}
+
+	alert2 := &types.Alert{
+		Alert: model.Alert{
+			Labels:       model.LabelSet{"alertname": "TestAlert2", "instance": "2"},
+			Annotations:  model.LabelSet{"foo": "bar"},
+			StartsAt:     now.Add(-1 * time.Hour),
+			EndsAt:       now.Add(time.Hour),
+			GeneratorURL: "http://example.com/prometheus",
+		},
+		UpdatedAt: now,
+		Timeout:   false,
+	}
+
+	// Send alert1.
+	require.NoError(t, alerts.Put(alert1))
+
+	var recordedAlerts []*types.Alert
+	// Expect a recorded alert after startTime + GroupWait, which is in the future.
+	require.Eventually(t, func() bool {
+		recordedAlerts = recorder.Alerts()
+		return len(recordedAlerts) == 1
+	}, startDelay+route.RouteOpts.GroupWait, 500*time.Millisecond)
+
+	require.Equal(t, alert1.Fingerprint(), recordedAlerts[0].Fingerprint(), "expected alert1 to be dispatched after GroupWait")
+
+	// Send alert2.
+	require.NoError(t, alerts.Put(alert2))
+
+	// Expect a recorded alert after GroupInterval.
+	require.Eventually(t, func() bool {
+		recordedAlerts = recorder.Alerts()
+		return len(recordedAlerts) == 2
+	}, route.RouteOpts.GroupInterval, 100*time.Millisecond)
+
+	// Sort alerts by fingerprint for deterministic ordering.
+	sort.Slice(recordedAlerts, func(i, j int) bool {
+		return recordedAlerts[i].Fingerprint() < recordedAlerts[j].Fingerprint()
+	})
+	require.Equal(t, alert2.Fingerprint(), recordedAlerts[1].Fingerprint(), "expected alert2 to be dispatched after GroupInterval")
+
+	// Verify both alerts are present.
+	fingerprints := make(map[model.Fingerprint]bool)
+	for _, a := range recordedAlerts {
+		fingerprints[a.Fingerprint()] = true
+	}
+	require.True(t, fingerprints[alert1.Fingerprint()], "expected alert1 to be present")
+	require.True(t, fingerprints[alert2.Fingerprint()], "expected alert2 to be present")
+}
diff --git a/test/with_api_v2/acceptance.go b/test/with_api_v2/acceptance.go
index 4a1f824b7..60878d124 100644
--- a/test/with_api_v2/acceptance.go
+++ b/test/with_api_v2/acceptance.go
@@ -164,7 +164,7 @@ func (t *AcceptanceTest) Collector(name string) *Collector {
 
 // Run starts all Alertmanagers and runs queries against them. It then checks
 // whether all expected notifications have arrived at the expected receiver.
-func (t *AcceptanceTest) Run() {
+func (t *AcceptanceTest) Run(additionalArgs ...string) {
 	errc := make(chan error)
 
 	for _, am := range t.amc.ams {
@@ -173,7 +173,7 @@ func (t *AcceptanceTest) Run() {
 		t.Cleanup(am.cleanup)
 	}
 
-	err := t.amc.Start()
+	err := t.amc.Start(additionalArgs...)
 	if err != nil {
 		t.Log(err)
 		t.Fail()
@@ -263,14 +263,14 @@ type AlertmanagerCluster struct {
 }
 
 // Start the Alertmanager cluster and wait until it is ready to receive.
-func (amc *AlertmanagerCluster) Start() error {
-	var peerFlags []string
+func (amc *AlertmanagerCluster) Start(additionalArgs ...string) error {
+	args := additionalArgs
 	for _, am := range amc.ams {
-		peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
+		args = append(args, "--cluster.peer="+am.clusterAddr)
 	}
 
 	for _, am := range amc.ams {
-		err := am.Start(peerFlags)
+		err := am.Start(args)
 		if err != nil {
 			return fmt.Errorf("failed to start alertmanager cluster: %w", err)
 		}
diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go
index fa190f44c..167829932 100644
--- a/test/with_api_v2/acceptance/send_test.go
+++ b/test/with_api_v2/acceptance/send_test.go
@@ -419,6 +419,68 @@ receivers:
 	}
 }
 
+func TestColdStart(t *testing.T) {
+	t.Parallel()
+
+	// This integration test ensures that the first alert is not notified
+	// before the Alertmanager process start time plus the dispatch start delay.
+	conf := `
+route:
+  receiver: "default"
+  group_by: []
+  group_wait: 1s
+  group_interval: 6s
+  repeat_interval: 10m
+
+receivers:
+- name: "default"
+  webhook_configs:
+  - url: 'http://%s'
+`
+
+	at := NewAcceptanceTest(t, &AcceptanceOpts{
+		Tolerance: 150 * time.Millisecond,
+	})
+
+	co := at.Collector("webhook")
+	wh := NewWebhook(t, co)
+
+	amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
+
+	amc.Push(At(1), Alert("alertname", "test1").Active(-100))
+	amc.Push(At(2), Alert("alertname", "test2"))
+
+	// Alerts are dispatched 5 seconds after the Alertmanager process has started.
+	// start delay: 5s
+	// first alert received at: 1s
+	// first alert dispatched at: 5s - 1s = 4s
+	co.Want(Between(4, 5),
+		Alert("alertname", "test1").Active(1),
+		Alert("alertname", "test2").Active(4),
+	)
+
+	// Reload the Alertmanager process.
+	at.Do(At(5), amc.Reload)
+
+	amc.Push(At(6), Alert("alertname", "test3").Active(-100))
+	amc.Push(At(7), Alert("alertname", "test4"))
+
+	// Group interval is applied on top of start delay.
+	// start delay: 5s
+	// group interval: 6s
+	// alerts dispatched at: 5s + 6s = 11s
+	co.Want(Between(11, 11.5),
+		Alert("alertname", "test1").Active(1),
+		Alert("alertname", "test2").Active(4),
+		Alert("alertname", "test3").Active(6),
+		Alert("alertname", "test4").Active(7),
+	)
+
+	at.Run("--dispatch.start-delay", "5s")
+
+	t.Log(co.Check())
+}
+
 func TestReload(t *testing.T) {
 	t.Parallel()
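
Note (illustrative, not part of the patch): operationally, the new flag is meant to mirror Prometheus's --rules.alert.resend-delay (1m by default), and the dispatch start time is simply the Alertmanager process start time plus that delay, as main.go wires startTime.Add(*dispatchStartDelay) into Dispatcher.Run(). The sketch below shows that wiring in isolation; it uses the standard library flag package instead of kingpin, and the names in it are invented for the example.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Hypothetical stand-in for the kingpin flag added in this patch.
	startDelay := flag.Duration("dispatch.start-delay", 0,
		"Minimum amount of time to wait before dispatching alerts; keep in sync with Prometheus's --rules.alert.resend-delay.")
	flag.Parse()

	startTime := time.Now()
	dispatchStartTime := startTime.Add(*startDelay)

	// In the patch this value is what main.go hands to Dispatcher.Run();
	// aggregation groups will not flush before it.
	fmt.Println("aggregation groups will not flush before", dispatchStartTime.Format(time.RFC3339))
}
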