Mirror of https://github.com/prometheus/alertmanager.git
pull out shared code for storing alerts (#1507)

Move the code for storing and GC'ing alerts out of the several packages that each re-implemented it, and into its own package.

Signed-off-by: stuart nelson <stuartnelson3@gmail.com>
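The diff below consolidates three hand-rolled map[model.Fingerprint]*types.Alert caches — the dispatcher's aggregation groups, the inhibitor's per-rule source caches, and the mem provider — onto the new store.Alerts type. A minimal end-to-end sketch of that API, compiled from the signatures in store/store.go at the bottom of this diff (the labels and intervals here are illustrative only):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"
)

func main() {
	// Each consumer owns a store and drives its GC loop with a context.
	s := store.NewAlerts(5 * time.Minute)
	s.SetGCCallback(func(resolved []*types.Alert) {
		// Called with every resolved alert removed by a GC pass.
		fmt.Printf("gc removed %d alerts\n", len(resolved))
	})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s.Run(ctx)

	now := time.Now()
	alert := &types.Alert{Alert: model.Alert{
		Labels:   model.LabelSet{"alertname": "example"},
		StartsAt: now.Add(-time.Minute),
		EndsAt:   now.Add(time.Hour),
	}}
	_ = s.Set(alert) // Set currently never fails; the error is for future use

	got, err := s.Get(alert.Fingerprint())
	fmt.Println(got != nil, err, s.Count()) // true <nil> 1

	if _, err := s.Get(model.Fingerprint(0)); err == store.ErrNotFound {
		fmt.Println("lookups of unknown fingerprints return ErrNotFound")
	}
	_ = s.Delete(alert.Fingerprint())
}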
cmd/alertmanager/main.go
@@ -273,7 +273,7 @@ func main() {
 		go peer.Settle(ctx, *gossipInterval*10)
 	}
 
-	alerts, err := mem.NewAlerts(marker, *alertGCInterval)
+	alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, logger)
 	if err != nil {
 		level.Error(logger).Log("err", err)
 		os.Exit(1)
dispatch/dispatch.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/prometheus/alertmanager/notify"
 	"github.com/prometheus/alertmanager/provider"
+	"github.com/prometheus/alertmanager/store"
 	"github.com/prometheus/alertmanager/types"
 )
 
@@ -197,6 +198,7 @@ type aggrGroup struct {
 	logger   log.Logger
 	routeKey string
 
+	alerts *store.Alerts
 	ctx    context.Context
 	cancel func()
 	done   chan struct{}
@@ -204,7 +206,6 @@ type aggrGroup struct {
 	timeout func(time.Duration) time.Duration
 
 	mtx        sync.RWMutex
-	alerts     map[model.Fingerprint]*types.Alert
 	hasFlushed bool
 }
 
@@ -218,9 +219,10 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
 		routeKey: r.Key(),
 		opts:     &r.RouteOpts,
 		timeout:  to,
-		alerts:   map[model.Fingerprint]*types.Alert{},
+		alerts:   store.NewAlerts(15 * time.Minute),
 	}
 	ag.ctx, ag.cancel = context.WithCancel(ctx)
+	ag.alerts.Run(ag.ctx)
 
 	ag.logger = log.With(logger, "aggrGroup", ag)
 
@@ -295,23 +297,21 @@ func (ag *aggrGroup) stop() {
 
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
-	ag.mtx.Lock()
-	defer ag.mtx.Unlock()
-
-	ag.alerts[alert.Fingerprint()] = alert
+	if err := ag.alerts.Set(alert); err != nil {
+		level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
+	}
 
 	// Immediately trigger a flush if the wait duration for this
 	// alert is already over.
+	ag.mtx.Lock()
+	defer ag.mtx.Unlock()
 	if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
 		ag.next.Reset(0)
 	}
 }
 
 func (ag *aggrGroup) empty() bool {
-	ag.mtx.RLock()
-	defer ag.mtx.RUnlock()
-
-	return len(ag.alerts) == 0
+	return ag.alerts.Count() == 0
 }
 
 // flush sends notifications for all new alerts.
@@ -319,31 +319,35 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
 	if ag.empty() {
 		return
 	}
-	ag.mtx.Lock()
 
 	var (
-		alerts      = make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
-		alertsSlice = make(types.AlertSlice, 0, len(ag.alerts))
+		alerts      = ag.alerts.List()
+		alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count())
 	)
-	for fp, alert := range ag.alerts {
-		alerts[fp] = alert
+	for alert := range alerts {
 		alertsSlice = append(alertsSlice, alert)
 	}
 	sort.Stable(alertsSlice)
 
-	ag.mtx.Unlock()
-
-	level.Debug(ag.logger).Log("msg", "Flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
+	level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
 
 	if notify(alertsSlice...) {
-		ag.mtx.Lock()
-		for fp, a := range alerts {
+		for _, a := range alertsSlice {
 			// Only delete if the fingerprint has not been inserted
 			// again since we notified about it.
-			if a.Resolved() && ag.alerts[fp] == a {
-				delete(ag.alerts, fp)
+			fp := a.Fingerprint()
+			got, err := ag.alerts.Get(fp)
+			if err != nil {
+				// This should only happen if the Alert was
+				// deleted from the store during the flush.
+				level.Error(ag.logger).Log("msg", "failed to get alert", "err", err)
+				continue
+			}
+			if a.Resolved() && got == a {
+				if err := ag.alerts.Delete(fp); err != nil {
+					level.Error(ag.logger).Log("msg", "error on delete alert", "err", err)
				}
 			}
 		}
-		ag.mtx.Unlock()
 	}
 }
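A note on the rewritten deletion guard in flush() above: it relies on pointer identity. insert() stores the *types.Alert pointer it was given, so ag.alerts.Get(fp) returns that same pointer unless a newer alert for the fingerprint arrived while notifications were in flight. A sketch of the invariant, using the same names as the code above:

// After notifying with alert a:
fp := a.Fingerprint()
got, err := ag.alerts.Get(fp)
// got == a  => no insert() happened for fp since the snapshot; safe to delete.
// got != a  => a newer alert replaced it; keep the store entry.
if err == nil && a.Resolved() && got == a {
	_ = ag.alerts.Delete(fp)
}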
inhibit/inhibit.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/prometheus/alertmanager/config"
 	"github.com/prometheus/alertmanager/provider"
+	"github.com/prometheus/alertmanager/store"
 	"github.com/prometheus/alertmanager/types"
 )
 
@@ -54,19 +55,6 @@ func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker,
 	return ih
 }
 
-func (ih *Inhibitor) runGC(ctx context.Context) {
-	for {
-		select {
-		case <-time.After(15 * time.Minute):
-			for _, r := range ih.rules {
-				r.gc()
-			}
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
 func (ih *Inhibitor) run(ctx context.Context) {
 	it := ih.alerts.Subscribe()
 	defer it.Close()
@@ -83,14 +71,16 @@ func (ih *Inhibitor) run(ctx context.Context) {
 			// Update the inhibition rules' cache.
 			for _, r := range ih.rules {
 				if r.SourceMatchers.Match(a.Labels) {
-					r.set(a)
+					if err := r.scache.Set(a); err != nil {
+						level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
+					}
 				}
 			}
 		}
 	}
 }
 
-// Run the Inihibitor's background processing.
+// Run the Inhibitor's background processing.
 func (ih *Inhibitor) Run() {
 	var (
 		g group.Group
@@ -100,15 +90,12 @@ func (ih *Inhibitor) Run() {
 	ih.mtx.Lock()
 	ctx, ih.cancel = context.WithCancel(context.Background())
 	ih.mtx.Unlock()
-	gcCtx, gcCancel := context.WithCancel(ctx)
 	runCtx, runCancel := context.WithCancel(ctx)
 
-	g.Add(func() error {
-		ih.runGC(gcCtx)
-		return nil
-	}, func(err error) {
-		gcCancel()
-	})
+	for _, rule := range ih.rules {
+		rule.scache.Run(runCtx)
+	}
 
 	g.Add(func() error {
 		ih.run(runCtx)
 		return nil
@@ -166,12 +153,11 @@ type InhibitRule struct {
 	// target alerts in order for the inhibition to take effect.
 	Equal map[model.LabelName]struct{}
 
-	mtx sync.RWMutex
 	// Cache of alerts matching source labels.
-	scache map[model.Fingerprint]*types.Alert
+	scache *store.Alerts
 }
 
-// NewInhibitRule returns a new InihibtRule based on a configuration definition.
+// NewInhibitRule returns a new InhibitRule based on a configuration definition.
 func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
 	var (
 		sourcem types.Matchers
@@ -201,26 +187,15 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
 		SourceMatchers: sourcem,
 		TargetMatchers: targetm,
 		Equal:          equal,
-		scache:         map[model.Fingerprint]*types.Alert{},
+		scache:         store.NewAlerts(15 * time.Minute),
 	}
 }
 
-// set the alert in the source cache.
-func (r *InhibitRule) set(a *types.Alert) {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
-	r.scache[a.Fingerprint()] = a
-}
-
 // hasEqual checks whether the source cache contains alerts matching
 // the equal labels for the given label set.
 func (r *InhibitRule) hasEqual(lset model.LabelSet) (model.Fingerprint, bool) {
-	r.mtx.RLock()
-	defer r.mtx.RUnlock()
-
 Outer:
-	for fp, a := range r.scache {
+	for a := range r.scache.List() {
 		// The cache might be stale and contain resolved alerts.
 		if a.Resolved() {
 			continue
@@ -230,19 +205,7 @@ Outer:
 				continue Outer
 			}
 		}
-		return fp, true
+		return a.Fingerprint(), true
 	}
 	return model.Fingerprint(0), false
 }
-
-// gc clears out resolved alerts from the source cache.
-func (r *InhibitRule) gc() {
-	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
-	for fp, a := range r.scache {
-		if a.Resolved() {
-			delete(r.scache, fp)
-		}
-	}
-}
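With the inhibitor changes above, the cache maintenance that runGC() and gc() used to perform now lives inside each rule's store: Run() wires every scache to the inhibitor's run context, and hasEqual() skips Resolved() entries so a resolved-but-not-yet-GC'd source alert cannot keep inhibiting a target. A minimal sketch of the new per-rule lifecycle (cr, runCtx, and alert stand in for values from the surrounding code):

rule := NewInhibitRule(cr)  // scache = store.NewAlerts(15 * time.Minute)
rule.scache.Run(runCtx)     // the store now GC's its own resolved alerts
_ = rule.scache.Set(alert)  // run() caches alerts matching SourceMatchers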
inhibit/inhibit_test.go
@@ -14,16 +14,15 @@
 package inhibit
 
 import (
-	"reflect"
 	"testing"
 	"time"
 
 	"github.com/go-kit/kit/log"
-	"github.com/kylelemons/godebug/pretty"
 	"github.com/prometheus/common/model"
 
 	"github.com/prometheus/alertmanager/config"
 	"github.com/prometheus/alertmanager/provider"
+	"github.com/prometheus/alertmanager/store"
 	"github.com/prometheus/alertmanager/types"
 )
 
@@ -122,22 +121,18 @@ func TestInhibitRuleHasEqual(t *testing.T) {
 	for _, c := range cases {
 		r := &InhibitRule{
 			Equal:  map[model.LabelName]struct{}{},
-			scache: map[model.Fingerprint]*types.Alert{},
+			scache: store.NewAlerts(5 * time.Minute),
 		}
 		for _, ln := range c.equal {
 			r.Equal[ln] = struct{}{}
 		}
-		for k, v := range c.initial {
-			r.scache[k] = v
+		for _, v := range c.initial {
+			r.scache.Set(v)
 		}
 
 		if _, have := r.hasEqual(c.input); have != c.result {
 			t.Errorf("Unexpected result %t, expected %t", have, c.result)
 		}
-		if !reflect.DeepEqual(r.scache, c.initial) {
-			t.Errorf("Cache state unexpectedly changed")
-			t.Errorf(pretty.Compare(r.scache, c.initial))
-		}
 	}
 }
 
@@ -155,14 +150,16 @@ func TestInhibitRuleMatches(t *testing.T) {
 	ir := ih.rules[0]
 	now := time.Now()
 	// Active alert that matches the source filter
-	sourceAlert := types.Alert{
+	sourceAlert := &types.Alert{
 		Alert: model.Alert{
 			Labels:   model.LabelSet{"s": "1", "e": "1"},
 			StartsAt: now.Add(-time.Minute),
 			EndsAt:   now.Add(time.Hour),
 		},
 	}
-	ir.scache = map[model.Fingerprint]*types.Alert{1: &sourceAlert}
+
+	ir.scache = store.NewAlerts(5 * time.Minute)
+	ir.scache.Set(sourceAlert)
 
 	cases := []struct {
 		target model.LabelSet
@@ -202,40 +199,6 @@ func TestInhibitRuleMatches(t *testing.T) {
 	}
 }
 
-func TestInhibitRuleGC(t *testing.T) {
-	// TODO(fabxc): add now() injection function to Resolved() to remove
-	// dependency on machine time in this test.
-	now := time.Now()
-	newAlert := func(start, end time.Duration) *types.Alert {
-		return &types.Alert{
-			Alert: model.Alert{
-				Labels:   model.LabelSet{"a": "b"},
-				StartsAt: now.Add(start * time.Minute),
-				EndsAt:   now.Add(end * time.Minute),
-			},
-		}
-	}
-
-	before := map[model.Fingerprint]*types.Alert{
-		0: newAlert(-10, -5),
-		1: newAlert(10, 20),
-		2: newAlert(-10, 10),
-		3: newAlert(-10, -1),
-	}
-	after := map[model.Fingerprint]*types.Alert{
-		1: newAlert(10, 20),
-		2: newAlert(-10, 10),
-	}
-
-	r := &InhibitRule{scache: before}
-	r.gc()
-
-	if !reflect.DeepEqual(r.scache, after) {
-		t.Errorf("Unexpected cache state after GC")
-		t.Errorf(pretty.Compare(r.scache, after))
-	}
-}
-
 type fakeAlerts struct {
 	alerts   []*types.Alert
 	finished chan struct{}
provider/mem/mem.go
@@ -14,25 +14,31 @@
 package mem
 
 import (
+	"context"
 	"sync"
 	"time"
 
-	"github.com/prometheus/alertmanager/provider"
-	"github.com/prometheus/alertmanager/types"
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/common/model"
 
+	"github.com/prometheus/alertmanager/provider"
+	"github.com/prometheus/alertmanager/store"
+	"github.com/prometheus/alertmanager/types"
 )
 
 const alertChannelLength = 200
 
 // Alerts gives access to a set of alerts. All methods are goroutine-safe.
 type Alerts struct {
-	mtx        sync.RWMutex
-	alerts     map[model.Fingerprint]*types.Alert
-	marker     types.Marker
-	intervalGC time.Duration
-	stopGC     chan struct{}
-
-	listeners map[int]listeningAlerts
-	next      int
+	alerts *store.Alerts
+	cancel context.CancelFunc
+
+	mtx       sync.Mutex
+	listeners map[int]listeningAlerts
+	next      int
+
+	logger log.Logger
 }
 
 type listeningAlerts struct {
@@ -41,40 +47,24 @@ type listeningAlerts struct {
 }
 
 // NewAlerts returns a new alert provider.
-func NewAlerts(m types.Marker, intervalGC time.Duration) (*Alerts, error) {
+func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) {
+	ctx, cancel := context.WithCancel(ctx)
 	a := &Alerts{
-		alerts:     map[model.Fingerprint]*types.Alert{},
-		marker:     m,
-		intervalGC: intervalGC,
-		stopGC:     make(chan struct{}),
-		listeners:  map[int]listeningAlerts{},
-		next:       0,
+		alerts:    store.NewAlerts(intervalGC),
+		cancel:    cancel,
+		listeners: map[int]listeningAlerts{},
+		next:      0,
+		logger:    log.With(l, "component", "provider"),
 	}
-	go a.runGC()
-
-	return a, nil
-}
-
-func (a *Alerts) runGC() {
-	for {
-		select {
-		case <-a.stopGC:
-			return
-		case <-time.After(a.intervalGC):
-		}
-
-		a.mtx.Lock()
-
-		for fp, alert := range a.alerts {
+	a.alerts.SetGCCallback(func(alerts []*types.Alert) {
+		for _, alert := range alerts {
 			// As we don't persist alerts, we no longer consider them after
 			// they are resolved. Alerts waiting for resolved notifications are
 			// held in memory in aggregation groups redundantly.
-			if alert.EndsAt.Before(time.Now()) {
-				delete(a.alerts, fp)
-				a.marker.Delete(fp)
-			}
+			m.Delete(alert.Fingerprint())
 		}
 
+		a.mtx.Lock()
 		for i, l := range a.listeners {
 			select {
 			case <-l.done:
@@ -84,14 +74,18 @@ func (a *Alerts) runGC() {
 			// listener is not closed yet, hence proceed.
 			}
 		}
-
 		a.mtx.Unlock()
-	}
+	})
+	a.alerts.Run(ctx)
+
+	return a, nil
 }
 
 // Close the alert provider.
 func (a *Alerts) Close() {
-	close(a.stopGC)
+	if a.cancel != nil {
+		a.cancel()
+	}
 }
 
 func max(a, b int) int {
@@ -105,14 +99,12 @@ func max(a, b int) int {
 // resolved and successfully notified about.
 // They are not guaranteed to be in chronological order.
 func (a *Alerts) Subscribe() provider.AlertIterator {
-	alerts, err := a.getPending()
-
 	var (
-		ch   = make(chan *types.Alert, max(len(alerts), alertChannelLength))
+		ch   = make(chan *types.Alert, max(a.alerts.Count(), alertChannelLength))
 		done = make(chan struct{})
 	)
 
-	for _, a := range alerts {
+	for a := range a.alerts.List() {
 		ch <- a
 	}
 
@@ -122,7 +114,7 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
 	a.listeners[i] = listeningAlerts{alerts: ch, done: done}
 	a.mtx.Unlock()
 
-	return provider.NewAlertIterator(ch, done, err)
+	return provider.NewAlertIterator(ch, done, nil)
 }
 
 // GetPending returns an iterator over all alerts that have
@@ -133,12 +125,10 @@ func (a *Alerts) GetPending() provider.AlertIterator {
 		done = make(chan struct{})
 	)
 
-	alerts, err := a.getPending()
-
 	go func() {
 		defer close(ch)
 
-		for _, a := range alerts {
+		for a := range a.alerts.List() {
 			select {
 			case ch <- a:
 			case <-done:
@@ -147,43 +137,23 @@ func (a *Alerts) GetPending() provider.AlertIterator {
 		}
 	}()
 
-	return provider.NewAlertIterator(ch, done, err)
-}
-
-func (a *Alerts) getPending() ([]*types.Alert, error) {
-	a.mtx.RLock()
-	defer a.mtx.RUnlock()
-
-	res := make([]*types.Alert, 0, len(a.alerts))
-
-	for _, alert := range a.alerts {
-		res = append(res, alert)
-	}
-
-	return res, nil
+	return provider.NewAlertIterator(ch, done, nil)
 }
 
 // Get returns the alert for a given fingerprint.
 func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
-	a.mtx.RLock()
-	defer a.mtx.RUnlock()
-
-	alert, ok := a.alerts[fp]
-	if !ok {
-		return nil, provider.ErrNotFound
-	}
-	return alert, nil
+	return a.alerts.Get(fp)
 }
 
 // Put adds the given alert to the set.
 func (a *Alerts) Put(alerts ...*types.Alert) error {
-	a.mtx.Lock()
-	defer a.mtx.Unlock()
-
 	for _, alert := range alerts {
 		fp := alert.Fingerprint()
 
-		if old, ok := a.alerts[fp]; ok {
+		// Check that there's an alert existing within the store before
+		// trying to merge.
+		if old, err := a.alerts.Get(fp); err == nil {
 			// Merge alerts if there is an overlap in activity range.
 			if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
 				(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
@@ -191,14 +161,19 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {
 			}
 		}
 
-		a.alerts[fp] = alert
+		if err := a.alerts.Set(alert); err != nil {
+			level.Error(a.logger).Log("msg", "error on set alert", "err", err)
+			continue
+		}
 
+		a.mtx.Lock()
 		for _, l := range a.listeners {
 			select {
 			case l.alerts <- alert:
 			case <-l.done:
 			}
 		}
+		a.mtx.Unlock()
 	}
 
 	return nil
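For callers of the mem provider, the visible change is the constructor signature: it now takes a context that bounds the store's GC goroutine and a logger for reporting Set failures, while Close() simply cancels that context. A minimal usage sketch mirroring the test updates below (the nop logger and 30-minute interval are placeholders):

marker := types.NewMarker()
alerts, err := mem.NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
if err != nil {
	// handle construction error
}
defer alerts.Close() // cancels the GC context derived in NewAlerts

it := alerts.Subscribe() // replays stored alerts, then streams new Puts
defer it.Close()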
provider/mem/mem_test.go
@@ -14,6 +14,7 @@
 package mem
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -22,10 +23,12 @@ import (
 
 	"sync"
 
+	"github.com/go-kit/kit/log"
 	"github.com/kylelemons/godebug/pretty"
 	"github.com/prometheus/alertmanager/provider"
+	"github.com/prometheus/alertmanager/store"
 	"github.com/prometheus/alertmanager/types"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 )
 
 var (
@@ -81,7 +84,7 @@ func init() {
 // a listener can not unsubscribe as the lock is hold by `alerts.Lock`.
 func TestAlertsSubscribePutStarvation(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -132,7 +135,7 @@ func TestAlertsSubscribePutStarvation(t *testing.T) {
 
 func TestAlertsPut(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -157,12 +160,12 @@ func TestAlertsPut(t *testing.T) {
 
 func TestAlertsSubscribe(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// add alert1 to validate if pending alerts will be send
+	// add alert1 to validate if pending alerts will be sent
 	if err := alerts.Put(alert1); err != nil {
 		t.Fatalf("Insert failed: %s", err)
 	}
@@ -246,7 +249,7 @@ func TestAlertsSubscribe(t *testing.T) {
 
 func TestAlertsGetPending(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -289,7 +292,7 @@ func TestAlertsGetPending(t *testing.T) {
 
 func TestAlertsGC(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 200*time.Millisecond)
+	alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -311,9 +314,8 @@ func TestAlertsGC(t *testing.T) {
 
 	for i, a := range insert {
 		_, err := alerts.Get(a.Fingerprint())
-		if err != provider.ErrNotFound {
-			t.Errorf("alert %d didn't get GC'd", i)
-		}
+		require.Error(t, err)
+		require.Equal(t, store.ErrNotFound, err, fmt.Sprintf("alert %d didn't get GC'd: %v", i, err))
 
 		s := marker.Status(a.Fingerprint())
 		if s.State != types.AlertStateUnprocessed {
@@ -323,6 +325,9 @@ func TestAlertsGC(t *testing.T) {
 }
 
 func alertsEqual(a1, a2 *types.Alert) bool {
+	if a1 == nil || a2 == nil {
+		return false
+	}
 	if !reflect.DeepEqual(a1.Labels, a2.Labels) {
 		return false
 	}
store/store.go (new file, 132 lines)

package store

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"
)

var (
	// ErrNotFound is returned if a Store cannot find the Alert.
	ErrNotFound = errors.New("alert not found")
)

// Alerts provides lock-coordinated access to an in-memory map of alerts,
// keyed by their fingerprint. Resolved alerts are removed from the map
// based on gcInterval. An optional callback can be set which receives a
// slice of all resolved alerts that have been removed.
type Alerts struct {
	gcInterval time.Duration

	sync.Mutex
	c  map[model.Fingerprint]*types.Alert
	cb func([]*types.Alert)
}

// NewAlerts returns a new Alerts struct.
func NewAlerts(gcInterval time.Duration) *Alerts {
	if gcInterval == 0 {
		gcInterval = time.Minute
	}

	a := &Alerts{
		c:          make(map[model.Fingerprint]*types.Alert),
		cb:         func(_ []*types.Alert) {},
		gcInterval: gcInterval,
	}

	return a
}

// SetGCCallback sets a GC callback to be executed after each GC.
func (a *Alerts) SetGCCallback(cb func([]*types.Alert)) {
	a.Lock()
	defer a.Unlock()

	a.cb = cb
}

// Run starts the GC loop.
func (a *Alerts) Run(ctx context.Context) {
	go func(t *time.Ticker) {
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				a.gc()
			}
		}
	}(time.NewTicker(a.gcInterval))
}

func (a *Alerts) gc() {
	a.Lock()
	defer a.Unlock()

	resolved := []*types.Alert{}
	for fp, alert := range a.c {
		if alert.Resolved() {
			delete(a.c, fp)
			resolved = append(resolved, alert)
		}
	}
	a.cb(resolved)
}

// Get returns the Alert with the matching fingerprint, or an error if it is
// not found.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
	a.Lock()
	defer a.Unlock()

	alert, prs := a.c[fp]
	if !prs {
		return nil, ErrNotFound
	}
	return alert, nil
}

// Set unconditionally sets the alert in memory.
func (a *Alerts) Set(alert *types.Alert) error {
	a.Lock()
	defer a.Unlock()

	a.c[alert.Fingerprint()] = alert
	return nil
}

// Delete removes the Alert with the matching fingerprint from the store.
func (a *Alerts) Delete(fp model.Fingerprint) error {
	a.Lock()
	defer a.Unlock()

	delete(a.c, fp)
	return nil
}

// List returns a buffered channel of Alerts currently held in memory.
func (a *Alerts) List() <-chan *types.Alert {
	a.Lock()
	defer a.Unlock()

	c := make(chan *types.Alert, len(a.c))
	for _, alert := range a.c {
		c <- alert
	}
	close(c)

	return c
}

// Count returns the number of items within the store.
func (a *Alerts) Count() int {
	a.Lock()
	defer a.Unlock()

	return len(a.c)
}
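One design note on List() above: it copies the map's values into a buffered channel while holding the lock, then closes the channel before returning. Consumers therefore range over a fixed point-in-time snapshot and never hold the store's lock while iterating — concurrent Set or Delete calls affect the map, but not a snapshot already taken. For example (s is a *store.Alerts as defined above):

for alert := range s.List() { // terminates: the channel is already closed
	fmt.Println(alert.Fingerprint())
}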
store/store_test.go (new file, 96 lines)

package store

import (
	"context"
	"testing"
	"time"

	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
)

func TestSetGet(t *testing.T) {
	d := time.Minute
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := NewAlerts(d)
	a.Run(ctx)
	alert := &types.Alert{
		UpdatedAt: time.Now(),
	}
	require.NoError(t, a.Set(alert))
	want := alert.Fingerprint()
	got, err := a.Get(want)

	require.NoError(t, err)
	require.Equal(t, want, got.Fingerprint())
}

func TestDelete(t *testing.T) {
	d := time.Minute
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := NewAlerts(d)
	a.Run(ctx)
	alert := &types.Alert{
		UpdatedAt: time.Now(),
	}
	require.NoError(t, a.Set(alert))

	fp := alert.Fingerprint()

	err := a.Delete(fp)
	require.NoError(t, err)

	got, err := a.Get(fp)
	require.Nil(t, got)
	require.Equal(t, ErrNotFound, err)
}

func TestGC(t *testing.T) {
	now := time.Now()
	newAlert := func(key string, start, end time.Duration) *types.Alert {
		return &types.Alert{
			Alert: model.Alert{
				Labels:   model.LabelSet{model.LabelName(key): "b"},
				StartsAt: now.Add(start * time.Minute),
				EndsAt:   now.Add(end * time.Minute),
			},
		}
	}
	active := []*types.Alert{
		newAlert("b", 10, 20),
		newAlert("c", -10, 10),
	}
	resolved := []*types.Alert{
		newAlert("a", -10, -5),
		newAlert("d", -10, -1),
	}
	s := NewAlerts(5 * time.Minute)
	var n int
	s.SetGCCallback(func(a []*types.Alert) {
		for range a {
			n++
		}
	})
	for _, alert := range append(active, resolved...) {
		require.NoError(t, s.Set(alert))
	}

	s.gc()

	for _, alert := range active {
		if _, err := s.Get(alert.Fingerprint()); err != nil {
			t.Errorf("alert %v should not have been gc'd", alert)
		}
	}
	for _, alert := range resolved {
		if _, err := s.Get(alert.Fingerprint()); err == nil {
			t.Errorf("alert %v should have been gc'd", alert)
		}
	}
	require.Equal(t, len(resolved), n)
}