From e883ccb9dea4c8aec83fbf0f05ee2106bfbc674e Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Mon, 3 Sep 2018 14:52:53 +0200 Subject: [PATCH] pull out shared code for storing alerts (#1507) Move the code for storing and GC'ing alerts from being re-implemented in several packages to existing in its own package Signed-off-by: stuart nelson --- cmd/alertmanager/main.go | 2 +- dispatch/dispatch.go | 50 ++++++++------- inhibit/inhibit.go | 65 +++++-------------- inhibit/inhibit_test.go | 53 +++------------- provider/mem/mem.go | 121 ++++++++++++++--------------------- provider/mem/mem_test.go | 25 +++++--- store/store.go | 132 +++++++++++++++++++++++++++++++++++++++ store/store_test.go | 96 ++++++++++++++++++++++++++++ 8 files changed, 341 insertions(+), 203 deletions(-) create mode 100644 store/store.go create mode 100644 store/store_test.go diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 31e2c57c4..0456b6278 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -273,7 +273,7 @@ func main() { go peer.Settle(ctx, *gossipInterval*10) } - alerts, err := mem.NewAlerts(marker, *alertGCInterval) + alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, logger) if err != nil { level.Error(logger).Log("err", err) os.Exit(1) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index d4172963e..a8397abe2 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/types" ) @@ -197,6 +198,7 @@ type aggrGroup struct { logger log.Logger routeKey string + alerts *store.Alerts ctx context.Context cancel func() done chan struct{} @@ -204,7 +206,6 @@ type aggrGroup struct { timeout func(time.Duration) time.Duration mtx sync.RWMutex - alerts map[model.Fingerprint]*types.Alert hasFlushed bool } @@ -218,9 +219,10 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func( routeKey: r.Key(), opts: &r.RouteOpts, timeout: to, - alerts: map[model.Fingerprint]*types.Alert{}, + alerts: store.NewAlerts(15 * time.Minute), } ag.ctx, ag.cancel = context.WithCancel(ctx) + ag.alerts.Run(ag.ctx) ag.logger = log.With(logger, "aggrGroup", ag) @@ -295,23 +297,21 @@ func (ag *aggrGroup) stop() { // insert inserts the alert into the aggregation group. func (ag *aggrGroup) insert(alert *types.Alert) { - ag.mtx.Lock() - defer ag.mtx.Unlock() - - ag.alerts[alert.Fingerprint()] = alert + if err := ag.alerts.Set(alert); err != nil { + level.Error(ag.logger).Log("msg", "error on set alert", "err", err) + } // Immediately trigger a flush if the wait duration for this // alert is already over. + ag.mtx.Lock() + defer ag.mtx.Unlock() if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) { ag.next.Reset(0) } } func (ag *aggrGroup) empty() bool { - ag.mtx.RLock() - defer ag.mtx.RUnlock() - - return len(ag.alerts) == 0 + return ag.alerts.Count() == 0 } // flush sends notifications for all new alerts. 
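With the change above, aggrGroup no longer owns a mutex-guarded alert map: inserts, flushes, and deletes all go through store.Alerts, and the store's GC loop is bound to the group's context, so ag.cancel() in stop() also tears down the GC goroutine. A minimal standalone sketch of that pattern, using only the store API introduced later in this patch (the label set, alert values, and main wrapper are illustrative, not taken from the diff):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // canceling the context stops the store's GC goroutine

	alerts := store.NewAlerts(15 * time.Minute) // same interval newAggrGroup uses
	alerts.Run(ctx)

	now := time.Now()
	if err := alerts.Set(&types.Alert{
		Alert: model.Alert{
			Labels:   model.LabelSet{"alertname": "HighLatency"}, // illustrative label set
			StartsAt: now,
			EndsAt:   now.Add(time.Hour),
		},
	}); err != nil {
		fmt.Println("set failed:", err)
	}

	// List returns a buffered snapshot channel, so callers iterate without
	// holding the store's lock; this is what flush() ranges over.
	for a := range alerts.List() {
		fmt.Println(a.Fingerprint(), alerts.Count())
	}
}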
@@ -319,31 +319,35 @@ func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) { if ag.empty() { return } - ag.mtx.Lock() var ( - alerts = make(map[model.Fingerprint]*types.Alert, len(ag.alerts)) - alertsSlice = make(types.AlertSlice, 0, len(ag.alerts)) + alerts = ag.alerts.List() + alertsSlice = make(types.AlertSlice, 0, ag.alerts.Count()) ) - for fp, alert := range ag.alerts { - alerts[fp] = alert + for alert := range alerts { alertsSlice = append(alertsSlice, alert) } sort.Stable(alertsSlice) - ag.mtx.Unlock() - - level.Debug(ag.logger).Log("msg", "Flushing", "alerts", fmt.Sprintf("%v", alertsSlice)) + level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice)) if notify(alertsSlice...) { - ag.mtx.Lock() - for fp, a := range alerts { + for _, a := range alertsSlice { // Only delete if the fingerprint has not been inserted // again since we notified about it. - if a.Resolved() && ag.alerts[fp] == a { - delete(ag.alerts, fp) + fp := a.Fingerprint() + got, err := ag.alerts.Get(fp) + if err != nil { + // This should only happen if the Alert was + // deleted from the store during the flush. + level.Error(ag.logger).Log("msg", "failed to get alert", "err", err) + continue + } + if a.Resolved() && got == a { + if err := ag.alerts.Delete(fp); err != nil { + level.Error(ag.logger).Log("msg", "error on delete alert", "err", err) + } } } - ag.mtx.Unlock() } } diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index be6de7849..f5a5f593f 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/types" ) @@ -54,19 +55,6 @@ func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker, return ih } -func (ih *Inhibitor) runGC(ctx context.Context) { - for { - select { - case <-time.After(15 * time.Minute): - for _, r := range ih.rules { - r.gc() - } - case <-ctx.Done(): - return - } - } -} - func (ih *Inhibitor) run(ctx context.Context) { it := ih.alerts.Subscribe() defer it.Close() @@ -83,14 +71,16 @@ func (ih *Inhibitor) run(ctx context.Context) { // Update the inhibition rules' cache. for _, r := range ih.rules { if r.SourceMatchers.Match(a.Labels) { - r.set(a) + if err := r.scache.Set(a); err != nil { + level.Error(ih.logger).Log("msg", "error on set alert", "err", err) + } } } } } } -// Run the Inihibitor's background processing. +// Run the Inhibitor's background processing. func (ih *Inhibitor) Run() { var ( g group.Group @@ -100,15 +90,12 @@ func (ih *Inhibitor) Run() { ih.mtx.Lock() ctx, ih.cancel = context.WithCancel(context.Background()) ih.mtx.Unlock() - gcCtx, gcCancel := context.WithCancel(ctx) runCtx, runCancel := context.WithCancel(ctx) - g.Add(func() error { - ih.runGC(gcCtx) - return nil - }, func(err error) { - gcCancel() - }) + for _, rule := range ih.rules { + rule.scache.Run(runCtx) + } + g.Add(func() error { ih.run(runCtx) return nil @@ -166,12 +153,11 @@ type InhibitRule struct { // target alerts in order for the inhibition to take effect. Equal map[model.LabelName]struct{} - mtx sync.RWMutex // Cache of alerts matching source labels. - scache map[model.Fingerprint]*types.Alert + scache *store.Alerts } -// NewInhibitRule returns a new InihibtRule based on a configuration definition. +// NewInhibitRule returns a new InhibitRule based on a configuration definition. 
func NewInhibitRule(cr *config.InhibitRule) *InhibitRule { var ( sourcem types.Matchers @@ -201,26 +187,15 @@ func NewInhibitRule(cr *config.InhibitRule) *InhibitRule { SourceMatchers: sourcem, TargetMatchers: targetm, Equal: equal, - scache: map[model.Fingerprint]*types.Alert{}, + scache: store.NewAlerts(15 * time.Minute), } } -// set the alert in the source cache. -func (r *InhibitRule) set(a *types.Alert) { - r.mtx.Lock() - defer r.mtx.Unlock() - - r.scache[a.Fingerprint()] = a -} - // hasEqual checks whether the source cache contains alerts matching // the equal labels for the given label set. func (r *InhibitRule) hasEqual(lset model.LabelSet) (model.Fingerprint, bool) { - r.mtx.RLock() - defer r.mtx.RUnlock() - Outer: - for fp, a := range r.scache { + for a := range r.scache.List() { // The cache might be stale and contain resolved alerts. if a.Resolved() { continue @@ -230,19 +205,7 @@ Outer: continue Outer } } - return fp, true + return a.Fingerprint(), true } return model.Fingerprint(0), false } - -// gc clears out resolved alerts from the source cache. -func (r *InhibitRule) gc() { - r.mtx.Lock() - defer r.mtx.Unlock() - - for fp, a := range r.scache { - if a.Resolved() { - delete(r.scache, fp) - } - } -} diff --git a/inhibit/inhibit_test.go b/inhibit/inhibit_test.go index d269c06f2..d5863c4fd 100644 --- a/inhibit/inhibit_test.go +++ b/inhibit/inhibit_test.go @@ -14,16 +14,15 @@ package inhibit import ( - "reflect" "testing" "time" "github.com/go-kit/kit/log" - "github.com/kylelemons/godebug/pretty" "github.com/prometheus/common/model" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/types" ) @@ -122,22 +121,18 @@ func TestInhibitRuleHasEqual(t *testing.T) { for _, c := range cases { r := &InhibitRule{ Equal: map[model.LabelName]struct{}{}, - scache: map[model.Fingerprint]*types.Alert{}, + scache: store.NewAlerts(5 * time.Minute), } for _, ln := range c.equal { r.Equal[ln] = struct{}{} } - for k, v := range c.initial { - r.scache[k] = v + for _, v := range c.initial { + r.scache.Set(v) } if _, have := r.hasEqual(c.input); have != c.result { t.Errorf("Unexpected result %t, expected %t", have, c.result) } - if !reflect.DeepEqual(r.scache, c.initial) { - t.Errorf("Cache state unexpectedly changed") - t.Errorf(pretty.Compare(r.scache, c.initial)) - } } } @@ -155,14 +150,16 @@ func TestInhibitRuleMatches(t *testing.T) { ir := ih.rules[0] now := time.Now() // Active alert that matches the source filter - sourceAlert := types.Alert{ + sourceAlert := &types.Alert{ Alert: model.Alert{ Labels: model.LabelSet{"s": "1", "e": "1"}, StartsAt: now.Add(-time.Minute), EndsAt: now.Add(time.Hour), }, } - ir.scache = map[model.Fingerprint]*types.Alert{1: &sourceAlert} + + ir.scache = store.NewAlerts(5 * time.Minute) + ir.scache.Set(sourceAlert) cases := []struct { target model.LabelSet @@ -202,40 +199,6 @@ func TestInhibitRuleMatches(t *testing.T) { } } -func TestInhibitRuleGC(t *testing.T) { - // TODO(fabxc): add now() injection function to Resolved() to remove - // dependency on machine time in this test. 
- now := time.Now() - newAlert := func(start, end time.Duration) *types.Alert { - return &types.Alert{ - Alert: model.Alert{ - Labels: model.LabelSet{"a": "b"}, - StartsAt: now.Add(start * time.Minute), - EndsAt: now.Add(end * time.Minute), - }, - } - } - - before := map[model.Fingerprint]*types.Alert{ - 0: newAlert(-10, -5), - 1: newAlert(10, 20), - 2: newAlert(-10, 10), - 3: newAlert(-10, -1), - } - after := map[model.Fingerprint]*types.Alert{ - 1: newAlert(10, 20), - 2: newAlert(-10, 10), - } - - r := &InhibitRule{scache: before} - r.gc() - - if !reflect.DeepEqual(r.scache, after) { - t.Errorf("Unexpected cache state after GC") - t.Errorf(pretty.Compare(r.scache, after)) - } -} - type fakeAlerts struct { alerts []*types.Alert finished chan struct{} diff --git a/provider/mem/mem.go b/provider/mem/mem.go index c55244fdd..36665595a 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -14,25 +14,31 @@ package mem import ( + "context" "sync" "time" - "github.com/prometheus/alertmanager/provider" - "github.com/prometheus/alertmanager/types" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" + + "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/store" + "github.com/prometheus/alertmanager/types" ) const alertChannelLength = 200 // Alerts gives access to a set of alerts. All methods are goroutine-safe. type Alerts struct { - mtx sync.RWMutex - alerts map[model.Fingerprint]*types.Alert - marker types.Marker - intervalGC time.Duration - stopGC chan struct{} - listeners map[int]listeningAlerts - next int + alerts *store.Alerts + cancel context.CancelFunc + + mtx sync.Mutex + listeners map[int]listeningAlerts + next int + + logger log.Logger } type listeningAlerts struct { @@ -41,40 +47,24 @@ type listeningAlerts struct { } // NewAlerts returns a new alert provider. -func NewAlerts(m types.Marker, intervalGC time.Duration) (*Alerts, error) { +func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, l log.Logger) (*Alerts, error) { + ctx, cancel := context.WithCancel(ctx) a := &Alerts{ - alerts: map[model.Fingerprint]*types.Alert{}, - marker: m, - intervalGC: intervalGC, - stopGC: make(chan struct{}), - listeners: map[int]listeningAlerts{}, - next: 0, + alerts: store.NewAlerts(intervalGC), + cancel: cancel, + listeners: map[int]listeningAlerts{}, + next: 0, + logger: log.With(l, "component", "provider"), } - go a.runGC() - - return a, nil -} - -func (a *Alerts) runGC() { - for { - select { - case <-a.stopGC: - return - case <-time.After(a.intervalGC): - } - - a.mtx.Lock() - - for fp, alert := range a.alerts { + a.alerts.SetGCCallback(func(alerts []*types.Alert) { + for _, alert := range alerts { // As we don't persist alerts, we no longer consider them after // they are resolved. Alerts waiting for resolved notifications are // held in memory in aggregation groups redundantly. - if alert.EndsAt.Before(time.Now()) { - delete(a.alerts, fp) - a.marker.Delete(fp) - } + m.Delete(alert.Fingerprint()) } + a.mtx.Lock() for i, l := range a.listeners { select { case <-l.done: @@ -84,14 +74,18 @@ func (a *Alerts) runGC() { // listener is not closed yet, hence proceed. } } - a.mtx.Unlock() - } + }) + a.alerts.Run(ctx) + + return a, nil } // Close the alert provider. func (a *Alerts) Close() { - close(a.stopGC) + if a.cancel != nil { + a.cancel() + } } func max(a, b int) int { @@ -105,14 +99,12 @@ func max(a, b int) int { // resolved and successfully notified about. 
// They are not guaranteed to be in chronological order. func (a *Alerts) Subscribe() provider.AlertIterator { - alerts, err := a.getPending() - var ( - ch = make(chan *types.Alert, max(len(alerts), alertChannelLength)) + ch = make(chan *types.Alert, max(a.alerts.Count(), alertChannelLength)) done = make(chan struct{}) ) - for _, a := range alerts { + for a := range a.alerts.List() { ch <- a } @@ -122,7 +114,7 @@ func (a *Alerts) Subscribe() provider.AlertIterator { a.listeners[i] = listeningAlerts{alerts: ch, done: done} a.mtx.Unlock() - return provider.NewAlertIterator(ch, done, err) + return provider.NewAlertIterator(ch, done, nil) } // GetPending returns an iterator over all alerts that have @@ -133,12 +125,10 @@ func (a *Alerts) GetPending() provider.AlertIterator { done = make(chan struct{}) ) - alerts, err := a.getPending() - go func() { defer close(ch) - for _, a := range alerts { + for a := range a.alerts.List() { select { case ch <- a: case <-done: @@ -147,43 +137,23 @@ func (a *Alerts) GetPending() provider.AlertIterator { } }() - return provider.NewAlertIterator(ch, done, err) -} - -func (a *Alerts) getPending() ([]*types.Alert, error) { - a.mtx.RLock() - defer a.mtx.RUnlock() - - res := make([]*types.Alert, 0, len(a.alerts)) - - for _, alert := range a.alerts { - res = append(res, alert) - } - - return res, nil + return provider.NewAlertIterator(ch, done, nil) } // Get returns the alert for a given fingerprint. func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { - a.mtx.RLock() - defer a.mtx.RUnlock() - - alert, ok := a.alerts[fp] - if !ok { - return nil, provider.ErrNotFound - } - return alert, nil + return a.alerts.Get(fp) } // Put adds the given alert to the set. func (a *Alerts) Put(alerts ...*types.Alert) error { - a.mtx.Lock() - defer a.mtx.Unlock() for _, alert := range alerts { fp := alert.Fingerprint() - if old, ok := a.alerts[fp]; ok { + // Check that there's an alert existing within the store before + // trying to merge. + if old, err := a.alerts.Get(fp); err == nil { // Merge alerts if there is an overlap in activity range. if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { @@ -191,14 +161,19 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { } } - a.alerts[fp] = alert + if err := a.alerts.Set(alert); err != nil { + level.Error(a.logger).Log("msg", "error on set alert", "err", err) + continue + } + a.mtx.Lock() for _, l := range a.listeners { select { case l.alerts <- alert: case <-l.done: } } + a.mtx.Unlock() } return nil diff --git a/provider/mem/mem_test.go b/provider/mem/mem_test.go index 4a5d36b33..2733778a9 100644 --- a/provider/mem/mem_test.go +++ b/provider/mem/mem_test.go @@ -14,6 +14,7 @@ package mem import ( + "context" "fmt" "reflect" "strconv" @@ -22,10 +23,12 @@ import ( "sync" + "github.com/go-kit/kit/log" "github.com/kylelemons/godebug/pretty" - "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/types" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" ) var ( @@ -81,7 +84,7 @@ func init() { // a listener can not unsubscribe as the lock is hold by `alerts.Lock`. 
 func TestAlertsSubscribePutStarvation(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -132,7 +135,7 @@ func TestAlertsSubscribePutStarvation(t *testing.T) {
 
 func TestAlertsPut(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -157,12 +160,12 @@ func TestAlertsPut(t *testing.T) {
 
 func TestAlertsSubscribe(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// add alert1 to validate if pending alerts will be send
+	// add alert1 to validate if pending alerts will be sent
 	if err := alerts.Put(alert1); err != nil {
 		t.Fatalf("Insert failed: %s", err)
 	}
@@ -246,7 +249,7 @@ func TestAlertsSubscribe(t *testing.T) {
 
 func TestAlertsGetPending(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 30*time.Minute)
+	alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -289,7 +292,7 @@ func TestAlertsGetPending(t *testing.T) {
 
 func TestAlertsGC(t *testing.T) {
 	marker := types.NewMarker()
-	alerts, err := NewAlerts(marker, 200*time.Millisecond)
+	alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, log.NewNopLogger())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -311,9 +314,8 @@ func TestAlertsGC(t *testing.T) {
 
 	for i, a := range insert {
 		_, err := alerts.Get(a.Fingerprint())
-		if err != provider.ErrNotFound {
-			t.Errorf("alert %d didn't get GC'd", i)
-		}
+		require.Error(t, err)
+		require.Equal(t, store.ErrNotFound, err, fmt.Sprintf("alert %d didn't get GC'd: %v", i, err))
 
 		s := marker.Status(a.Fingerprint())
 		if s.State != types.AlertStateUnprocessed {
@@ -323,6 +325,9 @@ func TestAlertsGC(t *testing.T) {
 }
 
 func alertsEqual(a1, a2 *types.Alert) bool {
+	if a1 == nil || a2 == nil {
+		return false
+	}
 	if !reflect.DeepEqual(a1.Labels, a2.Labels) {
 		return false
 	}
diff --git a/store/store.go b/store/store.go
new file mode 100644
index 000000000..3c733a805
--- /dev/null
+++ b/store/store.go
@@ -0,0 +1,132 @@
+package store
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/prometheus/alertmanager/types"
+	"github.com/prometheus/common/model"
+)
+
+var (
+	// ErrNotFound is returned if a Store cannot find the Alert.
+	ErrNotFound = errors.New("alert not found")
+)
+
+// Alerts provides lock-coordinated access to an in-memory map of alerts,
+// keyed by their fingerprint. Resolved alerts are removed from the map based
+// on gcInterval. An optional callback can be set which receives a slice of
+// all resolved alerts that have been removed.
+type Alerts struct {
+	gcInterval time.Duration
+
+	sync.Mutex
+	c  map[model.Fingerprint]*types.Alert
+	cb func([]*types.Alert)
+}
+
+// NewAlerts returns a new Alerts struct.
+func NewAlerts(gcInterval time.Duration) *Alerts {
+	if gcInterval == 0 {
+		gcInterval = time.Minute
+	}
+
+	a := &Alerts{
+		c:          make(map[model.Fingerprint]*types.Alert),
+		cb:         func(_ []*types.Alert) {},
+		gcInterval: gcInterval,
+	}
+
+	return a
+}
+
+// SetGCCallback sets a GC callback to be executed after each GC.
+func (a *Alerts) SetGCCallback(cb func([]*types.Alert)) {
+	a.Lock()
+	defer a.Unlock()
+
+	a.cb = cb
+}
+
+// Run starts the GC loop.
+func (a *Alerts) Run(ctx context.Context) {
+	go func(t *time.Ticker) {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-t.C:
+				a.gc()
+			}
+		}
+	}(time.NewTicker(a.gcInterval))
+}
+
+func (a *Alerts) gc() {
+	a.Lock()
+	defer a.Unlock()
+
+	resolved := []*types.Alert{}
+	for fp, alert := range a.c {
+		if alert.Resolved() {
+			delete(a.c, fp)
+			resolved = append(resolved, alert)
+		}
+	}
+	a.cb(resolved)
+}
+
+// Get returns the Alert with the matching fingerprint, or an error if it is
+// not found.
+func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
+	a.Lock()
+	defer a.Unlock()
+
+	alert, prs := a.c[fp]
+	if !prs {
+		return nil, ErrNotFound
+	}
+	return alert, nil
+}
+
+// Set unconditionally sets the alert in memory.
+func (a *Alerts) Set(alert *types.Alert) error {
+	a.Lock()
+	defer a.Unlock()
+
+	a.c[alert.Fingerprint()] = alert
+	return nil
+}
+
+// Delete removes the Alert with the matching fingerprint from the store.
+func (a *Alerts) Delete(fp model.Fingerprint) error {
+	a.Lock()
+	defer a.Unlock()
+
+	delete(a.c, fp)
+	return nil
+}
+
+// List returns a buffered channel of Alerts currently held in memory.
+func (a *Alerts) List() <-chan *types.Alert {
+	a.Lock()
+	defer a.Unlock()
+
+	c := make(chan *types.Alert, len(a.c))
+	for _, alert := range a.c {
+		c <- alert
+	}
+	close(c)
+
+	return c
+}
+
+// Count returns the number of items within the store.
+func (a *Alerts) Count() int {
+	a.Lock()
+	defer a.Unlock()
+
+	return len(a.c)
+}
diff --git a/store/store_test.go b/store/store_test.go
new file mode 100644
index 000000000..6cef917df
--- /dev/null
+++ b/store/store_test.go
@@ -0,0 +1,96 @@
+package store
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/prometheus/alertmanager/types"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSetGet(t *testing.T) {
+	d := time.Minute
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	a := NewAlerts(d)
+	a.Run(ctx)
+	alert := &types.Alert{
+		UpdatedAt: time.Now(),
+	}
+	require.NoError(t, a.Set(alert))
+	want := alert.Fingerprint()
+	got, err := a.Get(want)
+
+	require.NoError(t, err)
+	require.Equal(t, want, got.Fingerprint())
+}
+
+func TestDelete(t *testing.T) {
+	d := time.Minute
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	a := NewAlerts(d)
+	a.Run(ctx)
+	alert := &types.Alert{
+		UpdatedAt: time.Now(),
+	}
+	require.NoError(t, a.Set(alert))
+
+	fp := alert.Fingerprint()
+
+	err := a.Delete(fp)
+	require.NoError(t, err)
+
+	got, err := a.Get(fp)
+	require.Nil(t, got)
+	require.Equal(t, ErrNotFound, err)
+}
+
+func TestGC(t *testing.T) {
+	now := time.Now()
+	newAlert := func(key string, start, end time.Duration) *types.Alert {
+		return &types.Alert{
+			Alert: model.Alert{
+				Labels:   model.LabelSet{model.LabelName(key): "b"},
+				StartsAt: now.Add(start * time.Minute),
+				EndsAt:   now.Add(end * time.Minute),
+			},
+		}
+	}
+	active := []*types.Alert{
+		newAlert("b", 10, 20),
+		newAlert("c", -10, 10),
+	}
+	resolved := []*types.Alert{
+		newAlert("a", -10, -5),
+		newAlert("d", -10, -1),
+	}
+	s := NewAlerts(5 * time.Minute)
+	var n int
+	s.SetGCCallback(func(a []*types.Alert) {
+		for range a {
+			n++
+		}
+	})
+	for _, alert := range append(active, resolved...) {
+		require.NoError(t, s.Set(alert))
+	}
+
+	s.gc()
+
+	for _, alert := range active {
+		if _, err := s.Get(alert.Fingerprint()); err != nil {
+			t.Errorf("alert %v should not have been gc'd", alert)
+		}
+	}
+	for _, alert := range resolved {
+		if _, err := s.Get(alert.Fingerprint()); err == nil {
+			t.Errorf("alert %v should have been gc'd", alert)
+		}
+	}
+	require.Equal(t, len(resolved), n)
+}
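The GC callback is the hook consumers use to release per-alert state held outside the store; provider/mem uses it above to delete marker entries for GC'd alerts. A minimal sketch of wiring it up, again using only APIs that appear in this patch (the interval, labels, and main wrapper are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/alertmanager/store"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	marker := types.NewMarker()
	s := store.NewAlerts(30 * time.Second) // illustrative GC interval

	// Runs after every GC pass with the alerts that were just removed;
	// provider/mem drops marker state in exactly this spot.
	s.SetGCCallback(func(resolved []*types.Alert) {
		for _, a := range resolved {
			marker.Delete(a.Fingerprint())
			fmt.Println("gc'd:", a.Fingerprint())
		}
	})
	s.Run(ctx)

	now := time.Now()
	// Already resolved (EndsAt in the past), so the next GC pass removes
	// it and the callback fires.
	_ = s.Set(&types.Alert{
		Alert: model.Alert{
			Labels:   model.LabelSet{"alertname": "DiskFull"}, // illustrative
			StartsAt: now.Add(-10 * time.Minute),
			EndsAt:   now.Add(-time.Minute),
		},
	})
}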