alertmanager/dispatch/dispatch_test.go
Siavash Safi 18939cee8f feat: add distributed tracing support (#4745)
Add tracing support using otel to the following components (a sketch of the trace/span ID extraction follows the list):
- api: extract trace and span IDs from request context
- provider: mem put
- dispatch: split logic and use better naming
- inhibit: source and target traces, mutes, etc. drop metrics
- silence: query, expire, mutes
- notify: add distributed tracing support to stages and all http requests
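
For illustration only, here is a minimal sketch, not code from this commit, of how a component such as the api can read trace and span IDs from a request context with the OpenTelemetry Go SDK. The helper name traceIDs is made up for this example; only trace.SpanContextFromContext and the SpanContext accessors are real API.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

// traceIDs returns the trace and span IDs carried in ctx, if any.
// It only inspects the existing span context; it does not start new spans.
func traceIDs(ctx context.Context) (traceID, spanID string, ok bool) {
	sc := trace.SpanContextFromContext(ctx)
	if !sc.IsValid() {
		return "", "", false
	}
	return sc.TraceID().String(), sc.SpanID().String(), true
}

func main() {
	// A bare context carries no span, so ok is false here.
	if _, _, ok := traceIDs(context.Background()); !ok {
		fmt.Println("no trace in context")
	}
}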

Note: inhibitor metrics are dropped since we have tracing now and they
are not needed. We have not released any version with these metrics, so
we can drop them safely; this is not a breaking change.

This change borrows part of the implementation from #3673
Fixes #3670

Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Co-authored-by: Dave Henderson <dhenderson@gmail.com>
2025-12-05 22:58:44 +01:00


// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dispatch
import (
"context"
"fmt"
"log/slog"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider/mem"
"github.com/prometheus/alertmanager/types"
)
const testMaintenanceInterval = 30 * time.Second
func TestAggrGroup(t *testing.T) {
lset := model.LabelSet{
"a": "v1",
"b": "v2",
}
opts := &RouteOpts{
Receiver: "n1",
GroupBy: map[model.LabelName]struct{}{
"a": {},
"b": {},
},
GroupWait: 1 * time.Second,
GroupInterval: 300 * time.Millisecond,
RepeatInterval: 1 * time.Hour,
}
route := &Route{
RouteOpts: *opts,
}
var (
a1 = &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"a": "v1",
"b": "v2",
"c": "v3",
},
StartsAt: time.Now().Add(time.Minute),
EndsAt: time.Now().Add(time.Hour),
},
UpdatedAt: time.Now(),
}
a2 = &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"a": "v1",
"b": "v2",
"c": "v4",
},
StartsAt: time.Now().Add(-time.Hour),
EndsAt: time.Now().Add(2 * time.Hour),
},
UpdatedAt: time.Now(),
}
a3 = &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"a": "v1",
"b": "v2",
"c": "v5",
},
StartsAt: time.Now().Add(time.Minute),
EndsAt: time.Now().Add(5 * time.Minute),
},
UpdatedAt: time.Now(),
}
)
var (
last = time.Now()
current = time.Now()
lastCurMtx = &sync.Mutex{}
alertsCh = make(chan types.AlertSlice)
)
ntfy := func(ctx context.Context, alerts ...*types.Alert) bool {
// Validate that the context is properly populated.
if _, ok := notify.Now(ctx); !ok {
t.Errorf("now missing")
}
if _, ok := notify.GroupKey(ctx); !ok {
t.Errorf("group key missing")
}
if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) {
t.Errorf("wrong group labels: %q", lbls)
}
if rcv, ok := notify.ReceiverName(ctx); !ok || rcv != opts.Receiver {
t.Errorf("wrong receiver: %q", rcv)
}
if ri, ok := notify.RepeatInterval(ctx); !ok || ri != opts.RepeatInterval {
t.Errorf("wrong repeat interval: %q", ri)
}
lastCurMtx.Lock()
last = current
// Subtract a millisecond to allow for races.
current = time.Now().Add(-time.Millisecond)
lastCurMtx.Unlock()
alertsCh <- types.AlertSlice(alerts)
return true
}
removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
for i, a := range as {
ac := *a
ac.EndsAt = time.Time{}
as[i] = &ac
}
return as
}
// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
go ag.run(ntfy)
ctx := context.Background()
ag.insert(ctx, a1)
select {
case <-time.After(2 * opts.GroupWait):
t.Fatalf("expected initial batch after group_wait")
case batch := <-alertsCh:
lastCurMtx.Lock()
s := time.Since(last)
lastCurMtx.Unlock()
if s < opts.GroupWait {
t.Fatalf("received batch too early after %v", s)
}
exp := removeEndsAt(types.AlertSlice{a1})
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
}
for range 3 {
// New alert should come in after group interval.
ag.insert(ctx, a3)
select {
case <-time.After(2 * opts.GroupInterval):
t.Fatalf("expected new batch after group interval but received none")
case batch := <-alertsCh:
lastCurMtx.Lock()
s := time.Since(last)
lastCurMtx.Unlock()
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := removeEndsAt(types.AlertSlice{a1, a3})
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
}
}
ag.stop()
// Finally, set all alerts to be resolved. After a successful notify, the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
go ag.run(ntfy)
ag.insert(ctx, a1)
ag.insert(ctx, a2)
batch := <-alertsCh
exp := removeEndsAt(types.AlertSlice{a1, a2})
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
for range 3 {
// New alert should come in after group interval.
ag.insert(ctx, a3)
select {
case <-time.After(2 * opts.GroupInterval):
t.Fatalf("expected new batch after group interval but received none")
case batch := <-alertsCh:
lastCurMtx.Lock()
s := time.Since(last)
lastCurMtx.Unlock()
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
}
}
// Resolve an alert; it should be removed after the next batch is sent.
a1r := *a1
a1r.EndsAt = time.Now()
ag.insert(ctx, &a1r)
exp = append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
select {
case <-time.After(2 * opts.GroupInterval):
t.Fatalf("expected new batch after group interval but received none")
case batch := <-alertsCh:
lastCurMtx.Lock()
s := time.Since(last)
lastCurMtx.Unlock()
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
}
// Resolve all remaining alerts; they should be removed after the next batch is sent.
// Do not add a1r as it should have been deleted following the previous batch.
a2r, a3r := *a2, *a3
resolved := types.AlertSlice{&a2r, &a3r}
for _, a := range resolved {
a.EndsAt = time.Now()
ag.insert(ctx, a)
}
select {
case <-time.After(2 * opts.GroupInterval):
t.Fatalf("expected new batch after group interval but received none")
case batch := <-alertsCh:
lastCurMtx.Lock()
s := time.Since(last)
lastCurMtx.Unlock()
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
sort.Sort(batch)
if !reflect.DeepEqual(batch, resolved) {
t.Fatalf("expected alerts %v but got %v", resolved, batch)
}
if !ag.empty() {
t.Fatalf("Expected aggregation group to be empty after resolving alerts: %v", ag)
}
}
ag.stop()
}
func TestGroupLabels(t *testing.T) {
a := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"a": "v1",
"b": "v2",
"c": "v3",
},
},
}
route := &Route{
RouteOpts: RouteOpts{
GroupBy: map[model.LabelName]struct{}{
"a": {},
"b": {},
},
GroupByAll: false,
},
}
expLs := model.LabelSet{
"a": "v1",
"b": "v2",
}
ls := getGroupLabels(a, route)
if !reflect.DeepEqual(ls, expLs) {
t.Fatalf("expected labels are %v, but got %v", expLs, ls)
}
}
func TestGroupByAllLabels(t *testing.T) {
a := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"a": "v1",
"b": "v2",
"c": "v3",
},
},
}
route := &Route{
RouteOpts: RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
GroupByAll: true,
},
}
expLs := model.LabelSet{
"a": "v1",
"b": "v2",
"c": "v3",
}
ls := getGroupLabels(a, route)
if !reflect.DeepEqual(ls, expLs) {
t.Fatalf("expected labels are %v, but got %v", expLs, ls)
}
}
func TestGroups(t *testing.T) {
confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'
route:
group_by: ['alertname']
group_wait: 10ms
group_interval: 10ms
receiver: 'prod'
routes:
- match:
env: 'testing'
receiver: 'testing'
group_by: ['alertname', 'service']
- match:
env: 'prod'
receiver: 'prod'
group_by: ['alertname', 'service', 'cluster']
continue: true
- match:
kafka: 'yes'
receiver: 'kafka'
group_by: ['alertname', 'service', 'cluster']`
conf, err := config.Load(confData)
if err != nil {
t.Fatal(err)
}
logger := promslog.NewNopLogger()
route := NewRoute(conf.Route, nil)
reg := prometheus.NewRegistry()
marker := types.NewMarker(reg)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
if err != nil {
t.Fatal(err)
}
defer alerts.Close()
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
go dispatcher.Run(time.Now())
defer dispatcher.Stop()
// Create alerts. The dispatcher will automatically create the groups.
inputAlerts := []*types.Alert{
// Matches the parent route.
newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
// Matches the first sub-route.
newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
// Matches the second sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
// Matches the second sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
// Matches the second and third sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
}
alerts.Put(context.Background(), inputAlerts...)
// Let alerts get processed.
for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
time.Sleep(200 * time.Millisecond)
}
require.Len(t, recorder.Alerts(), 7)
alertGroups, receivers, _ := dispatcher.Groups(context.Background(),
func(*Route) bool {
return true
}, func(*types.Alert, time.Time) bool {
return true
},
)
require.Equal(t, AlertGroups{
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[0]},
Labels: model.LabelSet{
"alertname": "OtherAlert",
},
Receiver: "prod",
GroupKey: "{}:{alertname=\"OtherAlert\"}",
RouteID: "{}",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[1]},
Labels: model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
},
Receiver: "testing",
GroupKey: "{}/{env=\"testing\"}:{alertname=\"TestingAlert\", service=\"api\"}",
RouteID: "{}/{env=\"testing\"}/0",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]},
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "aa",
},
Receiver: "prod",
GroupKey: "{}/{env=\"prod\"}:{alertname=\"HighErrorRate\", cluster=\"aa\", service=\"api\"}",
RouteID: "{}/{env=\"prod\"}/1",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[4]},
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "bb",
},
Receiver: "prod",
GroupKey: "{}/{env=\"prod\"}:{alertname=\"HighErrorRate\", cluster=\"bb\", service=\"api\"}",
RouteID: "{}/{env=\"prod\"}/1",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[5]},
Labels: model.LabelSet{
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
},
Receiver: "kafka",
GroupKey: "{}/{kafka=\"yes\"}:{alertname=\"HighLatency\", cluster=\"bb\", service=\"db\"}",
RouteID: "{}/{kafka=\"yes\"}/2",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[5]},
Labels: model.LabelSet{
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
},
Receiver: "prod",
GroupKey: "{}/{env=\"prod\"}:{alertname=\"HighLatency\", cluster=\"bb\", service=\"db\"}",
RouteID: "{}/{env=\"prod\"}/1",
},
}, alertGroups)
require.Equal(t, map[model.Fingerprint][]string{
inputAlerts[0].Fingerprint(): {"prod"},
inputAlerts[1].Fingerprint(): {"testing"},
inputAlerts[2].Fingerprint(): {"prod"},
inputAlerts[3].Fingerprint(): {"prod"},
inputAlerts[4].Fingerprint(): {"prod"},
inputAlerts[5].Fingerprint(): {"kafka", "prod"},
}, receivers)
}
func TestGroupsWithLimits(t *testing.T) {
confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'
route:
group_by: ['alertname']
group_wait: 10ms
group_interval: 10ms
receiver: 'prod'
routes:
- match:
env: 'testing'
receiver: 'testing'
group_by: ['alertname', 'service']
- match:
env: 'prod'
receiver: 'prod'
group_by: ['alertname', 'service', 'cluster']
continue: true
- match:
kafka: 'yes'
receiver: 'kafka'
group_by: ['alertname', 'service', 'cluster']`
conf, err := config.Load(confData)
if err != nil {
t.Fatal(err)
}
logger := promslog.NewNopLogger()
route := NewRoute(conf.Route, nil)
reg := prometheus.NewRegistry()
marker := types.NewMarker(reg)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
if err != nil {
t.Fatal(err)
}
defer alerts.Close()
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
lim := limits{groups: 6}
m := NewDispatcherMetrics(true, reg)
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, lim, logger, m)
go dispatcher.Run(time.Now())
defer dispatcher.Stop()
// Create alerts. The dispatcher will automatically create the groups.
inputAlerts := []*types.Alert{
// Matches the parent route.
newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
// Matches the first sub-route.
newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
// Matches the second sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
// Matches the second sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
// Matches the second and third sub-route.
newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
}
err = alerts.Put(context.Background(), inputAlerts...)
if err != nil {
t.Fatal(err)
}
// Let alerts get processed.
for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
time.Sleep(200 * time.Millisecond)
}
require.Len(t, recorder.Alerts(), 7)
routeFilter := func(*Route) bool { return true }
alertFilter := func(*types.Alert, time.Time) bool { return true }
alertGroups, _, _ := dispatcher.Groups(context.Background(), routeFilter, alertFilter)
require.Len(t, alertGroups, 6)
require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached))
// Try to store a new alert. This time, we will hit the limit on the number of groups.
err = alerts.Put(context.Background(), newAlert(model.LabelSet{"env": "prod", "alertname": "NewAlert", "cluster": "new-cluster", "service": "db"}))
if err != nil {
t.Fatal(err)
}
// Let alert get processed.
for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
time.Sleep(200 * time.Millisecond)
}
require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached))
// Verify there are still only 6 groups.
alertGroups, _, _ = dispatcher.Groups(context.Background(), routeFilter, alertFilter)
require.Len(t, alertGroups, 6)
}
type recordStage struct {
mtx sync.RWMutex
alerts map[string]map[model.Fingerprint]*types.Alert
}
func (r *recordStage) Alerts() []*types.Alert {
r.mtx.RLock()
defer r.mtx.RUnlock()
alerts := make([]*types.Alert, 0)
for k := range r.alerts {
for _, a := range r.alerts[k] {
alerts = append(alerts, a)
}
}
return alerts
}
func (r *recordStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
gk, ok := notify.GroupKey(ctx)
if !ok {
panic("GroupKey not present!")
}
if _, ok := r.alerts[gk]; !ok {
r.alerts[gk] = make(map[model.Fingerprint]*types.Alert)
}
for _, a := range alerts {
r.alerts[gk][a.Fingerprint()] = a
}
return ctx, nil, nil
}
var (
// Set the start time in the past to trigger a flush immediately.
t0 = time.Now().Add(-time.Minute)
// Set the end time in the future to avoid deleting the alert.
t1 = t0.Add(2 * time.Minute)
)
func newAlert(labels model.LabelSet) *types.Alert {
return &types.Alert{
Alert: model.Alert{
Labels: labels,
Annotations: model.LabelSet{"foo": "bar"},
StartsAt: t0,
EndsAt: t1,
GeneratorURL: "http://example.com/prometheus",
},
UpdatedAt: t0,
Timeout: false,
}
}
func TestDispatcherRace(t *testing.T) {
logger := promslog.NewNopLogger()
reg := prometheus.NewRegistry()
marker := types.NewMarker(reg)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
if err != nil {
t.Fatal(err)
}
defer alerts.Close()
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
go dispatcher.Run(time.Now())
dispatcher.Stop()
}
func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) {
const numAlerts = 5000
logger := promslog.NewNopLogger()
reg := prometheus.NewRegistry()
marker := types.NewMarker(reg)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
if err != nil {
t.Fatal(err)
}
defer alerts.Close()
route := &Route{
RouteOpts: RouteOpts{
Receiver: "default",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: 1 * time.Hour, // Should never hit in this test.
RepeatInterval: 1 * time.Hour, // Should never hit in this test.
},
}
timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
go dispatcher.Run(time.Now())
defer dispatcher.Stop()
// Push all alerts.
for i := range numAlerts {
alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
require.NoError(t, alerts.Put(context.Background(), alert))
}
// Wait until the alerts have been notified or the waiting timeout expires.
for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); {
if len(recorder.Alerts()) >= numAlerts {
break
}
// Throttle.
time.Sleep(10 * time.Millisecond)
}
// We expect all alerts to be notified immediately, since they all belong to different groups.
require.Len(t, recorder.Alerts(), numAlerts)
}
type limits struct {
groups int
}
func (l limits) MaxNumberOfAggregationGroups() int {
return l.groups
}
func TestDispatcher_DoMaintenance(t *testing.T) {
r := prometheus.NewRegistry()
marker := types.NewMarker(r)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Minute, nil, promslog.NewNopLogger(), r)
if err != nil {
t.Fatal(err)
}
route := &Route{
RouteOpts: RouteOpts{
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: 5 * time.Minute, // Should never hit in this test.
},
}
timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
ctx := context.Background()
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, promslog.NewNopLogger(), NewDispatcherMetrics(false, r))
aggrGroups := make(map[*Route]map[model.Fingerprint]*aggrGroup)
aggrGroups[route] = make(map[model.Fingerprint]*aggrGroup)
// Insert an aggregation group with no alerts.
labels := model.LabelSet{"alertname": "1"}
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
dispatcher.aggrGroupsPerRoute = aggrGroups
// Must run, otherwise doMaintenance blocks on aggrGroup1.stop().
go aggrGroup1.run(func(context.Context, ...*types.Alert) bool { return true })
// Insert a marker for the aggregation group's group key.
marker.SetMuted(route.ID(), aggrGroup1.GroupKey(), []string{"weekends"})
mutedBy, isMuted := marker.Muted(route.ID(), aggrGroup1.GroupKey())
require.True(t, isMuted)
require.Equal(t, []string{"weekends"}, mutedBy)
// Run the maintenance and the marker should be removed.
dispatcher.doMaintenance()
mutedBy, isMuted = marker.Muted(route.ID(), aggrGroup1.GroupKey())
require.False(t, isMuted)
require.Empty(t, mutedBy)
}
func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
t.Run("successful flush deletes markers for resolved alerts", func(t *testing.T) {
ctx := context.Background()
marker := types.NewMarker(prometheus.NewRegistry())
labels := model.LabelSet{"alertname": "TestAlert"}
route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: time.Minute,
RepeatInterval: time.Hour,
},
}
timeout := func(d time.Duration) time.Duration { return d }
logger := promslog.NewNopLogger()
// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
// Create test alerts: one active and one resolved
now := time.Now()
activeAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Hour), // Active alert
},
UpdatedAt: now,
}
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "2",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute), // Resolved alert
},
UpdatedAt: now,
}
// Insert alerts into the aggregation group
ag.insert(ctx, activeAlert)
ag.insert(ctx, resolvedAlert)
// Set markers for both alerts
marker.SetActiveOrSilenced(activeAlert.Fingerprint(), 0, nil, nil)
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
// Verify markers exist before flush
require.True(t, marker.Active(activeAlert.Fingerprint()))
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
// Create a notify function that succeeds
notifyFunc := func(alerts ...*types.Alert) bool {
return true
}
// Flush the alerts
ag.flush(notifyFunc)
// Verify that the resolved alert's marker was deleted
require.True(t, marker.Active(activeAlert.Fingerprint()), "active alert marker should still exist")
require.False(t, marker.Active(resolvedAlert.Fingerprint()), "resolved alert marker should be deleted")
})
t.Run("failed flush does not delete markers", func(t *testing.T) {
ctx := context.Background()
marker := types.NewMarker(prometheus.NewRegistry())
labels := model.LabelSet{"alertname": "TestAlert"}
route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: time.Minute,
RepeatInterval: time.Hour,
},
}
timeout := func(d time.Duration) time.Duration { return d }
logger := promslog.NewNopLogger()
// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
// Create a resolved alert
now := time.Now()
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute), // Resolved alert
},
UpdatedAt: now,
}
// Insert alert into the aggregation group
ag.insert(ctx, resolvedAlert)
// Set marker for the alert
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
// Verify marker exists before flush
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
// Create a notify function that fails
notifyFunc := func(alerts ...*types.Alert) bool {
return false
}
// Flush the alerts (notify will fail)
ag.flush(notifyFunc)
// Verify that the marker was NOT deleted due to failed notification
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when notify fails")
})
t.Run("markers not deleted when alert is modified during flush", func(t *testing.T) {
ctx := context.Background()
marker := types.NewMarker(prometheus.NewRegistry())
labels := model.LabelSet{"alertname": "TestAlert"}
route := &Route{
RouteOpts: RouteOpts{
Receiver: "test",
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
GroupWait: 0,
GroupInterval: time.Minute,
RepeatInterval: time.Hour,
},
}
timeout := func(d time.Duration) time.Duration { return d }
logger := promslog.NewNopLogger()
// Create an aggregation group
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
// Create a resolved alert
now := time.Now()
resolvedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(-time.Minute), // Resolved alert
},
UpdatedAt: now,
}
// Insert alert into the aggregation group
ag.insert(ctx, resolvedAlert)
// Set marker for the alert
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
// Verify marker exists before flush
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
// Create a notify function that modifies the alert before returning
notifyFunc := func(alerts ...*types.Alert) bool {
// Simulate the alert being modified (e.g., firing again) during flush
modifiedAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"alertname": "TestAlert",
"instance": "1",
},
StartsAt: now.Add(-time.Hour),
EndsAt: now.Add(time.Hour), // Active again
},
UpdatedAt: now.Add(time.Second), // More recent update
}
// Update the alert in the store
ag.alerts.Set(modifiedAlert)
return true
}
// Flush the alerts
ag.flush(notifyFunc)
// Verify that the marker was NOT deleted because the alert was modified
// during the flush (DeleteIfNotModified should have failed)
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
})
}
func TestDispatchOnStartup(t *testing.T) {
logger := promslog.NewNopLogger()
reg := prometheus.NewRegistry()
marker := types.NewMarker(reg)
alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, reg)
if err != nil {
t.Fatal(err)
}
defer alerts.Close()
// Set up a route with GroupBy to separate alerts into different aggregation groups.
route := &Route{
RouteOpts: RouteOpts{
Receiver: "default",
GroupBy: map[model.LabelName]struct{}{"instance": {}},
GroupWait: 1 * time.Second,
GroupInterval: 3 * time.Minute,
RepeatInterval: 1 * time.Hour,
},
}
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
timeout := func(d time.Duration) time.Duration { return d }
// Set the start time 2 seconds in the future.
now := time.Now()
startDelay := 2 * time.Second
startTime := time.Now().Add(startDelay)
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, testMaintenanceInterval, nil, logger, NewDispatcherMetrics(false, reg))
go dispatcher.Run(startTime)
defer dispatcher.Stop()
// Create 2 similar alerts with start times in the past
alert1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "TestAlert1", "instance": "1"},
Annotations: model.LabelSet{"foo": "bar"},
StartsAt: now.Add(-1 * time.Hour),
EndsAt: now.Add(time.Hour),
GeneratorURL: "http://example.com/prometheus",
},
UpdatedAt: now,
Timeout: false,
}
alert2 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "TestAlert2", "instance": "2"},
Annotations: model.LabelSet{"foo": "bar"},
StartsAt: now.Add(-1 * time.Hour),
EndsAt: now.Add(time.Hour),
GeneratorURL: "http://example.com/prometheus",
},
UpdatedAt: now,
Timeout: false,
}
// Send alert1
require.NoError(t, alerts.Put(context.Background(), alert1))
var recordedAlerts []*types.Alert
// Expect a recorded alert after startTime + GroupWait, which is in the future.
require.Eventually(t, func() bool {
recordedAlerts = recorder.Alerts()
return len(recordedAlerts) == 1
}, startDelay+route.RouteOpts.GroupWait, 500*time.Millisecond)
require.Equal(t, alert1.Fingerprint(), recordedAlerts[0].Fingerprint(), "expected alert1 to be dispatched after GroupWait")
// Send alert2
require.NoError(t, alerts.Put(context.Background(), alert2))
// Expect a recorded alert after GroupInterval
require.Eventually(t, func() bool {
recordedAlerts = recorder.Alerts()
return len(recordedAlerts) == 2
}, route.RouteOpts.GroupInterval, 100*time.Millisecond)
// Sort alerts by fingerprint for deterministic ordering
sort.Slice(recordedAlerts, func(i, j int) bool {
return recordedAlerts[i].Fingerprint() < recordedAlerts[j].Fingerprint()
})
require.Equal(t, alert2.Fingerprint(), recordedAlerts[1].Fingerprint(), "expected alert2 to be dispatched after GroupInterval")
// Verify both alerts are present
fingerprints := make(map[model.Fingerprint]bool)
for _, a := range recordedAlerts {
fingerprints[a.Fingerprint()] = true
}
require.True(t, fingerprints[alert1.Fingerprint()], "expected alert1 to be present")
require.True(t, fingerprints[alert2.Fingerprint()], "expected alert2 to be present")
}