mirror of
https://github.com/prometheus/alertmanager.git
synced 2026-02-05 15:45:34 +01:00
fix(marker): stop state leakage from aggregation groups
This change makes aggregation groups to delete resolved alerts from marker, therefore avoiding the leakage of ghost states mentioned in #4402. Signed-off-by: Siavash Safi <siavash@cloudflare.com>
This commit is contained in:
@@ -340,7 +340,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
|
||||
return
|
||||
}
|
||||
|
||||
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
|
||||
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger)
|
||||
routeGroups[fp] = ag
|
||||
d.aggrGroupsNum++
|
||||
d.metrics.aggrGroups.Inc()
|
||||
@@ -389,6 +389,7 @@ type aggrGroup struct {
|
||||
routeKey string
|
||||
|
||||
alerts *store.Alerts
|
||||
marker types.AlertMarker
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
done chan struct{}
|
||||
@@ -400,7 +401,7 @@ type aggrGroup struct {
|
||||
}
|
||||
|
||||
// newAggrGroup returns a new aggregation group.
|
||||
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger *slog.Logger) *aggrGroup {
|
||||
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, marker types.AlertMarker, logger *slog.Logger) *aggrGroup {
|
||||
if to == nil {
|
||||
to = func(d time.Duration) time.Duration { return d }
|
||||
}
|
||||
@@ -411,6 +412,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
|
||||
opts: &r.RouteOpts,
|
||||
timeout: to,
|
||||
alerts: store.NewAlerts(),
|
||||
marker: marker,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
ag.ctx, ag.cancel = context.WithCancel(ctx)
|
||||
@@ -539,6 +541,14 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
|
||||
// we would delete an active alert thinking it was resolved.
|
||||
if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
|
||||
ag.logger.Error("error on delete alerts", "err", err)
|
||||
} else {
|
||||
// Delete markers for resolved alerts that are not in the store.
|
||||
for _, alert := range resolvedSlice {
|
||||
_, err := ag.alerts.Get(alert.Fingerprint())
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
ag.marker.Delete(alert.Fingerprint())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,7 +141,7 @@ func TestAggrGroup(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test regular situation where we wait for group_wait to send out alerts.
|
||||
ag := newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
|
||||
ag := newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
|
||||
go ag.run(ntfy)
|
||||
|
||||
ag.insert(a1)
|
||||
@@ -195,7 +195,7 @@ func TestAggrGroup(t *testing.T) {
|
||||
// immediate flushing.
|
||||
// Finally, set all alerts to be resolved. After successful notify the aggregation group
|
||||
// should empty itself.
|
||||
ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
|
||||
ag = newAggrGroup(context.Background(), lset, route, nil, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
|
||||
go ag.run(ntfy)
|
||||
|
||||
ag.insert(a1)
|
||||
@@ -757,7 +757,7 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
|
||||
|
||||
// Insert an aggregation group with no alerts.
|
||||
labels := model.LabelSet{"alertname": "1"}
|
||||
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, promslog.NewNopLogger())
|
||||
aggrGroup1 := newAggrGroup(ctx, labels, route, timeout, types.NewMarker(prometheus.NewRegistry()), promslog.NewNopLogger())
|
||||
aggrGroups[route][aggrGroup1.fingerprint()] = aggrGroup1
|
||||
dispatcher.aggrGroupsPerRoute = aggrGroups
|
||||
// Must run otherwise doMaintenance blocks on aggrGroup1.stop().
|
||||
@@ -775,3 +775,197 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
|
||||
require.False(t, isMuted)
|
||||
require.Empty(t, mutedBy)
|
||||
}
|
||||
|
||||
func TestDispatcher_DeleteResolvedAlertsFromMarker(t *testing.T) {
|
||||
t.Run("successful flush deletes markers for resolved alerts", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
labels := model.LabelSet{"alertname": "TestAlert"}
|
||||
route := &Route{
|
||||
RouteOpts: RouteOpts{
|
||||
Receiver: "test",
|
||||
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
|
||||
GroupWait: 0,
|
||||
GroupInterval: time.Minute,
|
||||
RepeatInterval: time.Hour,
|
||||
},
|
||||
}
|
||||
timeout := func(d time.Duration) time.Duration { return d }
|
||||
logger := promslog.NewNopLogger()
|
||||
|
||||
// Create an aggregation group
|
||||
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
|
||||
|
||||
// Create test alerts: one active and one resolved
|
||||
now := time.Now()
|
||||
activeAlert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "TestAlert",
|
||||
"instance": "1",
|
||||
},
|
||||
StartsAt: now.Add(-time.Hour),
|
||||
EndsAt: now.Add(time.Hour), // Active alert
|
||||
},
|
||||
UpdatedAt: now,
|
||||
}
|
||||
resolvedAlert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "TestAlert",
|
||||
"instance": "2",
|
||||
},
|
||||
StartsAt: now.Add(-time.Hour),
|
||||
EndsAt: now.Add(-time.Minute), // Resolved alert
|
||||
},
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
// Insert alerts into the aggregation group
|
||||
ag.insert(activeAlert)
|
||||
ag.insert(resolvedAlert)
|
||||
|
||||
// Set markers for both alerts
|
||||
marker.SetActiveOrSilenced(activeAlert.Fingerprint(), 0, nil, nil)
|
||||
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
|
||||
|
||||
// Verify markers exist before flush
|
||||
require.True(t, marker.Active(activeAlert.Fingerprint()))
|
||||
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
|
||||
|
||||
// Create a notify function that succeeds
|
||||
notifyFunc := func(alerts ...*types.Alert) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Flush the alerts
|
||||
ag.flush(notifyFunc)
|
||||
|
||||
// Verify that the resolved alert's marker was deleted
|
||||
require.True(t, marker.Active(activeAlert.Fingerprint()), "active alert marker should still exist")
|
||||
require.False(t, marker.Active(resolvedAlert.Fingerprint()), "resolved alert marker should be deleted")
|
||||
})
|
||||
|
||||
t.Run("failed flush does not delete markers", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
labels := model.LabelSet{"alertname": "TestAlert"}
|
||||
route := &Route{
|
||||
RouteOpts: RouteOpts{
|
||||
Receiver: "test",
|
||||
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
|
||||
GroupWait: 0,
|
||||
GroupInterval: time.Minute,
|
||||
RepeatInterval: time.Hour,
|
||||
},
|
||||
}
|
||||
timeout := func(d time.Duration) time.Duration { return d }
|
||||
logger := promslog.NewNopLogger()
|
||||
|
||||
// Create an aggregation group
|
||||
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
|
||||
|
||||
// Create a resolved alert
|
||||
now := time.Now()
|
||||
resolvedAlert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "TestAlert",
|
||||
"instance": "1",
|
||||
},
|
||||
StartsAt: now.Add(-time.Hour),
|
||||
EndsAt: now.Add(-time.Minute), // Resolved alert
|
||||
},
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
// Insert alert into the aggregation group
|
||||
ag.insert(resolvedAlert)
|
||||
|
||||
// Set marker for the alert
|
||||
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
|
||||
|
||||
// Verify marker exists before flush
|
||||
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
|
||||
|
||||
// Create a notify function that fails
|
||||
notifyFunc := func(alerts ...*types.Alert) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Flush the alerts (notify will fail)
|
||||
ag.flush(notifyFunc)
|
||||
|
||||
// Verify that the marker was NOT deleted due to failed notification
|
||||
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when notify fails")
|
||||
})
|
||||
|
||||
t.Run("markers not deleted when alert is modified during flush", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
marker := types.NewMarker(prometheus.NewRegistry())
|
||||
labels := model.LabelSet{"alertname": "TestAlert"}
|
||||
route := &Route{
|
||||
RouteOpts: RouteOpts{
|
||||
Receiver: "test",
|
||||
GroupBy: map[model.LabelName]struct{}{"alertname": {}},
|
||||
GroupWait: 0,
|
||||
GroupInterval: time.Minute,
|
||||
RepeatInterval: time.Hour,
|
||||
},
|
||||
}
|
||||
timeout := func(d time.Duration) time.Duration { return d }
|
||||
logger := promslog.NewNopLogger()
|
||||
|
||||
// Create an aggregation group
|
||||
ag := newAggrGroup(ctx, labels, route, timeout, marker, logger)
|
||||
|
||||
// Create a resolved alert
|
||||
now := time.Now()
|
||||
resolvedAlert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "TestAlert",
|
||||
"instance": "1",
|
||||
},
|
||||
StartsAt: now.Add(-time.Hour),
|
||||
EndsAt: now.Add(-time.Minute), // Resolved alert
|
||||
},
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
// Insert alert into the aggregation group
|
||||
ag.insert(resolvedAlert)
|
||||
|
||||
// Set marker for the alert
|
||||
marker.SetActiveOrSilenced(resolvedAlert.Fingerprint(), 0, nil, nil)
|
||||
|
||||
// Verify marker exists before flush
|
||||
require.True(t, marker.Active(resolvedAlert.Fingerprint()))
|
||||
|
||||
// Create a notify function that modifies the alert before returning
|
||||
notifyFunc := func(alerts ...*types.Alert) bool {
|
||||
// Simulate the alert being modified (e.g., firing again) during flush
|
||||
modifiedAlert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{
|
||||
"alertname": "TestAlert",
|
||||
"instance": "1",
|
||||
},
|
||||
StartsAt: now.Add(-time.Hour),
|
||||
EndsAt: now.Add(time.Hour), // Active again
|
||||
},
|
||||
UpdatedAt: now.Add(time.Second), // More recent update
|
||||
}
|
||||
// Update the alert in the store
|
||||
ag.alerts.Set(modifiedAlert)
|
||||
return true
|
||||
}
|
||||
|
||||
// Flush the alerts
|
||||
ag.flush(notifyFunc)
|
||||
|
||||
// Verify that the marker was NOT deleted because the alert was modified
|
||||
// during the flush (DeleteIfNotModified should have failed)
|
||||
require.True(t, marker.Active(resolvedAlert.Fingerprint()), "marker should not be deleted when alert is modified during flush")
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user