diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go
index 3e0bb994e..567f856b3 100644
--- a/cmd/alertmanager/main.go
+++ b/cmd/alertmanager/main.go
@@ -143,6 +143,7 @@ func run() int {
 		maxSilenceSizeBytes         = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. If negative or zero, no limit is set.").Default("0").Int()
 		alertGCInterval             = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()
 		dispatchMaintenanceInterval = kingpin.Flag("dispatch.maintenance-interval", "Interval between maintenance of aggregation groups in the dispatcher.").Default("30s").Duration()
+		DispatchStartDelay          = kingpin.Flag("dispatch.start-delay", "Minimum amount of time to wait before dispatching alerts. This option should be synced with the value of --rules.alert.resend-delay on Prometheus.").Default("0s").Duration()
 
 		webConfig   = webflag.AddFlags(kingpin.CommandLine, ":9093")
 		externalURL = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
@@ -186,6 +187,8 @@ func run() int {
 	logger := promslog.New(&promslogConfig)
 
 	logger.Info("Starting Alertmanager", "version", version.Info())
+	startTime := time.Now()
+
 	logger.Info("Build context", "build_context", version.BuildContext())
 
 	ff, err := featurecontrol.NewFlags(logger, *featureFlags)
@@ -493,7 +496,17 @@ func run() int {
 		silencer.Mutes(labels)
 	})
 
-	disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, *dispatchMaintenanceInterval, nil, logger, dispMetrics)
+	disp = dispatch.NewDispatcher(
+		alerts,
+		routes,
+		pipeline,
+		marker,
+		timeoutFunc,
+		*dispatchMaintenanceInterval,
+		nil,
+		logger,
+		dispMetrics,
+	)
 	routes.Walk(func(r *dispatch.Route) {
 		if r.RouteOpts.RepeatInterval > *retention {
 			configLogger.Warn(
@@ -520,7 +533,7 @@ func run() int {
 		}
 	})
 
-	go disp.Run()
+	go disp.Run(startTime.Add(*DispatchStartDelay))
 	go inhibitor.Run()
 
 	return nil
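For orientation, here is a minimal, self-contained sketch (not part of the patch) of what the two added lines in `main.go` amount to: the start time is captured once, early in `run()`, and the flag value is added to it to form an absolute deadline, so time spent loading configuration or joining the cluster counts against the delay rather than on top of it. The `2 * time.Second` value below is only an example; in practice the flag would presumably be set close to Prometheus' `--rules.alert.resend-delay` (1m by default), as the help text suggests.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the startTime captured right after logger setup in run(); everything
	// that happens between then and dispatcher start-up counts against the delay.
	startTime := time.Now()

	// Stand-in for *DispatchStartDelay; the default of 0s keeps today's behaviour
	// because the resulting timer fires immediately.
	delay := 2 * time.Second

	// The dispatcher is handed an absolute deadline, not a duration.
	dispatchAt := startTime.Add(delay)

	// Run() turns the deadline back into a one-shot timer via time.Until; a deadline
	// already in the past yields a non-positive duration, which fires at once.
	<-time.NewTimer(time.Until(dispatchAt)).C
	fmt.Println("aggregation groups may begin flushing now")
}
```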
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index d1d7a39eb..1fae2d9a0 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -1,4 +1,4 @@
-// Copyright 2018 Prometheus Team
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -20,6 +20,7 @@ import (
 	"log/slog"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -32,6 +33,12 @@ import (
 	"github.com/prometheus/alertmanager/types"
 )
 
+const (
+	DispatcherStateUnknown = iota
+	DispatcherStateWaitingToStart
+	DispatcherStateRunning
+)
+
 // DispatcherMetrics represents metrics associated to a dispatcher.
 type DispatcherMetrics struct {
 	aggrGroups            prometheus.Gauge
@@ -90,6 +97,9 @@ type Dispatcher struct {
 	cancel func()
 
 	logger *slog.Logger
+
+	startTimer *time.Timer
+	state      int
 }
 
 // Limits describes limits used by Dispatcher.
@@ -102,39 +112,44 @@
 // NewDispatcher returns a new Dispatcher.
 func NewDispatcher(
-	ap provider.Alerts,
-	r *Route,
-	s notify.Stage,
-	mk types.GroupMarker,
-	to func(time.Duration) time.Duration,
-	mi time.Duration,
-	lim Limits,
-	l *slog.Logger,
-	m *DispatcherMetrics,
+	alerts provider.Alerts,
+	route *Route,
+	stage notify.Stage,
+	marker types.GroupMarker,
+	timeout func(time.Duration) time.Duration,
+	maintenanceInterval time.Duration,
+	limits Limits,
+	logger *slog.Logger,
+	metrics *DispatcherMetrics,
 ) *Dispatcher {
-	if lim == nil {
-		lim = nilLimits{}
+	if limits == nil {
+		limits = nilLimits{}
 	}
 
 	disp := &Dispatcher{
-		alerts:              ap,
-		stage:               s,
-		route:               r,
-		marker:              mk,
-		timeout:             to,
-		maintenanceInterval: mi,
-		logger:              l.With("component", "dispatcher"),
-		metrics:             m,
-		limits:              lim,
+		alerts:              alerts,
+		stage:               stage,
+		route:               route,
+		marker:              marker,
+		timeout:             timeout,
+		maintenanceInterval: maintenanceInterval,
+		logger:              logger.With("component", "dispatcher"),
+		metrics:             metrics,
+		limits:              limits,
+		state:               DispatcherStateUnknown,
 	}
 	return disp
 }
 
 // Run starts dispatching alerts incoming via the updates channel.
-func (d *Dispatcher) Run() {
+func (d *Dispatcher) Run(dispatchStartTime time.Time) {
 	d.done = make(chan struct{})
 
 	d.mtx.Lock()
+	d.logger.Debug("preparing to start", "startTime", dispatchStartTime)
+	d.startTimer = time.NewTimer(time.Until(dispatchStartTime))
+	d.state = DispatcherStateWaitingToStart
+	d.logger.Debug("setting state", "state", "waiting_to_start")
 	d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
 	d.aggrGroupsNum = 0
 	d.metrics.aggrGroups.Set(0)
@@ -176,6 +191,18 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 			}
 			d.metrics.processingDuration.Observe(time.Since(now).Seconds())
 
+		case <-d.startTimer.C:
+			if d.state == DispatcherStateWaitingToStart {
+				d.state = DispatcherStateRunning
+				d.logger.Debug("started", "state", "running")
+				d.logger.Debug("Starting all existing aggregation groups")
+				for _, groups := range d.aggrGroupsPerRoute {
+					for _, ag := range groups {
+						d.runAG(ag)
+					}
+				}
+			}
+
 		case <-maintenance.C:
 			d.doMaintenance()
 		case <-d.ctx.Done():
@@ -311,6 +338,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool
 // processAlert determines in which aggregation group the alert falls
 // and inserts it.
 func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
+	now := time.Now()
 	groupLabels := getGroupLabels(alert, route)
 
 	fp := groupLabels.Fingerprint()
@@ -347,6 +375,30 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	// alert is already there.
 	ag.insert(alert)
+	if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
+		ag.logger.Debug(
+			"Alert is old enough for immediate flush, resetting timer to zero",
+			"alert", alert.Name(),
+			"fingerprint", alert.Fingerprint(),
+			"startsAt", alert.StartsAt,
+		)
+		ag.resetTimer(0)
+	}
+	// Check dispatcher and alert state to determine if we should run the AG now.
+	switch d.state {
+	case DispatcherStateWaitingToStart:
+		d.logger.Debug("Dispatcher still waiting to start")
+	case DispatcherStateRunning:
+		d.runAG(ag)
+	default:
+		d.logger.Warn("unknown state detected", "state", "unknown")
+	}
+}
+
+func (d *Dispatcher) runAG(ag *aggrGroup) {
+	if ag.running.Load() {
+		return
+	}
 
 	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
 		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
 		if err != nil {
@@ -392,13 +444,18 @@ type aggrGroup struct {
 	done    chan struct{}
 	next    *time.Timer
 	timeout func(time.Duration) time.Duration
-
-	mtx        sync.RWMutex
-	hasFlushed bool
+	running atomic.Bool
 }
 
 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
+func newAggrGroup(
+	ctx context.Context,
+	labels model.LabelSet,
+	r *Route,
+	to func(time.Duration) time.Duration,
+	marker types.AlertMarker,
+	logger *slog.Logger,
+) *aggrGroup {
 	if to == nil {
 		to = func(d time.Duration) time.Duration { return d }
 	}
@@ -436,6 +493,7 @@ func (ag *aggrGroup) String() string {
 }
 
 func (ag *aggrGroup) run(nf notifyFunc) {
+	ag.running.Store(true)
 	defer close(ag.done)
 	defer ag.next.Stop()
 
@@ -462,10 +520,7 @@
 		ctx = notify.WithRouteID(ctx, ag.routeID)
 		// Wait the configured interval before calling flush again.
-		ag.mtx.Lock()
-		ag.next.Reset(ag.opts.GroupInterval)
-		ag.hasFlushed = true
-		ag.mtx.Unlock()
+		ag.resetTimer(ag.opts.GroupInterval)
 
 		ag.flush(func(alerts ...*types.Alert) bool {
 			return nf(ctx, alerts...)
 		})
@@ -486,19 +541,16 @@ func (ag *aggrGroup) stop() {
 	<-ag.done
 }
 
+// resetTimer resets the timer for the AG.
+func (ag *aggrGroup) resetTimer(t time.Duration) {
+	ag.next.Reset(t)
+}
+
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
 	if err := ag.alerts.Set(alert); err != nil {
 		ag.logger.Error("error on set alert", "err", err)
 	}
-
-	// Immediately trigger a flush if the wait duration for this
-	// alert is already over.
-	ag.mtx.Lock()
-	defer ag.mtx.Unlock()
-	if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
-		ag.next.Reset(0)
-	}
 }
 
 func (ag *aggrGroup) empty() bool {
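To make the new control flow in `dispatch.go` easier to follow, here is a small, self-contained model of the gating it introduces: groups created while the dispatcher is still waiting are parked, the start timer promotes the dispatcher to its running state and starts everything that accumulated, and groups touched after that point are started immediately, at most once. The names (`stateWaitingToStart`, `group`, the millisecond timings) are illustrative stand-ins, not Alertmanager APIs.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	stateWaitingToStart = iota
	stateRunning
)

type group struct {
	name    string
	running atomic.Bool // mirrors aggrGroup.running: each group is started at most once
}

func (g *group) start() {
	if g.running.Load() {
		return // already started; same guard as Dispatcher.runAG
	}
	g.running.Store(true)
	fmt.Println("group started:", g.name)
}

func main() {
	state := stateWaitingToStart
	groups := map[string]*group{}
	startTimer := time.NewTimer(200 * time.Millisecond) // stands in for the dispatch start delay
	incoming := make(chan string)

	go func() {
		incoming <- "g1"                   // arrives while waiting: created but not started
		time.Sleep(300 * time.Millisecond) // after the start timer has fired
		incoming <- "g2"                   // arrives while running: started immediately
		close(incoming)
	}()

	for {
		select {
		case name, ok := <-incoming:
			if !ok {
				return
			}
			g, found := groups[name]
			if !found {
				g = &group{name: name}
				groups[name] = g
			}
			if state == stateRunning {
				g.start()
			}
		case <-startTimer.C:
			state = stateRunning
			for _, g := range groups {
				g.start() // start everything that accumulated during the delay
			}
		}
	}
}
```

In the patch itself the same idea is split across the `<-d.startTimer.C` case of the event loop, the `switch d.state` at the end of `processAlert`, and the `running` guard in `runAG`.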
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index fe4df6d2e..c14b9eba4 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -1,4 +1,4 @@
-// Copyright 2018 Prometheus Team
+// Copyright The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -191,8 +189,6 @@ func TestAggrGroup(t *testing.T) {
 	ag.stop()
 
-	// Add an alert that started more than group_interval in the past. We expect
-	// immediate flushing.
 	// Finally, set all alerts to be resolved. After successful notify the aggregation group
 	// should empty itself.
 	ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
@@ -201,18 +199,12 @@ func TestAggrGroup(t *testing.T) {
 	ag.insert(a1)
 	ag.insert(a2)
 
-	// a2 lies way in the past so the initial group_wait should be skipped.
-	select {
-	case <-time.After(opts.GroupWait / 2):
-		t.Fatalf("expected immediate alert but received none")
+	batch := <-alertsCh
+	exp := removeEndsAt(types.AlertSlice{a1, a2})
+	sort.Sort(batch)
 
-	case batch := <-alertsCh:
-		exp := removeEndsAt(types.AlertSlice{a1, a2})
-		sort.Sort(batch)
-
-		if !reflect.DeepEqual(batch, exp) {
-			t.Fatalf("expected alerts %v but got %v", exp, batch)
-		}
+	if !reflect.DeepEqual(batch, exp) {
+		t.Fatalf("expected alerts %v but got %v", exp, batch)
 	}
 
 	for i := 0; i < 3; i++ {
@@ -243,7 +235,7 @@ func TestAggrGroup(t *testing.T) {
 	a1r := *a1
 	a1r.EndsAt = time.Now()
 	ag.insert(&a1r)
-	exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
+	exp = append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
 
 	select {
 	case <-time.After(2 * opts.GroupInterval):
@@ -403,7 +395,7 @@ route:
 	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
 	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	defer dispatcher.Stop()
 
 	// Create alerts. the dispatcher will automatically create the groups.
@@ -556,7 +548,7 @@ route:
 	lim := limits{groups: 6}
 	m := NewDispatcherMetrics(true, reg)
 	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, lim, logger, m)
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	defer dispatcher.Stop()
 
 	// Create alerts. the dispatcher will automatically create the groups.
@@ -675,7 +667,7 @@ func TestDispatcherRace(t *testing.T) {
 	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
 	dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	dispatcher.Stop()
 }
@@ -704,7 +696,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
 	timeout := func(d time.Duration) time.Duration { return d }
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
 	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
-	go dispatcher.Run()
+	go dispatcher.Run(time.Now())
 	defer dispatcher.Stop()
 
 	// Push all alerts.
@@ -973,3 +965,96 @@ func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
 		require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
 	})
 }
+
+func TestDispatchOnStartup(t *testing.T) {
+	logger := promslog.NewNopLogger()
+	reg := prometheus.NewRegistry()
+	marker := types.NewMarker(reg)
+	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer alerts.Close()
+
+	// Set up a route with GroupBy to separate alerts into different aggregation groups.
+	route := &Route{
+		RouteOpts: RouteOpts{
+			Receiver:       "default",
+			GroupBy:        map[model.LabelName]struct{}{"instance": {}},
+			GroupWait:      1 * time.Second,
+			GroupInterval:  3 * time.Minute,
+			RepeatInterval: 1 * time.Hour,
+		},
+	}
+
+	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
+	timeout := func(d time.Duration) time.Duration { return d }
+
+	// Set the start time 2 seconds in the future.
+	now := time.Now()
+	startDelay := 2 * time.Second
+	startTime := time.Now().Add(startDelay)
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
+	go dispatcher.Run(startTime)
+	defer dispatcher.Stop()
+
+	// Create 2 similar alerts with start times in the past.
+	alert1 := &types.Alert{
+		Alert: model.Alert{
+			Labels:       model.LabelSet{"alertname": "TestAlert1", "instance": "1"},
+			Annotations:  model.LabelSet{"foo": "bar"},
+			StartsAt:     now.Add(-1 * time.Hour),
+			EndsAt:       now.Add(time.Hour),
+			GeneratorURL: "http://example.com/prometheus",
+		},
+		UpdatedAt: now,
+		Timeout:   false,
+	}
+
+	alert2 := &types.Alert{
+		Alert: model.Alert{
+			Labels:       model.LabelSet{"alertname": "TestAlert2", "instance": "2"},
+			Annotations:  model.LabelSet{"foo": "bar"},
+			StartsAt:     now.Add(-1 * time.Hour),
+			EndsAt:       now.Add(time.Hour),
+			GeneratorURL: "http://example.com/prometheus",
+		},
+		UpdatedAt: now,
+		Timeout:   false,
+	}
+
+	// Send alert1.
+	require.NoError(t, alerts.Put(alert1))
+
+	var recordedAlerts []*types.Alert
+	// Expect a recorded alert after startTime + GroupWait, which is in the future.
+	require.Eventually(t, func() bool {
+		recordedAlerts = recorder.Alerts()
+		return len(recordedAlerts) == 1
+	}, startDelay+route.RouteOpts.GroupWait, 500*time.Millisecond)
+
+	require.Equal(t, alert1.Fingerprint(), recordedAlerts[0].Fingerprint(), "expected alert1 to be dispatched after GroupWait")
+
+	// Send alert2.
+	require.NoError(t, alerts.Put(alert2))
+
+	// Expect a recorded alert after GroupInterval.
+	require.Eventually(t, func() bool {
+		recordedAlerts = recorder.Alerts()
+		return len(recordedAlerts) == 2
+	}, route.RouteOpts.GroupInterval, 100*time.Millisecond)
+
+	// Sort alerts by fingerprint for deterministic ordering.
+	sort.Slice(recordedAlerts, func(i, j int) bool {
+		return recordedAlerts[i].Fingerprint() < recordedAlerts[j].Fingerprint()
+	})
+	require.Equal(t, alert2.Fingerprint(), recordedAlerts[1].Fingerprint(), "expected alert2 to be dispatched after GroupInterval")
+
+	// Verify both alerts are present.
+	fingerprints := make(map[model.Fingerprint]bool)
+	for _, a := range recordedAlerts {
+		fingerprints[a.Fingerprint()] = true
+	}
+	require.True(t, fingerprints[alert1.Fingerprint()], "expected alert1 to be present")
+	require.True(t, fingerprints[alert2.Fingerprint()], "expected alert2 to be present")
+}
diff --git a/test/with_api_v2/acceptance.go b/test/with_api_v2/acceptance.go
index 4a1f824b7..60878d124 100644
--- a/test/with_api_v2/acceptance.go
+++ b/test/with_api_v2/acceptance.go
@@ -164,7 +164,7 @@ func (t *AcceptanceTest) Collector(name string) *Collector {
 // Run starts all Alertmanagers and runs queries against them. It then checks
 // whether all expected notifications have arrived at the expected receiver.
-func (t *AcceptanceTest) Run() {
+func (t *AcceptanceTest) Run(additionalArgs ...string) {
 	errc := make(chan error)
 
 	for _, am := range t.amc.ams {
@@ -173,7 +173,7 @@ func (t *AcceptanceTest) Run() {
 		t.Cleanup(am.cleanup)
 	}
 
-	err := t.amc.Start()
+	err := t.amc.Start(additionalArgs...)
 	if err != nil {
 		t.Log(err)
 		t.Fail()
@@ -263,14 +263,14 @@ type AlertmanagerCluster struct {
 }
 
 // Start the Alertmanager cluster and wait until it is ready to receive.
-func (amc *AlertmanagerCluster) Start() error {
-	var peerFlags []string
+func (amc *AlertmanagerCluster) Start(additionalArgs ...string) error {
+	args := additionalArgs
 	for _, am := range amc.ams {
-		peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
+		args = append(args, "--cluster.peer="+am.clusterAddr)
 	}
 
 	for _, am := range amc.ams {
-		err := am.Start(peerFlags)
+		err := am.Start(args)
 		if err != nil {
 			return fmt.Errorf("failed to start alertmanager cluster: %w", err)
 		}
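The acceptance-framework change above is plain argument plumbing: per-test flags handed to `Run` are forwarded to `AlertmanagerCluster.Start`, which appends one `--cluster.peer` flag per member and passes the same final list to every member. A hedged, self-contained sketch of that assembly (the helper name and addresses are made up for illustration):

```go
package main

import "fmt"

// buildArgs mimics how AlertmanagerCluster.Start now assembles the command line:
// test-supplied flags first, then one --cluster.peer flag per member; every
// member is launched with the same final list.
func buildArgs(peers []string, additionalArgs ...string) []string {
	args := append([]string{}, additionalArgs...) // copy only to keep the sketch free of aliasing questions
	for _, p := range peers {
		args = append(args, "--cluster.peer="+p)
	}
	return args
}

func main() {
	peers := []string{"127.0.0.1:8001", "127.0.0.1:8002"}
	args := buildArgs(peers, "--dispatch.start-delay", "5s")
	for range peers {
		fmt.Println("alertmanager", args)
	}
}
```

The patch appends to the variadic slice in place rather than copying, which is fine here because each call builds its own list; the copy above is purely for clarity.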
diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go
index fa190f44c..167829932 100644
--- a/test/with_api_v2/acceptance/send_test.go
+++ b/test/with_api_v2/acceptance/send_test.go
@@ -419,6 +419,68 @@ receivers:
 	}
 }
 
+func TestColdStart(t *testing.T) {
+	t.Parallel()
+
+	// This integration test ensures that the first alert isn't notified before
+	// the configured start delay (synced with Prometheus' resend delay) has elapsed.
+	conf := `
+route:
+  receiver: "default"
+  group_by: []
+  group_wait: 1s
+  group_interval: 6s
+  repeat_interval: 10m
+
+receivers:
+- name: "default"
+  webhook_configs:
+  - url: 'http://%s'
+`
+
+	at := NewAcceptanceTest(t, &AcceptanceOpts{
+		Tolerance: 150 * time.Millisecond,
+	})
+
+	co := at.Collector("webhook")
+	wh := NewWebhook(t, co)
+
+	amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
+
+	amc.Push(At(1), Alert("alertname", "test1").Active(-100))
+	amc.Push(At(2), Alert("alertname", "test2"))
+
+	// Alerts are dispatched 5 seconds after the AlertManager process has started.
+	// start delay: 5s
+	// first alert received at: 1s
+	// first alert dispatched at: 5s - 1s = 4s
+	co.Want(Between(4, 5),
+		Alert("alertname", "test1").Active(1),
+		Alert("alertname", "test2").Active(4),
+	)
+
+	// Reload AlertManager process.
+	at.Do(At(5), amc.Reload)
+
+	amc.Push(At(6), Alert("alertname", "test3").Active(-100))
+	amc.Push(At(7), Alert("alertname", "test4"))
+
+	// Group interval is applied on top of start delay.
+	// start delay: 5s
+	// group interval: 6s
+	// alerts dispatched at: 5s + 6s = 11s
+	co.Want(Between(11, 11.5),
+		Alert("alertname", "test1").Active(1),
+		Alert("alertname", "test2").Active(4),
+		Alert("alertname", "test3").Active(6),
+		Alert("alertname", "test4").Active(7),
+	)
+
+	at.Run("--dispatch.start-delay", "5s")
+
+	t.Log(co.Check())
+}
+
 func TestReload(t *testing.T) {
 	t.Parallel()