// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatch

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"

	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/provider"
	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
)

// States of the dispatcher's start-up lifecycle, tracked in Dispatcher.state.
const (
	DispatcherStateUnknown = iota
	DispatcherStateWaitingToStart
	DispatcherStateRunning
)

var tracer = otel.Tracer("github.com/prometheus/alertmanager/dispatch")

// DispatcherMetrics represents metrics associated with a dispatcher.
type DispatcherMetrics struct {
	aggrGroups            prometheus.Gauge
	processingDuration    prometheus.Summary
	aggrGroupLimitReached prometheus.Counter
}

// NewDispatcherMetrics returns a new registered DispatcherMetrics.
func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics {
	if r == nil {
		return nil
	}
	m := DispatcherMetrics{
		aggrGroups: promauto.With(r).NewGauge(
			prometheus.GaugeOpts{
				Name: "alertmanager_dispatcher_aggregation_groups",
				Help: "Number of active aggregation groups",
			},
		),
		processingDuration: promauto.With(r).NewSummary(
			prometheus.SummaryOpts{
				Name: "alertmanager_dispatcher_alert_processing_duration_seconds",
				Help: "Summary of latencies for the processing of alerts.",
			},
		),
		aggrGroupLimitReached: promauto.With(r).NewCounter(
			prometheus.CounterOpts{
				Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total",
				Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
			},
		),
	}

	return &m
}
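
// A minimal usage sketch (illustrative only, not part of the package API):
// the metrics are registered against whatever prometheus.Registerer the
// caller provides.
//
//	reg := prometheus.NewRegistry()
//	metrics := NewDispatcherMetrics(false, reg)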

// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
	route      *Route
	alerts     provider.Alerts
	stage      notify.Stage
	marker     types.GroupMarker
	metrics    *DispatcherMetrics
	limits     Limits
	propagator propagation.TextMapPropagator

	timeout func(time.Duration) time.Duration

	mtx                sync.RWMutex
	loadingFinished    sync.WaitGroup
	aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
	aggrGroupsNum      int

	maintenanceInterval time.Duration
	done                chan struct{}
	ctx                 context.Context
	cancel              func()

	logger *slog.Logger

	startTimer *time.Timer
	state      int
}

// Limits describes limits used by Dispatcher.
type Limits interface {
	// MaxNumberOfAggregationGroups returns the maximum number of aggregation
	// groups that the dispatcher can have.
	// 0 or a negative value means unlimited.
	// If the dispatcher hits this limit, it does not create additional groups
	// but logs an error instead.
	MaxNumberOfAggregationGroups() int
}
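
// A hedged, illustrative sketch of a Limits implementation (not part of this
// package's API); it simply returns a fixed cap:
//
//	type fixedLimits struct{ maxGroups int }
//
//	func (l fixedLimits) MaxNumberOfAggregationGroups() int { return l.maxGroups }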

// NewDispatcher returns a new Dispatcher.
func NewDispatcher(
	alerts provider.Alerts,
	route *Route,
	stage notify.Stage,
	marker types.GroupMarker,
	timeout func(time.Duration) time.Duration,
	maintenanceInterval time.Duration,
	limits Limits,
	logger *slog.Logger,
	metrics *DispatcherMetrics,
) *Dispatcher {
	if limits == nil {
		limits = nilLimits{}
	}

	disp := &Dispatcher{
		alerts:              alerts,
		stage:               stage,
		route:               route,
		marker:              marker,
		timeout:             timeout,
		maintenanceInterval: maintenanceInterval,
		logger:              logger.With("component", "dispatcher"),
		metrics:             metrics,
		limits:              limits,
		propagator:          otel.GetTextMapPropagator(),
		state:               DispatcherStateUnknown,
	}
	disp.loadingFinished.Add(1)
	return disp
}
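
// A hedged usage sketch (illustrative only; the alerts provider, routing tree,
// notification stage and marker are assumed to exist already; a nil timeout
// function and nil limits fall back to the defaults above):
//
//	metrics := NewDispatcherMetrics(false, prometheus.NewRegistry())
//	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
//	d := NewDispatcher(alerts, route, stage, marker, nil, 30*time.Second, nil, logger, metrics)
//	go d.Run(time.Now())
//	defer d.Stop()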

// Run starts dispatching alerts incoming via the updates channel.
func (d *Dispatcher) Run(dispatchStartTime time.Time) {
	d.done = make(chan struct{})

	d.mtx.Lock()
	d.logger.Debug("preparing to start", "startTime", dispatchStartTime)
	d.startTimer = time.NewTimer(time.Until(dispatchStartTime))
	d.state = DispatcherStateWaitingToStart
	d.logger.Debug("setting state", "state", "waiting_to_start")
	d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
	d.aggrGroupsNum = 0
	d.metrics.aggrGroups.Set(0)
	d.ctx, d.cancel = context.WithCancel(context.Background())
	d.mtx.Unlock()

	initialAlerts, it := d.alerts.SlurpAndSubscribe("dispatcher")
	for _, alert := range initialAlerts {
		d.routeAlert(d.ctx, alert)
	}
	d.loadingFinished.Done()

	d.run(it)
	close(d.done)
}

func (d *Dispatcher) run(it provider.AlertIterator) {
	maintenance := time.NewTicker(d.maintenanceInterval)
	defer maintenance.Stop()

	defer it.Close()

	for {
		select {
		case alert, ok := <-it.Next():
			if !ok {
				// Iterator exhausted for some reason.
				if err := it.Err(); err != nil {
					d.logger.Error("Error on alert update", "err", err)
				}
				return
			}

			// Log errors but keep trying.
			if err := it.Err(); err != nil {
				d.logger.Error("Error on alert update", "err", err)
				continue
			}

			ctx := d.ctx
			if alert.Header != nil {
				ctx = d.propagator.Extract(ctx, propagation.MapCarrier(alert.Header))
			}

			d.routeAlert(ctx, alert.Data)

		case <-d.startTimer.C:
			if d.state == DispatcherStateWaitingToStart {
				d.state = DispatcherStateRunning
				d.logger.Debug("started", "state", "running")
				d.logger.Debug("Starting all existing aggregation groups")
				for _, groups := range d.aggrGroupsPerRoute {
					for _, ag := range groups {
						d.runAG(ag)
					}
				}
			}

		case <-maintenance.C:
			d.doMaintenance()
		case <-d.ctx.Done():
			return
		}
	}
}

func (d *Dispatcher) routeAlert(ctx context.Context, alert *types.Alert) {
	d.logger.Debug("Received alert", "alert", alert)

	ctx, span := tracer.Start(ctx, "dispatch.Dispatcher.routeAlert",
		trace.WithAttributes(
			attribute.String("alerting.alert.name", alert.Name()),
			attribute.String("alerting.alert.fingerprint", alert.Fingerprint().String()),
		),
		trace.WithSpanKind(trace.SpanKindInternal),
	)
	defer span.End()

	now := time.Now()
	for _, r := range d.route.Match(alert.Labels) {
		span.AddEvent("dispatching alert to route",
			trace.WithAttributes(
				attribute.String("alerting.route.receiver.name", r.RouteOpts.Receiver),
			),
		)
		d.groupAlert(ctx, alert, r)
	}
	d.metrics.processingDuration.Observe(time.Since(now).Seconds())
}

func (d *Dispatcher) doMaintenance() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	for _, groups := range d.aggrGroupsPerRoute {
		for _, ag := range groups {
			if ag.empty() {
				ag.stop()
				d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
				delete(groups, ag.fingerprint())
				d.aggrGroupsNum--
				d.metrics.aggrGroups.Dec()
			}
		}
	}
}

// WaitForLoading blocks until the initial loading of alerts has finished.
func (d *Dispatcher) WaitForLoading() {
	d.loadingFinished.Wait()
}

// LoadingDone returns a channel that is closed once the initial loading of
// alerts has finished.
func (d *Dispatcher) LoadingDone() <-chan struct{} {
	doneChan := make(chan struct{})
	go func() {
		d.WaitForLoading()
		close(doneChan)
	}()

	return doneChan
}
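
// A minimal sketch of waiting for the initial load without blocking forever,
// mirroring what Groups does below:
//
//	select {
//	case <-d.LoadingDone():
//	case <-ctx.Done():
//	}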

// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
	Alerts   types.AlertSlice
	Labels   model.LabelSet
	Receiver string
	GroupKey string
	RouteID  string
}

type AlertGroups []*AlertGroup

func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
func (ag AlertGroups) Less(i, j int) bool {
	if ag[i].Labels.Equal(ag[j].Labels) {
		return ag[i].Receiver < ag[j].Receiver
	}
	return ag[i].Labels.Before(ag[j].Labels)
}
func (ag AlertGroups) Len() int { return len(ag) }

// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func (d *Dispatcher) Groups(ctx context.Context, routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string, error) {
	select {
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case <-d.LoadingDone():
	}
	d.WaitForLoading()
	groups := AlertGroups{}

	// Make a snapshot of the aggrGroupsPerRoute map to use for this function.
	// This ensures that we hold the Dispatcher.mtx for as little time as
	// possible.
	// It also prevents us from holding any locks in alertFilter or routeFilter
	// while we hold the dispatcher lock.
	d.mtx.RLock()
	aggrGroupsPerRoute := map[*Route]map[model.Fingerprint]*aggrGroup{}
	for route, ags := range d.aggrGroupsPerRoute {
		// Since other goroutines could modify d.aggrGroupsPerRoute, we need to
		// copy it. We DON'T need to copy the aggrGroup objects because they each
		// have a mutex protecting their internal state.
		// The aggrGroup methods use the internal lock. It is important to avoid
		// accessing internal fields on the aggrGroup objects.
		aggrGroupsPerRoute[route] = maps.Clone(ags)
	}
	d.mtx.RUnlock()

	// Keep a list of receivers for an alert to prevent checking each alert
	// again against all routes. The alert has already matched against this
	// route on ingestion.
	receivers := map[model.Fingerprint][]string{}

	now := time.Now()
	for route, ags := range aggrGroupsPerRoute {
		if !routeFilter(route) {
			continue
		}

		for _, ag := range ags {
			receiver := route.RouteOpts.Receiver
			alertGroup := &AlertGroup{
				Labels:   ag.labels,
				Receiver: receiver,
				GroupKey: ag.GroupKey(),
				RouteID:  ag.routeID,
			}

			alerts := ag.alerts.List()
			filteredAlerts := make([]*types.Alert, 0, len(alerts))
			for _, a := range alerts {
				if !alertFilter(a, now) {
					continue
				}

				fp := a.Fingerprint()
				if r, ok := receivers[fp]; ok {
					// Receivers slice already exists. Add
					// the current receiver to the slice.
					receivers[fp] = append(r, receiver)
				} else {
					// First time we've seen this alert fingerprint.
					// Initialize a new receivers slice.
					receivers[fp] = []string{receiver}
				}

				filteredAlerts = append(filteredAlerts, a)
			}
			if len(filteredAlerts) == 0 {
				continue
			}
			alertGroup.Alerts = filteredAlerts

			groups = append(groups, alertGroup)
		}
	}
	sort.Sort(groups)
	for i := range groups {
		sort.Sort(groups[i].Alerts)
	}
	for i := range receivers {
		sort.Strings(receivers[i])
	}

	return groups, receivers, nil
}
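
// An illustrative usage sketch that lists every group and every alert
// (no filtering):
//
//	groups, receivers, err := d.Groups(
//		context.Background(),
//		func(*Route) bool { return true },
//		func(*types.Alert, time.Time) bool { return true },
//	)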

// Stop the dispatcher.
func (d *Dispatcher) Stop() {
	if d == nil {
		return
	}
	d.mtx.Lock()
	if d.cancel == nil {
		d.mtx.Unlock()
		return
	}
	d.cancel()
	d.cancel = nil
	d.mtx.Unlock()

	<-d.done
}

// notifyFunc is a function that performs notification for the alert
// with the given fingerprint. It aborts on context cancelation.
// Returns false if notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool

// groupAlert determines in which aggregation group the alert falls
// and inserts it.
func (d *Dispatcher) groupAlert(ctx context.Context, alert *types.Alert, route *Route) {
	_, span := tracer.Start(ctx, "dispatch.Dispatcher.groupAlert",
		trace.WithAttributes(
			attribute.String("alerting.alert.name", alert.Name()),
			attribute.String("alerting.alert.fingerprint", alert.Fingerprint().String()),
			attribute.String("alerting.route.receiver.name", route.RouteOpts.Receiver),
		),
		trace.WithSpanKind(trace.SpanKindInternal),
	)
	defer span.End()

	now := time.Now()
	groupLabels := getGroupLabels(alert, route)

	fp := groupLabels.Fingerprint()

	d.mtx.Lock()
	defer d.mtx.Unlock()

	routeGroups, ok := d.aggrGroupsPerRoute[route]
	if !ok {
		routeGroups = map[model.Fingerprint]*aggrGroup{}
		d.aggrGroupsPerRoute[route] = routeGroups
	}

	ag, ok := routeGroups[fp]
	if ok {
		ag.insert(ctx, alert)
		return
	}

	// If the group does not exist, create it. But check the limit first.
	if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
		d.metrics.aggrGroupLimitReached.Inc()
		err := errors.New("too many aggregation groups, cannot create new group for alert")
		message := "Failed to create aggregation group"
		d.logger.Error(message, "err", err.Error(), "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
		span.SetStatus(codes.Error, message)
		span.RecordError(err,
			trace.WithAttributes(
				attribute.Int("alerting.aggregation_group.count", d.aggrGroupsNum),
				attribute.Int("alerting.aggregation_group.limit", limit),
			),
		)
		return
	}

	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.marker.(types.AlertMarker), d.logger)
	routeGroups[fp] = ag
	d.aggrGroupsNum++
	d.metrics.aggrGroups.Inc()
	span.AddEvent("new AggregationGroup created",
		trace.WithAttributes(
			attribute.String("alerting.aggregation_group.key", ag.GroupKey()),
			attribute.Int("alerting.aggregation_group.count", d.aggrGroupsNum),
		),
	)

	// Insert the first alert into the group before starting the group's run()
	// function, to make sure the alert is already there when run() executes.
	ag.insert(ctx, alert)

	if alert.StartsAt.Add(ag.opts.GroupWait).Before(now) {
		message := "Alert is old enough for immediate flush, resetting timer to zero"
		ag.logger.Debug(message, "alert", alert.Name(), "fingerprint", alert.Fingerprint(), "startsAt", alert.StartsAt)
		span.AddEvent(message,
			trace.WithAttributes(
				attribute.String("alerting.alert.StartsAt", alert.StartsAt.Format(time.RFC3339)),
			),
		)
		ag.resetTimer(0)
	}

	// Check the dispatcher's state to decide whether the aggregation group
	// should start running now or when the dispatcher itself starts.
	switch d.state {
	case DispatcherStateWaitingToStart:
		span.AddEvent("Not starting Aggregation Group, dispatcher is not running")
		d.logger.Debug("Dispatcher still waiting to start")
	case DispatcherStateRunning:
		span.AddEvent("Starting Aggregation Group")
		d.runAG(ag)
	default:
		d.logger.Warn("unknown state detected", "state", "unknown")
	}
}

func (d *Dispatcher) runAG(ag *aggrGroup) {
	if ag.running.Load() {
		return
	}
	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
		if err != nil {
			logger := d.logger.With("aggrGroup", ag.GroupKey(), "num_alerts", len(alerts), "err", err)
			if errors.Is(ctx.Err(), context.Canceled) {
				// It is expected for the context to be canceled on
				// configuration reload or shutdown. In this case, the
				// message should only be logged at the debug level.
				logger.Debug("Notify for alerts failed")
			} else {
				logger.Error("Notify for alerts failed")
			}
		}
		return err == nil
	})
}
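
// getGroupLabels returns the subset of the alert's labels that the route
// groups by (or all labels when the route groups by all). For example,
// assuming a route grouping by alertname and cluster, an alert with labels
// {alertname="HighLatency", cluster="eu1", instance="host-1"} yields the
// group labels {alertname="HighLatency", cluster="eu1"}.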
func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {
	groupLabels := model.LabelSet{}
	for ln, lv := range alert.Labels {
		if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll {
			groupLabels[ln] = lv
		}
	}

	return groupLabels
}

// aggrGroup aggregates alert fingerprints into groups to which a
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
	labels   model.LabelSet
	opts     *RouteOpts
	logger   *slog.Logger
	routeID  string
	routeKey string

	alerts  *store.Alerts
	marker  types.AlertMarker
	ctx     context.Context
	cancel  func()
	done    chan struct{}
	next    *time.Timer
	timeout func(time.Duration) time.Duration
	running atomic.Bool
}

// newAggrGroup returns a new aggregation group.
func newAggrGroup(
	ctx context.Context,
	labels model.LabelSet,
	r *Route,
	to func(time.Duration) time.Duration,
	marker types.AlertMarker,
	logger *slog.Logger,
) *aggrGroup {
	if to == nil {
		to = func(d time.Duration) time.Duration { return d }
	}
	ag := &aggrGroup{
		labels:   labels,
		routeID:  r.ID(),
		routeKey: r.Key(),
		opts:     &r.RouteOpts,
		timeout:  to,
		alerts:   store.NewAlerts(),
		marker:   marker,
		done:     make(chan struct{}),
	}
	ag.ctx, ag.cancel = context.WithCancel(ctx)

	ag.logger = logger.With("aggrGroup", ag)

	// Set an initial one-time wait before flushing
	// the first batch of notifications.
	ag.next = time.NewTimer(ag.opts.GroupWait)

	return ag
}

func (ag *aggrGroup) fingerprint() model.Fingerprint {
	return ag.labels.Fingerprint()
}

func (ag *aggrGroup) GroupKey() string {
	return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}

func (ag *aggrGroup) String() string {
	return ag.GroupKey()
}

func (ag *aggrGroup) run(nf notifyFunc) {
	ag.running.Store(true)
	defer close(ag.done)
	defer ag.next.Stop()

	for {
		select {
		case now := <-ag.next.C:
			// Give the notifications time until the next flush to
			// finish before terminating them.
			ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))

			// The now time we retrieve from the timer is the only reliable
			// point of time reference for the subsequent notification pipeline.
			// Calculating the current time directly is prone to flaky behavior,
			// which usually only becomes apparent in tests.
			ctx = notify.WithNow(ctx, now)

			// Populate context with information needed along the pipeline.
			ctx = notify.WithGroupKey(ctx, ag.GroupKey())
			ctx = notify.WithGroupLabels(ctx, ag.labels)
			ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
			ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
			ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
			ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
			ctx = notify.WithRouteID(ctx, ag.routeID)

			// Wait the configured interval before calling flush again.
			ag.resetTimer(ag.opts.GroupInterval)

			ag.flush(func(alerts ...*types.Alert) bool {
				ctx, span := tracer.Start(ctx, "dispatch.AggregationGroup.flush",
					trace.WithAttributes(
						attribute.String("alerting.aggregation_group.key", ag.GroupKey()),
						attribute.Int("alerting.alerts.count", len(alerts)),
					),
					trace.WithSpanKind(trace.SpanKindInternal),
				)
				defer span.End()

				success := nf(ctx, alerts...)
				if !success {
					span.SetStatus(codes.Error, "notification failed")
				}
				return success
			})

			cancel()

		case <-ag.ctx.Done():
			return
		}
	}
}

func (ag *aggrGroup) stop() {
	// Calling cancel will terminate all in-process notifications
	// and the run() loop.
	ag.cancel()
	<-ag.done
}

// resetTimer resets the aggregation group's flush timer.
func (ag *aggrGroup) resetTimer(t time.Duration) {
	ag.next.Reset(t)
}

// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(ctx context.Context, alert *types.Alert) {
	_, span := tracer.Start(ctx, "dispatch.AggregationGroup.insert",
		trace.WithAttributes(
			attribute.String("alerting.alert.name", alert.Name()),
			attribute.String("alerting.alert.fingerprint", alert.Fingerprint().String()),
			attribute.String("alerting.aggregation_group.key", ag.GroupKey()),
		),
		trace.WithSpanKind(trace.SpanKindInternal),
	)
	defer span.End()
	if err := ag.alerts.Set(alert); err != nil {
		message := "error on set alert"
		span.SetStatus(codes.Error, message)
		span.RecordError(err)
		ag.logger.Error(message, "err", err)
	}
}

func (ag *aggrGroup) empty() bool {
	return ag.alerts.Empty()
}

// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
	if ag.empty() {
		return
	}

	var (
		alerts        = ag.alerts.List()
		alertsSlice   = make(types.AlertSlice, 0, len(alerts))
		resolvedSlice = make(types.AlertSlice, 0, len(alerts))
		now           = time.Now()
	)
	for _, alert := range alerts {
		a := *alert
		// Ensure that alerts don't resolve as time moves forward.
		if a.ResolvedAt(now) {
			resolvedSlice = append(resolvedSlice, &a)
		} else {
			a.EndsAt = time.Time{}
		}
		alertsSlice = append(alertsSlice, &a)
	}
	sort.Stable(alertsSlice)

	ag.logger.Debug("flushing", "alerts", fmt.Sprintf("%v", alertsSlice))

	if notify(alertsSlice...) {
		// Delete all resolved alerts as we just sent a notification for them,
		// and we don't want to send another one. However, we need to make sure
		// that each resolved alert has not fired again during the flush as then
		// we would delete an active alert thinking it was resolved.
		if err := ag.alerts.DeleteIfNotModified(resolvedSlice); err != nil {
			ag.logger.Error("error on delete alerts", "err", err)
		} else {
			// Delete markers for resolved alerts that are not in the store.
			for _, alert := range resolvedSlice {
				_, err := ag.alerts.Get(alert.Fingerprint())
				if errors.Is(err, store.ErrNotFound) {
					ag.marker.Delete(alert.Fingerprint())
				}
			}
		}
	}
}

type nilLimits struct{}

func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }