mirror of
https://github.com/prometheus/alertmanager.git
synced 2026-02-05 15:45:34 +01:00
test/... remove duplicated code (#4752)
* Deduplicate code between test/cli and test/with_api_v2 Signed-off-by: Guido Trotter <guido@hudson-trading.com> * Consolidate MockWebhook Signed-off-by: Guido Trotter <guido@hudson-trading.com> * Consolidate acceptance types and start cluster functionality Signed-off-by: Guido Trotter <guido@hudson-trading.com> --------- Signed-off-by: Guido Trotter <guido@hudson-trading.com> Co-authored-by: Guido Trotter <guido@hudson-trading.com>
This commit is contained in:
@@ -14,31 +14,21 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"maps"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
"github.com/go-openapi/strfmt"
|
||||
|
||||
apiclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
"github.com/prometheus/alertmanager/api/v2/client/general"
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/cli/format"
|
||||
"github.com/prometheus/alertmanager/test/testutils"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -47,72 +37,24 @@ const (
|
||||
amtool = "../../../amtool"
|
||||
)
|
||||
|
||||
// AcceptanceTest provides declarative definition of given inputs and expected
|
||||
// output of an Alertmanager setup.
|
||||
// Re-export common types from testutils.
|
||||
type (
|
||||
Collector = testutils.Collector
|
||||
AcceptanceOpts = testutils.AcceptanceOpts
|
||||
)
|
||||
|
||||
var CompareCollectors = testutils.CompareCollectors
|
||||
|
||||
// AcceptanceTest wraps testutils.AcceptanceTest for CLI-based testing.
|
||||
type AcceptanceTest struct {
|
||||
*testing.T
|
||||
|
||||
opts *AcceptanceOpts
|
||||
|
||||
amc *AlertmanagerCluster
|
||||
collectors []*Collector
|
||||
|
||||
actions map[float64][]func()
|
||||
*testutils.AcceptanceTest
|
||||
}
|
||||
|
||||
// AcceptanceOpts defines configuration parameters for an acceptance test.
|
||||
type AcceptanceOpts struct {
|
||||
RoutePrefix string
|
||||
Tolerance time.Duration
|
||||
baseTime time.Time
|
||||
}
|
||||
|
||||
func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
|
||||
if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
|
||||
return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt)))
|
||||
}
|
||||
return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt)))
|
||||
}
|
||||
|
||||
// expandTime returns the absolute time for the relative time
|
||||
// calculated from the test's base time.
|
||||
func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
|
||||
return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
|
||||
}
|
||||
|
||||
// relativeTime returns the relative time for the given time
|
||||
// calculated from the test's base time.
|
||||
func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
|
||||
return float64(act.Sub(opts.baseTime)) / float64(time.Second)
|
||||
}
|
||||
|
||||
// NewAcceptanceTest returns a new acceptance test with the base time
|
||||
// set to the current time.
|
||||
// NewAcceptanceTest returns a new acceptance test.
|
||||
func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
|
||||
test := &AcceptanceTest{
|
||||
T: t,
|
||||
opts: opts,
|
||||
actions: map[float64][]func(){},
|
||||
return &AcceptanceTest{
|
||||
AcceptanceTest: testutils.NewAcceptanceTest(t, opts),
|
||||
}
|
||||
|
||||
return test
|
||||
}
|
||||
|
||||
// freeAddress asks the operating system for an unused TCP/IPv4 port on
// localhost and returns it as a "host:port" listen address.
//
// The probing listener is closed before returning, so the address is only
// very likely, not guaranteed, to still be free when it is used afterwards.
// Any failure to open or close the listener panics.
func freeAddress() string {
	listener, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}

	// Capture the address first, then release the port for the real user.
	addr := listener.Addr().String()
	if err := listener.Close(); err != nil {
		panic(err)
	}

	return addr
}
|
||||
|
||||
// AmtoolOk verifies that the "amtool" file exists in the correct location for testing,
|
||||
@@ -127,340 +69,37 @@ func AmtoolOk() (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Do sets the given function to be executed at the given time.
|
||||
func (t *AcceptanceTest) Do(at float64, f func()) {
|
||||
t.actions[at] = append(t.actions[at], f)
|
||||
}
|
||||
|
||||
// AlertmanagerCluster returns a new AlertmanagerCluster that allows starting a
|
||||
// cluster of Alertmanager instances on random ports.
|
||||
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
|
||||
amc := AlertmanagerCluster{}
|
||||
|
||||
for range size {
|
||||
am := &Alertmanager{
|
||||
t: t,
|
||||
opts: t.opts,
|
||||
}
|
||||
|
||||
dir, err := os.MkdirTemp("", "am_test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
am.dir = dir
|
||||
|
||||
cf, err := os.Create(filepath.Join(dir, "config.yml"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
am.confFile = cf
|
||||
am.UpdateConfig(conf)
|
||||
|
||||
am.apiAddr = freeAddress()
|
||||
am.clusterAddr = freeAddress()
|
||||
|
||||
transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil)
|
||||
am.clientV2 = apiclient.New(transport, strfmt.Default)
|
||||
|
||||
amc.ams = append(amc.ams, am)
|
||||
}
|
||||
|
||||
t.amc = &amc
|
||||
|
||||
return &amc
|
||||
}
|
||||
|
||||
// Collector returns a new collector bound to the test instance.
|
||||
func (t *AcceptanceTest) Collector(name string) *Collector {
|
||||
co := &Collector{
|
||||
t: t.T,
|
||||
name: name,
|
||||
opts: t.opts,
|
||||
collected: map[float64][]models.GettableAlerts{},
|
||||
expected: map[Interval][]models.GettableAlerts{},
|
||||
}
|
||||
t.collectors = append(t.collectors, co)
|
||||
|
||||
return co
|
||||
}
|
||||
|
||||
// Run starts all Alertmanagers and runs queries against them. It then checks
|
||||
// whether all expected notifications have arrived at the expected receiver.
|
||||
func (t *AcceptanceTest) Run() {
|
||||
errc := make(chan error)
|
||||
|
||||
for _, am := range t.amc.ams {
|
||||
am.errc = errc
|
||||
defer func(am *Alertmanager) {
|
||||
am.Terminate()
|
||||
am.cleanup()
|
||||
t.Logf("stdout:\n%v", am.cmd.Stdout)
|
||||
t.Logf("stderr:\n%v", am.cmd.Stderr)
|
||||
}(am)
|
||||
}
|
||||
|
||||
err := t.amc.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Set the reference time right before running the test actions to avoid
|
||||
// test failures due to slow setup of the test environment.
|
||||
t.opts.baseTime = time.Now()
|
||||
|
||||
go t.runActions()
|
||||
|
||||
var latest float64
|
||||
for _, coll := range t.collectors {
|
||||
if l := coll.latest(); l > latest {
|
||||
latest = l
|
||||
}
|
||||
}
|
||||
|
||||
deadline := t.opts.expandTime(latest)
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(deadline)):
|
||||
// continue
|
||||
case err := <-errc:
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// runActions performs the stored actions at the defined times.
|
||||
func (t *AcceptanceTest) runActions() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for at, fs := range t.actions {
|
||||
ts := t.opts.expandTime(at)
|
||||
wg.Add(len(fs))
|
||||
|
||||
for _, f := range fs {
|
||||
go func(f func()) {
|
||||
time.Sleep(time.Until(ts))
|
||||
f()
|
||||
wg.Done()
|
||||
}(f)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// buffer is a bytes.Buffer guarded by a mutex so that it can be written to
// and read from by different goroutines.
type buffer struct {
	b   bytes.Buffer
	mtx sync.Mutex
}

// Write appends p to the buffer while holding the lock and returns the
// result of the underlying bytes.Buffer write.
func (buf *buffer) Write(p []byte) (int, error) {
	buf.mtx.Lock()
	defer buf.mtx.Unlock()

	n, err := buf.b.Write(p)
	return n, err
}

// String returns everything written so far as a string, read while holding
// the lock.
func (buf *buffer) String() string {
	buf.mtx.Lock()
	defer buf.mtx.Unlock()

	contents := buf.b.String()
	return contents
}
|
||||
|
||||
// Alertmanager encapsulates an Alertmanager process and allows
|
||||
// declaring alerts being pushed to it at fixed points in time.
|
||||
// Alertmanager wraps testutils.Alertmanager and adds CLI-specific methods.
|
||||
type Alertmanager struct {
|
||||
t *AcceptanceTest
|
||||
opts *AcceptanceOpts
|
||||
|
||||
apiAddr string
|
||||
clusterAddr string
|
||||
clientV2 *apiclient.AlertmanagerAPI
|
||||
cmd *exec.Cmd
|
||||
confFile *os.File
|
||||
dir string
|
||||
|
||||
errc chan<- error
|
||||
*testutils.Alertmanager
|
||||
}
|
||||
|
||||
// AlertmanagerCluster represents a group of Alertmanager instances
|
||||
// acting as a cluster.
|
||||
// AlertmanagerCluster wraps testutils.AlertmanagerCluster and adds CLI-specific methods.
|
||||
type AlertmanagerCluster struct {
|
||||
ams []*Alertmanager
|
||||
*testutils.AlertmanagerCluster
|
||||
}
|
||||
|
||||
// Start the Alertmanager cluster and wait until it is ready to receive.
|
||||
func (amc *AlertmanagerCluster) Start() error {
|
||||
var peerFlags []string
|
||||
for _, am := range amc.ams {
|
||||
peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
|
||||
// AlertmanagerCluster returns a new AlertmanagerCluster.
|
||||
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
|
||||
return &AlertmanagerCluster{
|
||||
AlertmanagerCluster: t.AcceptanceTest.AlertmanagerCluster(conf, size),
|
||||
}
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.Start(peerFlags)
|
||||
if err != nil {
|
||||
return fmt.Errorf("starting alertmanager cluster: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.WaitForCluster(len(amc.ams))
|
||||
if err != nil {
|
||||
return fmt.Errorf("waiting alertmanager cluster: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Members returns the underlying slice of cluster members.
|
||||
// Members returns the underlying Alertmanager instances wrapped for CLI testing.
|
||||
func (amc *AlertmanagerCluster) Members() []*Alertmanager {
|
||||
return amc.ams
|
||||
baseMembers := amc.AlertmanagerCluster.Members()
|
||||
wrapped := make([]*Alertmanager, len(baseMembers))
|
||||
for i, am := range baseMembers {
|
||||
wrapped[i] = &Alertmanager{Alertmanager: am}
|
||||
}
|
||||
return wrapped
|
||||
}
|
||||
|
||||
// Start the alertmanager and wait until it is ready to receive.
|
||||
func (am *Alertmanager) Start(additionalArg []string) error {
|
||||
am.t.Helper()
|
||||
args := []string{
|
||||
"--config.file", am.confFile.Name(),
|
||||
"--log.level", "debug",
|
||||
"--web.listen-address", am.apiAddr,
|
||||
"--storage.path", am.dir,
|
||||
"--cluster.listen-address", am.clusterAddr,
|
||||
"--cluster.settle-timeout", "0s",
|
||||
}
|
||||
if am.opts.RoutePrefix != "" {
|
||||
args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
|
||||
}
|
||||
args = append(args, additionalArg...)
|
||||
|
||||
cmd := exec.Command("../../../alertmanager", args...)
|
||||
|
||||
if am.cmd == nil {
|
||||
var outb, errb buffer
|
||||
cmd.Stdout = &outb
|
||||
cmd.Stderr = &errb
|
||||
} else {
|
||||
cmd.Stdout = am.cmd.Stdout
|
||||
cmd.Stderr = am.cmd.Stderr
|
||||
}
|
||||
am.cmd = cmd
|
||||
|
||||
if err := am.cmd.Start(); err != nil {
|
||||
return fmt.Errorf("starting alertmanager failed: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := am.cmd.Wait(); err != nil {
|
||||
am.errc <- err
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
for range 10 {
|
||||
resp, err := http.Get(am.getURL("/"))
|
||||
if err != nil {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("starting alertmanager failed: expected HTTP status '200', got '%d'", resp.StatusCode)
|
||||
}
|
||||
_, err = io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("starting alertmanager failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("starting alertmanager failed: timeout")
|
||||
}
|
||||
|
||||
// WaitForCluster waits for the Alertmanager instance to join a cluster with the
|
||||
// given size.
|
||||
func (am *Alertmanager) WaitForCluster(size int) error {
|
||||
params := general.NewGetStatusParams()
|
||||
params.WithContext(context.Background())
|
||||
var status general.GetStatusOK
|
||||
|
||||
// Poll for 2s
|
||||
for range 20 {
|
||||
status, err := am.clientV2.General.GetStatus(params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(status.Payload.Cluster.Peers) == size {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
return fmt.Errorf(
|
||||
"failed to wait for Alertmanager instance %q to join cluster: expected %v peers, but got %v",
|
||||
am.clusterAddr,
|
||||
size,
|
||||
len(status.Payload.Cluster.Peers),
|
||||
)
|
||||
}
|
||||
|
||||
// Terminate kills the underlying Alertmanager cluster processes and removes intermediate
|
||||
// data.
|
||||
func (amc *AlertmanagerCluster) Terminate() {
|
||||
for _, am := range amc.ams {
|
||||
am.Terminate()
|
||||
}
|
||||
}
|
||||
|
||||
// Terminate kills the underlying Alertmanager process and removes intermediate
|
||||
// data.
|
||||
func (am *Alertmanager) Terminate() {
|
||||
am.t.Helper()
|
||||
if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
|
||||
am.t.Fatalf("Error sending SIGTERM to Alertmanager process: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Reload sends the reloading signal to the Alertmanager instances.
|
||||
func (amc *AlertmanagerCluster) Reload() {
|
||||
for _, am := range amc.ams {
|
||||
am.Reload()
|
||||
}
|
||||
}
|
||||
|
||||
// Reload sends the reloading signal to the Alertmanager process.
|
||||
func (am *Alertmanager) Reload() {
|
||||
am.t.Helper()
|
||||
if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
|
||||
am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (am *Alertmanager) cleanup() {
|
||||
am.t.Helper()
|
||||
if err := os.RemoveAll(am.confFile.Name()); err != nil {
|
||||
am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// Version runs the 'amtool' command with the --version option and checks
|
||||
// for appropriate output.
|
||||
func Version() (string, error) {
|
||||
cmd := exec.Command(amtool, "--version")
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
versionRE := regexp.MustCompile(`^amtool, version (\d+\.\d+\.\d+) *`)
|
||||
matched := versionRE.FindStringSubmatch(string(out))
|
||||
if len(matched) != 2 {
|
||||
return "", errors.New("Unable to match version info regex: " + string(out))
|
||||
}
|
||||
return matched[1], nil
|
||||
}
|
||||
|
||||
// AddAlertsAt declares alerts that are to be added to the Alertmanager
|
||||
// server at a relative point in time.
|
||||
// AddAlertsAt declares alerts that are to be added to the Alertmanager server
|
||||
// at a relative point in time.
|
||||
func (am *Alertmanager) AddAlertsAt(omitEquals bool, at float64, alerts ...*TestAlert) {
|
||||
am.t.Do(at, func() {
|
||||
am.T.Do(at, func() {
|
||||
am.AddAlerts(omitEquals, alerts...)
|
||||
})
|
||||
}
|
||||
@@ -476,7 +115,7 @@ func (am *Alertmanager) AddAlerts(omitEquals bool, alerts ...*TestAlert) {
|
||||
for _, alert := range alerts {
|
||||
out, err := am.addAlertCommand(omitEquals, alert)
|
||||
if err != nil {
|
||||
am.t.Errorf("Error adding alert: %v\nOutput: %s", err, string(out))
|
||||
am.T.Errorf("Error adding alert: %v\nOutput: %s", err, string(out))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -485,8 +124,8 @@ func (am *Alertmanager) addAlertCommand(omitEquals bool, alert *TestAlert) ([]by
|
||||
amURLFlag := "--alertmanager.url=" + am.getURL("/")
|
||||
args := []string{amURLFlag, "alert", "add"}
|
||||
// Make a copy of the labels
|
||||
labels := make(models.LabelSet, len(alert.labels))
|
||||
maps.Copy(labels, alert.labels)
|
||||
labels := make(models.LabelSet, len(alert.Labels))
|
||||
maps.Copy(labels, alert.Labels)
|
||||
if omitEquals {
|
||||
// If alertname is present and omitEquals is true then the command should
|
||||
// be `amtool alert add foo ...` and not `amtool alert add alertname=foo ...`.
|
||||
@@ -498,10 +137,10 @@ func (am *Alertmanager) addAlertCommand(omitEquals bool, alert *TestAlert) ([]by
|
||||
for k, v := range labels {
|
||||
args = append(args, k+"="+v)
|
||||
}
|
||||
startsAt := strfmt.DateTime(am.opts.expandTime(alert.startsAt))
|
||||
startsAt := strfmt.DateTime(am.Opts.ExpandTime(alert.StartsAt))
|
||||
args = append(args, "--start="+startsAt.String())
|
||||
if alert.endsAt > alert.startsAt {
|
||||
endsAt := strfmt.DateTime(am.opts.expandTime(alert.endsAt))
|
||||
if alert.EndsAt > alert.StartsAt {
|
||||
endsAt := strfmt.DateTime(am.Opts.ExpandTime(alert.EndsAt))
|
||||
args = append(args, "--end="+endsAt.String())
|
||||
}
|
||||
cmd := exec.Command(amtool, args...)
|
||||
@@ -541,9 +180,9 @@ func parseAlertQueryResponse(data []byte) ([]TestAlert, error) {
|
||||
}
|
||||
summary := strings.TrimSpace(line[summPos:])
|
||||
alert := TestAlert{
|
||||
labels: models.LabelSet{"alertname": alertName},
|
||||
startsAt: float64(startsAt.Unix()),
|
||||
summary: summary,
|
||||
Labels: models.LabelSet{"alertname": alertName},
|
||||
StartsAt: float64(startsAt.Unix()),
|
||||
Summary: summary,
|
||||
}
|
||||
alerts = append(alerts, alert)
|
||||
}
|
||||
@@ -552,7 +191,7 @@ func parseAlertQueryResponse(data []byte) ([]TestAlert, error) {
|
||||
|
||||
// SetSilence updates or creates the given Silence.
|
||||
func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
|
||||
for _, am := range amc.ams {
|
||||
for _, am := range amc.Members() {
|
||||
am.SetSilence(at, sil)
|
||||
}
|
||||
}
|
||||
@@ -561,7 +200,7 @@ func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
|
||||
func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
|
||||
out, err := am.addSilenceCommand(sil)
|
||||
if err != nil {
|
||||
am.t.Errorf("Unable to set silence %v %v", err, string(out))
|
||||
am.T.Errorf("Unable to set silence %v %v", err, string(out))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -584,7 +223,7 @@ func (am *Alertmanager) QuerySilence(match ...string) ([]TestSilence, error) {
|
||||
cmd := exec.Command(amtool, args...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
am.t.Error("Silence query command failed: ", err)
|
||||
am.T.Error("Silence query command failed: ", err)
|
||||
}
|
||||
return parseSilenceQueryResponse(out)
|
||||
}
|
||||
@@ -596,7 +235,7 @@ func (am *Alertmanager) QueryExpiredSilence(match ...string) ([]TestSilence, err
|
||||
cmd := exec.Command(amtool, args...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
am.t.Error("Silence query command failed: ", err)
|
||||
am.T.Error("Silence query command failed: ", err)
|
||||
}
|
||||
return parseSilenceQueryResponse(out)
|
||||
}
|
||||
@@ -647,7 +286,7 @@ func parseSilenceQueryResponse(data []byte) ([]TestSilence, error) {
|
||||
|
||||
// DelSilence deletes the silence with the sid at the given time.
|
||||
func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
|
||||
for _, am := range amc.ams {
|
||||
for _, am := range amc.Members() {
|
||||
am.DelSilence(at, sil)
|
||||
}
|
||||
}
|
||||
@@ -656,7 +295,7 @@ func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
|
||||
func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
|
||||
output, err := am.expireSilenceCommand(sil)
|
||||
if err != nil {
|
||||
am.t.Errorf("Error expiring silence %v: %s", string(output), err)
|
||||
am.T.Errorf("Error expiring silence %v: %s", string(output), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -693,27 +332,7 @@ func (am *Alertmanager) ExpireSilenceByID(id string) ([]byte, error) {
|
||||
return cmd.CombinedOutput()
|
||||
}
|
||||
|
||||
// UpdateConfig rewrites the configuration file for the Alertmanager cluster. It
|
||||
// does not initiate config reloading.
|
||||
func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
|
||||
for _, am := range amc.ams {
|
||||
am.UpdateConfig(conf)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig rewrites the configuration file for the Alertmanager. It does not
|
||||
// initiate config reloading.
|
||||
func (am *Alertmanager) UpdateConfig(conf string) {
|
||||
if _, err := am.confFile.WriteString(conf); err != nil {
|
||||
am.t.Fatal(err)
|
||||
return
|
||||
}
|
||||
if err := am.confFile.Sync(); err != nil {
|
||||
am.t.Fatal(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ShowRoute shows the routing tree using 'amtool config routes show'.
|
||||
func (am *Alertmanager) ShowRoute() ([]byte, error) {
|
||||
return am.showRouteCommand()
|
||||
}
|
||||
@@ -725,6 +344,7 @@ func (am *Alertmanager) showRouteCommand() ([]byte, error) {
|
||||
return cmd.CombinedOutput()
|
||||
}
|
||||
|
||||
// TestRoute tests label matching against the routing tree using 'amtool config routes test'.
|
||||
func (am *Alertmanager) TestRoute(labels ...string) ([]byte, error) {
|
||||
return am.testRouteCommand(labels...)
|
||||
}
|
||||
@@ -737,5 +357,22 @@ func (am *Alertmanager) testRouteCommand(labels ...string) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (am *Alertmanager) getURL(path string) string {
|
||||
return fmt.Sprintf("http://%s%s%s", am.apiAddr, am.opts.RoutePrefix, path)
|
||||
return fmt.Sprintf("http://%s%s%s", am.APIAddr, am.Opts.RoutePrefix, path)
|
||||
}
|
||||
|
||||
// Version runs the 'amtool' command with the --version option and checks
|
||||
// for appropriate output.
|
||||
func Version() (string, error) {
|
||||
cmd := exec.Command(amtool, "--version")
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
versionRE := regexp.MustCompile(`^amtool, version (\d+\.\d+\.\d+) *`)
|
||||
matched := versionRE.FindStringSubmatch(string(out))
|
||||
if len(matched) != 2 {
|
||||
return "", errors.New("Unable to match version info regex: " + string(out))
|
||||
}
|
||||
return matched[1], nil
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ receivers:
|
||||
Tolerance: 150 * time.Millisecond,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
|
||||
@@ -104,7 +104,7 @@ receivers:
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
@@ -169,7 +169,7 @@ receivers:
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
@@ -225,7 +225,7 @@ receivers:
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
@@ -258,7 +258,7 @@ receivers:
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
@@ -296,7 +296,7 @@ receivers:
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
@@ -380,7 +380,7 @@ receivers:
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
wh := NewWebhook(t, co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
|
||||
@@ -1,260 +0,0 @@
|
||||
// Copyright 2019 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
)
|
||||
|
||||
// Collector gathers alerts received by a notification receiver
|
||||
// and verifies whether all arrived and within the correct time boundaries.
|
||||
type Collector struct {
|
||||
t *testing.T
|
||||
name string
|
||||
opts *AcceptanceOpts
|
||||
|
||||
collected map[float64][]models.GettableAlerts
|
||||
expected map[Interval][]models.GettableAlerts
|
||||
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
func (c *Collector) String() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
// Collected returns a map of alerts collected by the collector indexed with the
|
||||
// receive timestamp.
|
||||
func (c *Collector) Collected() map[float64][]models.GettableAlerts {
|
||||
c.mtx.RLock()
|
||||
defer c.mtx.RUnlock()
|
||||
return c.collected
|
||||
}
|
||||
|
||||
func batchesEqual(as, bs models.GettableAlerts, opts *AcceptanceOpts) bool {
|
||||
if len(as) != len(bs) {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, a := range as {
|
||||
found := false
|
||||
for _, b := range bs {
|
||||
if equalAlerts(a, b, opts) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// latest returns the latest relative point in time where a notification is
|
||||
// expected.
|
||||
func (c *Collector) latest() float64 {
|
||||
c.mtx.RLock()
|
||||
defer c.mtx.RUnlock()
|
||||
var latest float64
|
||||
for iv := range c.expected {
|
||||
if iv.end > latest {
|
||||
latest = iv.end
|
||||
}
|
||||
}
|
||||
return latest
|
||||
}
|
||||
|
||||
// Want declares that the Collector expects to receive the given alerts
|
||||
// within the given time boundaries.
|
||||
func (c *Collector) Want(iv Interval, alerts ...*TestAlert) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
var nas models.GettableAlerts
|
||||
for _, a := range alerts {
|
||||
nas = append(nas, a.nativeAlert(c.opts))
|
||||
}
|
||||
|
||||
c.expected[iv] = append(c.expected[iv], nas)
|
||||
}
|
||||
|
||||
// add the given alerts to the collected alerts.
|
||||
func (c *Collector) add(alerts ...*models.GettableAlert) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
arrival := c.opts.relativeTime(time.Now())
|
||||
|
||||
c.collected[arrival] = append(c.collected[arrival], models.GettableAlerts(alerts))
|
||||
}
|
||||
|
||||
func (c *Collector) Check() string {
|
||||
var report strings.Builder
|
||||
report.WriteString(fmt.Sprintf("\ncollector %q:\n\n", c))
|
||||
|
||||
c.mtx.RLock()
|
||||
defer c.mtx.RUnlock()
|
||||
for iv, expected := range c.expected {
|
||||
report.WriteString(fmt.Sprintf("interval %v\n", iv))
|
||||
|
||||
var alerts []models.GettableAlerts
|
||||
for at, got := range c.collected {
|
||||
if iv.contains(at) {
|
||||
alerts = append(alerts, got...)
|
||||
}
|
||||
}
|
||||
|
||||
for _, exp := range expected {
|
||||
found := len(exp) == 0 && len(alerts) == 0
|
||||
|
||||
report.WriteString("---\n")
|
||||
|
||||
for _, e := range exp {
|
||||
report.WriteString(fmt.Sprintf("- %v\n", c.opts.alertString(e)))
|
||||
}
|
||||
|
||||
for _, a := range alerts {
|
||||
if batchesEqual(exp, a, c.opts) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
report.WriteString(" [ ✓ ]\n")
|
||||
} else {
|
||||
c.t.Fail()
|
||||
report.WriteString(" [ ✗ ]\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Detect unexpected notifications.
|
||||
var totalExp, totalAct int
|
||||
for _, exp := range c.expected {
|
||||
for _, e := range exp {
|
||||
totalExp += len(e)
|
||||
}
|
||||
}
|
||||
for _, act := range c.collected {
|
||||
for _, a := range act {
|
||||
if len(a) == 0 {
|
||||
c.t.Error("received empty notifications")
|
||||
}
|
||||
totalAct += len(a)
|
||||
}
|
||||
}
|
||||
if totalExp != totalAct {
|
||||
c.t.Fail()
|
||||
report.WriteString(fmt.Sprintf("\nExpected total of %d alerts, got %d", totalExp, totalAct))
|
||||
}
|
||||
|
||||
if c.t.Failed() {
|
||||
report.WriteString("\nreceived:\n")
|
||||
|
||||
for at, col := range c.collected {
|
||||
for _, alerts := range col {
|
||||
report.WriteString(fmt.Sprintf("@ %v\n", at))
|
||||
for _, a := range alerts {
|
||||
report.WriteString(fmt.Sprintf("- %v\n", c.opts.alertString(a)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return report.String()
|
||||
}
|
||||
|
||||
// alertsToString returns a string representation of the given Alerts. Use for
|
||||
// debugging.
|
||||
func alertsToString(as []*models.GettableAlert) (string, error) {
|
||||
b, err := json.Marshal(as)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
// CompareCollectors compares two collectors based on their collected alerts.
|
||||
func CompareCollectors(a, b *Collector, opts *AcceptanceOpts) (bool, error) {
|
||||
f := func(collected map[float64][]models.GettableAlerts) []*models.GettableAlert {
|
||||
result := []*models.GettableAlert{}
|
||||
for _, batches := range collected {
|
||||
for _, batch := range batches {
|
||||
for _, alert := range batch {
|
||||
result = append(result, alert)
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
aAlerts := f(a.Collected())
|
||||
bAlerts := f(b.Collected())
|
||||
|
||||
if len(aAlerts) != len(bAlerts) {
|
||||
aAsString, err := alertsToString(aAlerts)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
bAsString, err := alertsToString(bAlerts)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
err = fmt.Errorf(
|
||||
"first collector has %v alerts, second collector has %v alerts\n%v\n%v",
|
||||
len(aAlerts), len(bAlerts),
|
||||
aAsString, bAsString,
|
||||
)
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, aAlert := range aAlerts {
|
||||
found := false
|
||||
for _, bAlert := range bAlerts {
|
||||
if equalAlerts(aAlert, bAlert, opts) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
aAsString, err := alertsToString([]*models.GettableAlert{aAlert})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
bAsString, err := alertsToString(bAlerts)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
err = fmt.Errorf(
|
||||
"could not find matching alert for alert from first collector\n%v\nin alerts of second collector\n%v",
|
||||
aAsString, bAsString,
|
||||
)
|
||||
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
249
test/cli/mock.go
249
test/cli/mock.go
@@ -14,45 +14,25 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/notify/webhook"
|
||||
"github.com/prometheus/alertmanager/test/testutils"
|
||||
)
|
||||
|
||||
// At is a convenience method to allow for declarative syntax of Acceptance
|
||||
// test definitions.
|
||||
func At(ts float64) float64 {
|
||||
return ts
|
||||
}
|
||||
// Re-export common types and functions from testutils.
|
||||
type (
|
||||
Interval = testutils.Interval
|
||||
TestAlert = testutils.TestAlert
|
||||
MockWebhook = testutils.MockWebhook
|
||||
)
|
||||
|
||||
type Interval struct {
|
||||
start, end float64
|
||||
}
|
||||
|
||||
func (iv Interval) String() string {
|
||||
return fmt.Sprintf("[%v,%v]", iv.start, iv.end)
|
||||
}
|
||||
|
||||
func (iv Interval) contains(f float64) bool {
|
||||
return f >= iv.start && f <= iv.end
|
||||
}
|
||||
|
||||
// Between is a convenience constructor for an interval for declarative syntax
|
||||
// of Acceptance test definitions.
|
||||
func Between(start, end float64) Interval {
|
||||
return Interval{start: start, end: end}
|
||||
}
|
||||
var (
|
||||
At = testutils.At
|
||||
Between = testutils.Between
|
||||
Alert = testutils.Alert
|
||||
NewWebhook = testutils.NewWebhook
|
||||
)
|
||||
|
||||
// TestSilence models a model.Silence with relative times.
|
||||
// This is the CLI-specific version with additional fields.
|
||||
type TestSilence struct {
|
||||
id string
|
||||
createdBy string
|
||||
@@ -116,204 +96,3 @@ func (s *TestSilence) ID() string {
|
||||
func (s *TestSilence) EndsAt() float64 {
|
||||
return s.endsAt
|
||||
}
|
||||
|
||||
// TestAlert models a model.Alert with relative times.
|
||||
type TestAlert struct {
|
||||
labels models.LabelSet
|
||||
annotations models.LabelSet
|
||||
startsAt, endsAt float64
|
||||
summary string
|
||||
}
|
||||
|
||||
// Alert creates a new alert declaration with the given key/value pairs
|
||||
// as identifying labels.
|
||||
func Alert(keyval ...any) *TestAlert {
|
||||
if len(keyval)%2 == 1 {
|
||||
panic("bad key/values")
|
||||
}
|
||||
a := &TestAlert{
|
||||
labels: models.LabelSet{},
|
||||
annotations: models.LabelSet{},
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyval); i += 2 {
|
||||
ln := keyval[i].(string)
|
||||
lv := keyval[i+1].(string)
|
||||
|
||||
a.labels[ln] = lv
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// nativeAlert converts the declared test alert into a full alert based
|
||||
// on the given parameters.
|
||||
func (a *TestAlert) nativeAlert(opts *AcceptanceOpts) *models.GettableAlert {
|
||||
na := &models.GettableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: a.labels,
|
||||
},
|
||||
Annotations: a.annotations,
|
||||
StartsAt: &strfmt.DateTime{},
|
||||
EndsAt: &strfmt.DateTime{},
|
||||
}
|
||||
|
||||
if a.startsAt > 0 {
|
||||
start := strfmt.DateTime(opts.expandTime(a.startsAt))
|
||||
na.StartsAt = &start
|
||||
}
|
||||
if a.endsAt > 0 {
|
||||
end := strfmt.DateTime(opts.expandTime(a.endsAt))
|
||||
na.EndsAt = &end
|
||||
}
|
||||
|
||||
return na
|
||||
}
|
||||
|
||||
// Annotate the alert with the given key/value pairs.
|
||||
func (a *TestAlert) Annotate(keyval ...any) *TestAlert {
|
||||
if len(keyval)%2 == 1 {
|
||||
panic("bad key/values")
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyval); i += 2 {
|
||||
ln := keyval[i].(string)
|
||||
lv := keyval[i+1].(string)
|
||||
|
||||
a.annotations[ln] = lv
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// Active declares the relative activity time for this alert. It
|
||||
// must be a single starting value or two values where the second value
|
||||
// declares the resolved time.
|
||||
func (a *TestAlert) Active(tss ...float64) *TestAlert {
|
||||
if len(tss) > 2 || len(tss) == 0 {
|
||||
panic("only one or two timestamps allowed")
|
||||
}
|
||||
if len(tss) == 2 {
|
||||
a.endsAt = tss[1]
|
||||
}
|
||||
a.startsAt = tss[0]
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// HasLabels returns true if the two label sets are equivalent, otherwise false.
|
||||
func (a *TestAlert) HasLabels(labels models.LabelSet) bool {
|
||||
return reflect.DeepEqual(a.labels, labels)
|
||||
}
|
||||
|
||||
func equalAlerts(a, b *models.GettableAlert, opts *AcceptanceOpts) bool {
|
||||
if !reflect.DeepEqual(a.Labels, b.Labels) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Annotations, b.Annotations) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !equalTime(time.Time(*a.StartsAt), time.Time(*b.StartsAt), opts) {
|
||||
return false
|
||||
}
|
||||
if (a.EndsAt == nil) != (b.EndsAt == nil) {
|
||||
return false
|
||||
}
|
||||
if (a.EndsAt != nil) && (b.EndsAt != nil) && !equalTime(time.Time(*a.EndsAt), time.Time(*b.EndsAt), opts) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func equalTime(a, b time.Time, opts *AcceptanceOpts) bool {
|
||||
if a.IsZero() != b.IsZero() {
|
||||
return false
|
||||
}
|
||||
|
||||
diff := a.Sub(b)
|
||||
if diff < 0 {
|
||||
diff = -diff
|
||||
}
|
||||
return diff <= opts.Tolerance
|
||||
}
|
||||
|
||||
type MockWebhook struct {
|
||||
opts *AcceptanceOpts
|
||||
collector *Collector
|
||||
listener net.Listener
|
||||
|
||||
// Func is called early on when retrieving a notification by an
|
||||
// Alertmanager. If Func returns true, the given notification is dropped.
|
||||
// See sample usage in `send_test.go/TestRetry()`.
|
||||
Func func(timestamp float64) bool
|
||||
}
|
||||
|
||||
func NewWebhook(c *Collector) *MockWebhook {
|
||||
l, err := net.Listen("tcp4", "localhost:0")
|
||||
if err != nil {
|
||||
// TODO(fabxc): if shutdown of mock destinations ever becomes a concern
|
||||
// we want to shut them down after test completion. Then we might want to
|
||||
// log the error properly, too.
|
||||
panic(err)
|
||||
}
|
||||
wh := &MockWebhook{
|
||||
listener: l,
|
||||
collector: c,
|
||||
opts: c.opts,
|
||||
}
|
||||
go func() {
|
||||
if err := http.Serve(l, wh); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return wh
|
||||
}
|
||||
|
||||
func (ws *MockWebhook) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// Inject Func if it exists.
|
||||
if ws.Func != nil {
|
||||
if ws.Func(ws.opts.relativeTime(time.Now())) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(req.Body)
|
||||
defer req.Body.Close()
|
||||
|
||||
var v webhook.Message
|
||||
if err := dec.Decode(&v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Transform the webhook message alerts back into model.Alerts.
|
||||
var alerts models.GettableAlerts
|
||||
for _, a := range v.Alerts {
|
||||
var (
|
||||
labels = models.LabelSet{}
|
||||
annotations = models.LabelSet{}
|
||||
)
|
||||
maps.Copy(labels, a.Labels)
|
||||
maps.Copy(annotations, a.Annotations)
|
||||
|
||||
start := strfmt.DateTime(a.StartsAt)
|
||||
end := strfmt.DateTime(a.EndsAt)
|
||||
|
||||
alerts = append(alerts, &models.GettableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: labels,
|
||||
GeneratorURL: strfmt.URI(a.GeneratorURL),
|
||||
},
|
||||
Annotations: annotations,
|
||||
StartsAt: &start,
|
||||
EndsAt: &end,
|
||||
})
|
||||
}
|
||||
|
||||
ws.collector.add(alerts...)
|
||||
}
|
||||
|
||||
func (ws *MockWebhook) Address() string {
|
||||
return ws.listener.Addr().String()
|
||||
}
|
||||
|
||||
441
test/testutils/acceptance.go
Normal file
441
test/testutils/acceptance.go
Normal file
@@ -0,0 +1,441 @@
|
||||
// Copyright 2018 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package testutils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apiclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
"github.com/prometheus/alertmanager/api/v2/client/general"
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
"github.com/go-openapi/strfmt"
|
||||
)
|
||||
|
||||
// AcceptanceOpts defines configuration parameters for an acceptance test.
|
||||
type AcceptanceOpts struct {
|
||||
FeatureFlags []string
|
||||
RoutePrefix string
|
||||
Tolerance time.Duration
|
||||
baseTime time.Time
|
||||
}
|
||||
|
||||
// AlertString formats an alert for display with relative times.
|
||||
func (opts *AcceptanceOpts) AlertString(a *models.GettableAlert) string {
|
||||
if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
|
||||
return fmt.Sprintf("%v[%v:]", a, opts.RelativeTime(time.Time(*a.StartsAt)))
|
||||
}
|
||||
return fmt.Sprintf("%v[%v:%v]", a, opts.RelativeTime(time.Time(*a.StartsAt)), opts.RelativeTime(time.Time(*a.EndsAt)))
|
||||
}
|
||||
|
||||
// ExpandTime returns the absolute time for the relative time
|
||||
// calculated from the test's base time.
|
||||
func (opts *AcceptanceOpts) ExpandTime(rel float64) time.Time {
|
||||
return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
|
||||
}
|
||||
|
||||
// RelativeTime returns the relative time for the given time
|
||||
// calculated from the test's base time.
|
||||
func (opts *AcceptanceOpts) RelativeTime(act time.Time) float64 {
|
||||
return float64(act.Sub(opts.baseTime)) / float64(time.Second)
|
||||
}
|
||||
|
||||
// SetBaseTime sets the base time for relative time calculations.
|
||||
func (opts *AcceptanceOpts) SetBaseTime(t time.Time) {
|
||||
opts.baseTime = t
|
||||
}
|
||||
|
||||
// FreeAddress asks the OS for an unused TCP/IPv4 port on localhost and
// returns its address. The probing listener is closed before returning, so
// the port is only free at the time of the call; another process may grab it
// before the caller binds to it. It panics if listening or closing fails.
func FreeAddress() string {
	ln, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}

	addr := ln.Addr().String()
	if err := ln.Close(); err != nil {
		panic(err)
	}

	return addr
}
|
||||
|
||||
// AcceptanceTest provides declarative definition of given inputs and expected
|
||||
// output of an Alertmanager setup.
|
||||
type AcceptanceTest struct {
|
||||
*testing.T
|
||||
|
||||
opts *AcceptanceOpts
|
||||
|
||||
amc *AlertmanagerCluster
|
||||
collectors []*Collector
|
||||
|
||||
actions map[float64][]func()
|
||||
}
|
||||
|
||||
// NewAcceptanceTest returns a new acceptance test with the base time
|
||||
// set to the current time.
|
||||
func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
|
||||
test := &AcceptanceTest{
|
||||
T: t,
|
||||
opts: opts,
|
||||
actions: map[float64][]func(){},
|
||||
}
|
||||
return test
|
||||
}
|
||||
|
||||
// Do sets the given function to be executed at the given time.
|
||||
func (t *AcceptanceTest) Do(at float64, f func()) {
|
||||
t.actions[at] = append(t.actions[at], f)
|
||||
}
|
||||
|
||||
// AlertmanagerCluster returns a new AlertmanagerCluster that allows starting a
|
||||
// cluster of Alertmanager instances on random ports.
|
||||
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
|
||||
amc := AlertmanagerCluster{}
|
||||
|
||||
for range size {
|
||||
am := &Alertmanager{
|
||||
T: t,
|
||||
Opts: t.opts,
|
||||
}
|
||||
|
||||
dir, err := os.MkdirTemp("", "am_test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
am.dir = dir
|
||||
|
||||
cf, err := os.Create(filepath.Join(dir, "config.yml"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
am.confFile = cf
|
||||
am.UpdateConfig(conf)
|
||||
|
||||
am.APIAddr = FreeAddress()
|
||||
am.ClusterAddr = FreeAddress()
|
||||
|
||||
transport := httptransport.New(am.APIAddr, t.opts.RoutePrefix+"/api/v2/", nil)
|
||||
am.clientV2 = apiclient.New(transport, strfmt.Default)
|
||||
|
||||
amc.ams = append(amc.ams, am)
|
||||
}
|
||||
|
||||
t.amc = &amc
|
||||
|
||||
return &amc
|
||||
}
|
||||
|
||||
// Collector returns a new collector bound to the test instance.
|
||||
func (t *AcceptanceTest) Collector(name string) *Collector {
|
||||
co := NewCollector(t.T, name, t.opts)
|
||||
t.collectors = append(t.collectors, co)
|
||||
|
||||
return co
|
||||
}
|
||||
|
||||
// Run starts all Alertmanagers and runs queries against them. It then checks
|
||||
// whether all expected notifications have arrived at the expected receiver.
|
||||
func (t *AcceptanceTest) Run(additionalArgs ...string) {
|
||||
errc := make(chan error)
|
||||
|
||||
for _, am := range t.amc.ams {
|
||||
am.errc = errc
|
||||
t.Cleanup(am.Terminate)
|
||||
t.Cleanup(am.cleanup)
|
||||
}
|
||||
|
||||
err := t.amc.Start(additionalArgs...)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the reference time right before running the test actions to avoid
|
||||
// test failures due to slow setup of the test environment.
|
||||
t.opts.SetBaseTime(time.Now())
|
||||
|
||||
go t.runActions()
|
||||
|
||||
var latest float64
|
||||
for _, coll := range t.collectors {
|
||||
if l := coll.Latest(); l > latest {
|
||||
latest = l
|
||||
}
|
||||
}
|
||||
|
||||
deadline := t.opts.ExpandTime(latest)
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(deadline)):
|
||||
// continue
|
||||
case err := <-errc:
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// runActions performs the stored actions at the defined times.
|
||||
func (t *AcceptanceTest) runActions() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for at, fs := range t.actions {
|
||||
ts := t.opts.ExpandTime(at)
|
||||
wg.Add(len(fs))
|
||||
|
||||
for _, f := range fs {
|
||||
go func(f func()) {
|
||||
time.Sleep(time.Until(ts))
|
||||
f()
|
||||
wg.Done()
|
||||
}(f)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// buffer is a bytes.Buffer whose Write and String methods are serialized by
// a mutex, so it can be written to and read from different goroutines.
type buffer struct {
	b   bytes.Buffer
	mtx sync.Mutex
}

// Write appends p to the buffer while holding the lock.
func (b *buffer) Write(p []byte) (int, error) {
	b.mtx.Lock()
	n, err := b.b.Write(p)
	b.mtx.Unlock()
	return n, err
}

// String returns everything written so far while holding the lock.
func (b *buffer) String() string {
	b.mtx.Lock()
	contents := b.b.String()
	b.mtx.Unlock()
	return contents
}
|
||||
|
||||
// Alertmanager encapsulates an Alertmanager process and allows
|
||||
// declaring alerts being pushed to it at fixed points in time.
|
||||
type Alertmanager struct {
|
||||
T *AcceptanceTest
|
||||
Opts *AcceptanceOpts
|
||||
|
||||
APIAddr string
|
||||
ClusterAddr string
|
||||
|
||||
clientV2 *apiclient.AlertmanagerAPI
|
||||
confFile *os.File
|
||||
dir string
|
||||
|
||||
cmd *exec.Cmd
|
||||
errc chan<- error
|
||||
}
|
||||
|
||||
// AlertmanagerCluster represents a group of Alertmanager instances
|
||||
// acting as a cluster.
|
||||
type AlertmanagerCluster struct {
|
||||
ams []*Alertmanager
|
||||
}
|
||||
|
||||
// Start the Alertmanager cluster and wait until it is ready to receive.
|
||||
func (amc *AlertmanagerCluster) Start(additionalArgs ...string) error {
|
||||
args := additionalArgs
|
||||
for _, am := range amc.ams {
|
||||
args = append(args, "--cluster.peer="+am.ClusterAddr)
|
||||
}
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.Start(args)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start alertmanager cluster: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.WaitForCluster(len(amc.ams))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for Alertmanager instance %q to join cluster: %w", am.ClusterAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Members returns the underlying slice of cluster members.
|
||||
func (amc *AlertmanagerCluster) Members() []*Alertmanager {
|
||||
return amc.ams
|
||||
}
|
||||
|
||||
// Start the alertmanager and wait until it is ready to receive.
|
||||
func (am *Alertmanager) Start(additionalArg []string) error {
|
||||
am.T.Helper()
|
||||
args := []string{
|
||||
"--config.file", am.confFile.Name(),
|
||||
"--log.level", "debug",
|
||||
"--web.listen-address", am.APIAddr,
|
||||
"--storage.path", am.dir,
|
||||
"--cluster.listen-address", am.ClusterAddr,
|
||||
"--cluster.settle-timeout", "0s",
|
||||
}
|
||||
if len(am.Opts.FeatureFlags) > 0 {
|
||||
args = append(args, "--enable-feature", strings.Join(am.Opts.FeatureFlags, ","))
|
||||
}
|
||||
if am.Opts.RoutePrefix != "" {
|
||||
args = append(args, "--web.route-prefix", am.Opts.RoutePrefix)
|
||||
}
|
||||
args = append(args, additionalArg...)
|
||||
|
||||
cmd := exec.Command("../../../alertmanager", args...)
|
||||
|
||||
if am.cmd == nil {
|
||||
var outb, errb buffer
|
||||
cmd.Stdout = &outb
|
||||
cmd.Stderr = &errb
|
||||
} else {
|
||||
cmd.Stdout = am.cmd.Stdout
|
||||
cmd.Stderr = am.cmd.Stderr
|
||||
}
|
||||
am.cmd = cmd
|
||||
|
||||
if err := am.cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := am.cmd.Wait(); err != nil {
|
||||
am.errc <- err
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
var lastErr error
|
||||
for range 10 {
|
||||
_, lastErr = am.clientV2.General.GetStatus(nil)
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
return fmt.Errorf("unable to get a successful response from the Alertmanager: %w", lastErr)
|
||||
}
|
||||
|
||||
// WaitForCluster waits for the Alertmanager instance to join a cluster with the
|
||||
// given size.
|
||||
func (am *Alertmanager) WaitForCluster(size int) error {
|
||||
params := general.NewGetStatusParams()
|
||||
params.WithContext(context.Background())
|
||||
var status *general.GetStatusOK
|
||||
|
||||
// Poll for 2s
|
||||
for range 20 {
|
||||
var err error
|
||||
status, err = am.clientV2.General.GetStatus(params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(status.Payload.Cluster.Peers) == size {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
return fmt.Errorf(
|
||||
"expected %v peers, but got %v",
|
||||
size,
|
||||
len(status.Payload.Cluster.Peers),
|
||||
)
|
||||
}
|
||||
|
||||
// Terminate kills the underlying Alertmanager cluster processes and removes intermediate
|
||||
// data.
|
||||
func (amc *AlertmanagerCluster) Terminate() {
|
||||
for _, am := range amc.ams {
|
||||
am.Terminate()
|
||||
}
|
||||
}
|
||||
|
||||
// Terminate kills the underlying Alertmanager process and remove intermediate
|
||||
// data.
|
||||
func (am *Alertmanager) Terminate() {
|
||||
am.T.Helper()
|
||||
if am.cmd.Process != nil {
|
||||
if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
|
||||
am.T.Logf("Error sending SIGTERM to Alertmanager process: %v", err)
|
||||
}
|
||||
am.T.Logf("stdout:\n%v", am.cmd.Stdout)
|
||||
am.T.Logf("stderr:\n%v", am.cmd.Stderr)
|
||||
}
|
||||
}
|
||||
|
||||
// Reload sends the reloading signal to the Alertmanager instances.
|
||||
func (amc *AlertmanagerCluster) Reload() {
|
||||
for _, am := range amc.ams {
|
||||
am.Reload()
|
||||
}
|
||||
}
|
||||
|
||||
// Reload sends the reloading signal to the Alertmanager process.
|
||||
func (am *Alertmanager) Reload() {
|
||||
am.T.Helper()
|
||||
if am.cmd.Process != nil {
|
||||
if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
|
||||
am.T.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (am *Alertmanager) cleanup() {
|
||||
am.T.Helper()
|
||||
if err := os.RemoveAll(am.confFile.Name()); err != nil {
|
||||
am.T.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig rewrites the configuration file for the Alertmanager cluster. It
|
||||
// does not initiate config reloading.
|
||||
func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
|
||||
for _, am := range amc.ams {
|
||||
am.UpdateConfig(conf)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig rewrites the configuration file for the Alertmanager. It does not
|
||||
// initiate config reloading.
|
||||
func (am *Alertmanager) UpdateConfig(conf string) {
|
||||
if _, err := am.confFile.WriteString(conf); err != nil {
|
||||
am.T.Fatal(err)
|
||||
}
|
||||
if err := am.confFile.Sync(); err != nil {
|
||||
am.T.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Client returns a client to interact with the API v2 endpoint.
|
||||
func (am *Alertmanager) Client() *apiclient.AlertmanagerAPI {
|
||||
return am.clientV2
|
||||
}
|
||||
@@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
package testutils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@@ -37,10 +37,26 @@ type Collector struct {
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
// NewCollector creates a new Collector with the given parameters.
|
||||
func NewCollector(t *testing.T, name string, opts *AcceptanceOpts) *Collector {
|
||||
return &Collector{
|
||||
t: t,
|
||||
name: name,
|
||||
opts: opts,
|
||||
collected: map[float64][]models.GettableAlerts{},
|
||||
expected: map[Interval][]models.GettableAlerts{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) String() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
// Opts returns the acceptance options for this collector.
|
||||
func (c *Collector) Opts() *AcceptanceOpts {
|
||||
return c.opts
|
||||
}
|
||||
|
||||
// Collected returns a map of alerts collected by the collector indexed with the
|
||||
// receive timestamp.
|
||||
func (c *Collector) Collected() map[float64][]models.GettableAlerts {
|
||||
@@ -57,7 +73,7 @@ func batchesEqual(as, bs models.GettableAlerts, opts *AcceptanceOpts) bool {
|
||||
for _, a := range as {
|
||||
found := false
|
||||
for _, b := range bs {
|
||||
if equalAlerts(a, b, opts) {
|
||||
if EqualAlerts(a, b, opts) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
@@ -69,9 +85,9 @@ func batchesEqual(as, bs models.GettableAlerts, opts *AcceptanceOpts) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// latest returns the latest relative point in time where a notification is
|
||||
// Latest returns the latest relative point in time where a notification is
|
||||
// expected.
|
||||
func (c *Collector) latest() float64 {
|
||||
func (c *Collector) Latest() float64 {
|
||||
c.mtx.RLock()
|
||||
defer c.mtx.RUnlock()
|
||||
var latest float64
|
||||
@@ -90,17 +106,18 @@ func (c *Collector) Want(iv Interval, alerts ...*TestAlert) {
|
||||
defer c.mtx.Unlock()
|
||||
var nas models.GettableAlerts
|
||||
for _, a := range alerts {
|
||||
nas = append(nas, a.nativeAlert(c.opts))
|
||||
nas = append(nas, a.NativeAlert(c.opts))
|
||||
}
|
||||
|
||||
c.expected[iv] = append(c.expected[iv], nas)
|
||||
}
|
||||
|
||||
// add the given alerts to the collected alerts.
|
||||
func (c *Collector) add(alerts ...*models.GettableAlert) {
|
||||
// Add the given alerts to the collected alerts.
|
||||
// This is exported so it can be used by MockWebhook implementations.
|
||||
func (c *Collector) Add(alerts ...*models.GettableAlert) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
arrival := c.opts.relativeTime(time.Now())
|
||||
arrival := c.opts.RelativeTime(time.Now())
|
||||
|
||||
c.collected[arrival] = append(c.collected[arrival], models.GettableAlerts(alerts))
|
||||
}
|
||||
@@ -127,7 +144,7 @@ func (c *Collector) Check() string {
|
||||
report.WriteString("---\n")
|
||||
|
||||
for _, e := range exp {
|
||||
report.WriteString(fmt.Sprintf("- %v\n", c.opts.alertString(e)))
|
||||
report.WriteString(fmt.Sprintf("- %v\n", c.opts.AlertString(e)))
|
||||
}
|
||||
|
||||
for _, a := range alerts {
|
||||
@@ -173,7 +190,7 @@ func (c *Collector) Check() string {
|
||||
for _, alerts := range col {
|
||||
report.WriteString(fmt.Sprintf("@ %v\n", at))
|
||||
for _, a := range alerts {
|
||||
report.WriteString(fmt.Sprintf("- %v\n", c.opts.alertString(a)))
|
||||
report.WriteString(fmt.Sprintf("- %v\n", c.opts.AlertString(a)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -231,7 +248,7 @@ func CompareCollectors(a, b *Collector, opts *AcceptanceOpts) (bool, error) {
|
||||
for _, aAlert := range aAlerts {
|
||||
found := false
|
||||
for _, bAlert := range bAlerts {
|
||||
if equalAlerts(aAlert, bAlert, opts) {
|
||||
if EqualAlerts(aAlert, bAlert, opts) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
258
test/testutils/mock.go
Normal file
258
test/testutils/mock.go
Normal file
@@ -0,0 +1,258 @@
|
||||
// Copyright 2018 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package testutils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/notify/webhook"
|
||||
)
|
||||
|
||||
// At returns ts unchanged. It exists only so that acceptance test
// definitions read declaratively.
func At(ts float64) float64 {
	return ts
}
|
||||
|
||||
// Interval is a closed range [start, end] of relative times.
type Interval struct {
	start float64
	end   float64
}

// String formats the interval as "[start,end]".
func (iv Interval) String() string {
	return "[" + fmt.Sprint(iv.start) + "," + fmt.Sprint(iv.end) + "]"
}

// contains reports whether f lies within the interval, bounds included.
func (iv Interval) contains(f float64) bool {
	return iv.start <= f && f <= iv.end
}

// Between builds the interval [start, end]. It exists so that acceptance
// test definitions read declaratively.
func Between(start, end float64) Interval {
	var iv Interval
	iv.start, iv.end = start, end
	return iv
}
|
||||
|
||||
// TestAlert models a model.Alert with relative times.
|
||||
type TestAlert struct {
|
||||
Labels models.LabelSet
|
||||
Annotations models.LabelSet
|
||||
StartsAt, EndsAt float64
|
||||
Summary string // CLI-specific field, unused in with_api_v2
|
||||
}
|
||||
|
||||
// Alert creates a new alert declaration with the given key/value pairs
|
||||
// as identifying labels.
|
||||
func Alert(keyval ...any) *TestAlert {
|
||||
if len(keyval)%2 == 1 {
|
||||
panic("bad key/values")
|
||||
}
|
||||
a := &TestAlert{
|
||||
Labels: models.LabelSet{},
|
||||
Annotations: models.LabelSet{},
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyval); i += 2 {
|
||||
ln := keyval[i].(string)
|
||||
lv := keyval[i+1].(string)
|
||||
|
||||
a.Labels[ln] = lv
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// NativeAlert converts the declared test alert into a full alert based
|
||||
// on the given parameters.
|
||||
func (a *TestAlert) NativeAlert(opts *AcceptanceOpts) *models.GettableAlert {
|
||||
na := &models.GettableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: a.Labels,
|
||||
},
|
||||
Annotations: a.Annotations,
|
||||
StartsAt: &strfmt.DateTime{},
|
||||
EndsAt: &strfmt.DateTime{},
|
||||
}
|
||||
|
||||
if a.StartsAt > 0 {
|
||||
start := strfmt.DateTime(opts.ExpandTime(a.StartsAt))
|
||||
na.StartsAt = &start
|
||||
}
|
||||
if a.EndsAt > 0 {
|
||||
end := strfmt.DateTime(opts.ExpandTime(a.EndsAt))
|
||||
na.EndsAt = &end
|
||||
}
|
||||
|
||||
return na
|
||||
}
|
||||
|
||||
// Annotate the alert with the given key/value pairs.
|
||||
func (a *TestAlert) Annotate(keyval ...any) *TestAlert {
|
||||
if len(keyval)%2 == 1 {
|
||||
panic("bad key/values")
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyval); i += 2 {
|
||||
ln := keyval[i].(string)
|
||||
lv := keyval[i+1].(string)
|
||||
|
||||
a.Annotations[ln] = lv
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// Active declares the relative activity time for this alert. It
|
||||
// must be a single starting value or two values where the second value
|
||||
// declares the resolved time.
|
||||
func (a *TestAlert) Active(tss ...float64) *TestAlert {
|
||||
if len(tss) > 2 || len(tss) == 0 {
|
||||
panic("only one or two timestamps allowed")
|
||||
}
|
||||
if len(tss) == 2 {
|
||||
a.EndsAt = tss[1]
|
||||
}
|
||||
a.StartsAt = tss[0]
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// HasLabels returns true if the two label sets are equivalent, otherwise false.
|
||||
// CLI-specific method, unused in with_api_v2.
|
||||
func (a *TestAlert) HasLabels(labels models.LabelSet) bool {
|
||||
return reflect.DeepEqual(a.Labels, labels)
|
||||
}
|
||||
|
||||
// EqualAlerts compares two alerts for equality, considering the tolerance.
|
||||
func EqualAlerts(a, b *models.GettableAlert, opts *AcceptanceOpts) bool {
|
||||
if !reflect.DeepEqual(a.Labels, b.Labels) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Annotations, b.Annotations) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !EqualTime(time.Time(*a.StartsAt), time.Time(*b.StartsAt), opts) {
|
||||
return false
|
||||
}
|
||||
if (a.EndsAt == nil) != (b.EndsAt == nil) {
|
||||
return false
|
||||
}
|
||||
if (a.EndsAt != nil) && (b.EndsAt != nil) && !EqualTime(time.Time(*a.EndsAt), time.Time(*b.EndsAt), opts) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// EqualTime compares two times for equality within the tolerance.
|
||||
func EqualTime(a, b time.Time, opts *AcceptanceOpts) bool {
|
||||
if a.IsZero() != b.IsZero() {
|
||||
return false
|
||||
}
|
||||
|
||||
diff := a.Sub(b)
|
||||
if diff < 0 {
|
||||
diff = -diff
|
||||
}
|
||||
return diff <= opts.Tolerance
|
||||
}
|
||||
|
||||
// MockWebhook provides a mock HTTP webhook receiver for testing.
|
||||
type MockWebhook struct {
|
||||
opts *AcceptanceOpts
|
||||
collector *Collector
|
||||
addr string
|
||||
|
||||
// Func is called early on when retrieving a notification by an
|
||||
// Alertmanager. If Func returns true, the given notification is dropped.
|
||||
// See sample usage in `send_test.go/TestRetry()`.
|
||||
Func func(timestamp float64) bool
|
||||
}
|
||||
|
||||
// NewWebhook creates a new MockWebhook that collects alerts via HTTP.
|
||||
func NewWebhook(t *testing.T, c *Collector) *MockWebhook {
|
||||
t.Helper()
|
||||
|
||||
wh := &MockWebhook{
|
||||
collector: c,
|
||||
opts: c.Opts(),
|
||||
}
|
||||
|
||||
server := httptest.NewServer(wh)
|
||||
wh.addr = server.Listener.Addr().String()
|
||||
|
||||
t.Cleanup(func() {
|
||||
server.Close()
|
||||
})
|
||||
|
||||
return wh
|
||||
}
|
||||
|
||||
// ServeHTTP handles incoming webhook requests. It decodes the request body
// as a webhook.Message, converts each contained alert into a
// models.GettableAlert and adds the result to the collector.
// If Func is set and returns true for the current relative time, the request
// is ignored: the body is not decoded, nothing is recorded and no status is
// written explicitly.
|
||||
func (ws *MockWebhook) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// Inject drop function if it exists.
|
||||
if ws.Func != nil {
|
||||
if ws.Func(ws.opts.RelativeTime(time.Now())) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(req.Body)
|
||||
defer req.Body.Close()
|
||||
|
||||
var v webhook.Message
|
||||
// NOTE(review): a body that fails to decode panics inside the handler
// instead of producing an error response — confirm this is intended.
if err := dec.Decode(&v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Transform the webhook message alerts back into models.GettableAlerts.
|
||||
var alerts models.GettableAlerts
|
||||
for _, a := range v.Alerts {
|
||||
var (
|
||||
labels = models.LabelSet{}
|
||||
annotations = models.LabelSet{}
|
||||
)
|
||||
// Copy into fresh label sets rather than reusing the decoded maps.
maps.Copy(labels, a.Labels)
|
||||
maps.Copy(annotations, a.Annotations)
|
||||
|
||||
// Both timestamps are always set to non-nil pointers, even when the
// decoded value is the zero time.
start := strfmt.DateTime(a.StartsAt)
|
||||
end := strfmt.DateTime(a.EndsAt)
|
||||
|
||||
alerts = append(alerts, &models.GettableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: labels,
|
||||
GeneratorURL: strfmt.URI(a.GeneratorURL),
|
||||
},
|
||||
Annotations: annotations,
|
||||
StartsAt: &start,
|
||||
EndsAt: &end,
|
||||
})
|
||||
}
|
||||
|
||||
// Record all alerts of this notification in a single Add call.
ws.collector.Add(alerts...)
|
||||
}
|
||||
|
||||
// Address returns the address of the mock webhook server, i.e. the listener
// address recorded by NewWebhook.
|
||||
func (ws *MockWebhook) Address() string {
|
||||
return ws.addr
|
||||
}
|
||||
@@ -14,413 +14,69 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apiclient "github.com/prometheus/alertmanager/api/v2/client"
|
||||
"github.com/go-openapi/strfmt"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/client/alert"
|
||||
"github.com/prometheus/alertmanager/api/v2/client/general"
|
||||
"github.com/prometheus/alertmanager/api/v2/client/silence"
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/prometheus/alertmanager/test/testutils"
|
||||
)
|
||||
|
||||
// AcceptanceTest provides declarative definition of given inputs and expected
|
||||
// output of an Alertmanager setup.
|
||||
// Re-export common types and functions from testutils.
|
||||
type (
|
||||
Collector = testutils.Collector
|
||||
AcceptanceOpts = testutils.AcceptanceOpts
|
||||
)
|
||||
|
||||
var CompareCollectors = testutils.CompareCollectors
|
||||
|
||||
// AcceptanceTest wraps testutils.AcceptanceTest for API-based testing.
|
||||
type AcceptanceTest struct {
|
||||
*testing.T
|
||||
|
||||
opts *AcceptanceOpts
|
||||
|
||||
amc *AlertmanagerCluster
|
||||
collectors []*Collector
|
||||
|
||||
actions map[float64][]func()
|
||||
*testutils.AcceptanceTest
|
||||
}
|
||||
|
||||
// AcceptanceOpts defines configuration parameters for an acceptance test.
|
||||
type AcceptanceOpts struct {
|
||||
FeatureFlags []string
|
||||
RoutePrefix string
|
||||
Tolerance time.Duration
|
||||
baseTime time.Time
|
||||
}
|
||||
|
||||
func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
|
||||
if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
|
||||
return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt)))
|
||||
}
|
||||
return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt)))
|
||||
}
|
||||
|
||||
// expandTime returns the absolute time for the relative time
|
||||
// calculated from the test's base time.
|
||||
func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
|
||||
return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
|
||||
}
|
||||
|
||||
// relativeTime returns the relative time for the given time
|
||||
// calculated from the test's base time, in (fractional) seconds.
|
||||
func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
|
||||
return float64(act.Sub(opts.baseTime)) / float64(time.Second)
|
||||
}
|
||||
|
||||
// NewAcceptanceTest returns a new acceptance test with the base time
|
||||
// set to the current time.
|
||||
// NewAcceptanceTest returns a new acceptance test.
|
||||
func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
|
||||
test := &AcceptanceTest{
|
||||
T: t,
|
||||
opts: opts,
|
||||
actions: map[float64][]func(){},
|
||||
}
|
||||
return test
|
||||
}
|
||||
|
||||
// freeAddress returns a new listen address not currently in use.
|
||||
func freeAddress() string {
|
||||
// Let the OS allocate a free address, close it and hope
|
||||
// it is still free when starting Alertmanager.
|
||||
l, err := net.Listen("tcp4", "localhost:0")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := l.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return l.Addr().String()
|
||||
}
|
||||
|
||||
// Do sets the given function to be executed at the given time.
|
||||
func (t *AcceptanceTest) Do(at float64, f func()) {
|
||||
t.actions[at] = append(t.actions[at], f)
|
||||
}
|
||||
|
||||
// AlertmanagerCluster returns a new AlertmanagerCluster that allows starting a
|
||||
// cluster of Alertmanager instances on random ports.
|
||||
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
|
||||
amc := AlertmanagerCluster{}
|
||||
|
||||
for range size {
|
||||
am := &Alertmanager{
|
||||
t: t,
|
||||
opts: t.opts,
|
||||
}
|
||||
|
||||
dir, err := os.MkdirTemp("", "am_test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
am.dir = dir
|
||||
|
||||
cf, err := os.Create(filepath.Join(dir, "config.yml"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
am.confFile = cf
|
||||
am.UpdateConfig(conf)
|
||||
|
||||
am.apiAddr = freeAddress()
|
||||
am.clusterAddr = freeAddress()
|
||||
|
||||
transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil)
|
||||
am.clientV2 = apiclient.New(transport, strfmt.Default)
|
||||
|
||||
amc.ams = append(amc.ams, am)
|
||||
}
|
||||
|
||||
t.amc = &amc
|
||||
|
||||
return &amc
|
||||
}
|
||||
|
||||
// Collector returns a new collector bound to the test instance.
|
||||
func (t *AcceptanceTest) Collector(name string) *Collector {
|
||||
co := &Collector{
|
||||
t: t.T,
|
||||
name: name,
|
||||
opts: t.opts,
|
||||
collected: map[float64][]models.GettableAlerts{},
|
||||
expected: map[Interval][]models.GettableAlerts{},
|
||||
}
|
||||
t.collectors = append(t.collectors, co)
|
||||
|
||||
return co
|
||||
}
|
||||
|
||||
// Run starts all Alertmanagers and runs queries against them. It then checks
|
||||
// whether all expected notifications have arrived at the expected receiver.
|
||||
func (t *AcceptanceTest) Run(additionalArgs ...string) {
|
||||
errc := make(chan error)
|
||||
|
||||
for _, am := range t.amc.ams {
|
||||
am.errc = errc
|
||||
t.Cleanup(am.Terminate)
|
||||
t.Cleanup(am.cleanup)
|
||||
}
|
||||
|
||||
err := t.amc.Start(additionalArgs...)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the reference time right before running the test actions to avoid
|
||||
// test failures due to slow setup of the test environment.
|
||||
t.opts.baseTime = time.Now()
|
||||
|
||||
go t.runActions()
|
||||
|
||||
var latest float64
|
||||
for _, coll := range t.collectors {
|
||||
if l := coll.latest(); l > latest {
|
||||
latest = l
|
||||
}
|
||||
}
|
||||
|
||||
deadline := t.opts.expandTime(latest)
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(deadline)):
|
||||
// continue
|
||||
case err := <-errc:
|
||||
t.Error(err)
|
||||
return &AcceptanceTest{
|
||||
AcceptanceTest: testutils.NewAcceptanceTest(t, opts),
|
||||
}
|
||||
}
|
||||
|
||||
// runActions performs the stored actions at the defined times.
|
||||
func (t *AcceptanceTest) runActions() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for at, fs := range t.actions {
|
||||
ts := t.opts.expandTime(at)
|
||||
wg.Add(len(fs))
|
||||
|
||||
for _, f := range fs {
|
||||
go func(f func()) {
|
||||
time.Sleep(time.Until(ts))
|
||||
f()
|
||||
wg.Done()
|
||||
}(f)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type buffer struct {
|
||||
b bytes.Buffer
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func (b *buffer) Write(p []byte) (int, error) {
|
||||
b.mtx.Lock()
|
||||
defer b.mtx.Unlock()
|
||||
return b.b.Write(p)
|
||||
}
|
||||
|
||||
func (b *buffer) String() string {
|
||||
b.mtx.Lock()
|
||||
defer b.mtx.Unlock()
|
||||
return b.b.String()
|
||||
}
|
||||
|
||||
// Alertmanager encapsulates an Alertmanager process and allows
|
||||
// declaring alerts being pushed to it at fixed points in time.
|
||||
// Alertmanager wraps testutils.Alertmanager and adds API-specific methods.
|
||||
type Alertmanager struct {
|
||||
t *AcceptanceTest
|
||||
opts *AcceptanceOpts
|
||||
|
||||
apiAddr string
|
||||
clusterAddr string
|
||||
clientV2 *apiclient.AlertmanagerAPI
|
||||
confFile *os.File
|
||||
dir string
|
||||
|
||||
cmd *exec.Cmd
|
||||
errc chan<- error
|
||||
*testutils.Alertmanager
|
||||
}
|
||||
|
||||
// AlertmanagerCluster represents a group of Alertmanager instances
|
||||
// acting as a cluster.
|
||||
// AlertmanagerCluster wraps testutils.AlertmanagerCluster and adds API-specific methods.
|
||||
type AlertmanagerCluster struct {
|
||||
ams []*Alertmanager
|
||||
*testutils.AlertmanagerCluster
|
||||
}
|
||||
|
||||
// Start the Alertmanager cluster and wait until it is ready to receive.
|
||||
func (amc *AlertmanagerCluster) Start(additionalArgs ...string) error {
|
||||
args := additionalArgs
|
||||
for _, am := range amc.ams {
|
||||
args = append(args, "--cluster.peer="+am.clusterAddr)
|
||||
// AlertmanagerCluster returns a new AlertmanagerCluster.
|
||||
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
|
||||
return &AlertmanagerCluster{
|
||||
AlertmanagerCluster: t.AcceptanceTest.AlertmanagerCluster(conf, size),
|
||||
}
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.Start(args)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start alertmanager cluster: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.WaitForCluster(len(amc.ams))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for Alertmanager instance %q to join cluster: %w", am.clusterAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Members returns the underlying slice of cluster members.
|
||||
// Members returns the underlying Alertmanager instances wrapped for API testing.
|
||||
func (amc *AlertmanagerCluster) Members() []*Alertmanager {
|
||||
return amc.ams
|
||||
}
|
||||
|
||||
// Start the alertmanager and wait until it is ready to receive.
|
||||
func (am *Alertmanager) Start(additionalArg []string) error {
|
||||
am.t.Helper()
|
||||
args := []string{
|
||||
"--config.file", am.confFile.Name(),
|
||||
"--log.level", "debug",
|
||||
"--web.listen-address", am.apiAddr,
|
||||
"--storage.path", am.dir,
|
||||
"--cluster.listen-address", am.clusterAddr,
|
||||
"--cluster.settle-timeout", "0s",
|
||||
}
|
||||
if len(am.opts.FeatureFlags) > 0 {
|
||||
args = append(args, "--enable-feature", strings.Join(am.opts.FeatureFlags, ","))
|
||||
}
|
||||
if am.opts.RoutePrefix != "" {
|
||||
args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
|
||||
}
|
||||
args = append(args, additionalArg...)
|
||||
|
||||
cmd := exec.Command("../../../alertmanager", args...)
|
||||
|
||||
if am.cmd == nil {
|
||||
var outb, errb buffer
|
||||
cmd.Stdout = &outb
|
||||
cmd.Stderr = &errb
|
||||
} else {
|
||||
cmd.Stdout = am.cmd.Stdout
|
||||
cmd.Stderr = am.cmd.Stderr
|
||||
}
|
||||
am.cmd = cmd
|
||||
|
||||
if err := am.cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := am.cmd.Wait(); err != nil {
|
||||
am.errc <- err
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
var lastErr error
|
||||
for range 10 {
|
||||
_, lastErr = am.clientV2.General.GetStatus(nil)
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
return fmt.Errorf("unable to get a successful response from the Alertmanager: %w", lastErr)
|
||||
}
|
||||
|
||||
// WaitForCluster waits for the Alertmanager instance to join a cluster with the
|
||||
// given size.
|
||||
func (am *Alertmanager) WaitForCluster(size int) error {
|
||||
params := general.NewGetStatusParams()
|
||||
params.WithContext(context.Background())
|
||||
var status *general.GetStatusOK
|
||||
|
||||
// Poll for 2s
|
||||
for range 20 {
|
||||
var err error
|
||||
status, err = am.clientV2.General.GetStatus(params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(status.Payload.Cluster.Peers) == size {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
return fmt.Errorf(
|
||||
"expected %v peers, but got %v",
|
||||
size,
|
||||
len(status.Payload.Cluster.Peers),
|
||||
)
|
||||
}
|
||||
|
||||
// Terminate kills the underlying Alertmanager cluster processes and removes intermediate
|
||||
// data.
|
||||
func (amc *AlertmanagerCluster) Terminate() {
|
||||
for _, am := range amc.ams {
|
||||
am.Terminate()
|
||||
}
|
||||
}
|
||||
|
||||
// Terminate kills the underlying Alertmanager process and remove intermediate
|
||||
// data.
|
||||
func (am *Alertmanager) Terminate() {
|
||||
am.t.Helper()
|
||||
if am.cmd.Process != nil {
|
||||
if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
|
||||
am.t.Logf("Error sending SIGTERM to Alertmanager process: %v", err)
|
||||
}
|
||||
am.t.Logf("stdout:\n%v", am.cmd.Stdout)
|
||||
am.t.Logf("stderr:\n%v", am.cmd.Stderr)
|
||||
}
|
||||
}
|
||||
|
||||
// Reload sends the reloading signal to the Alertmanager instances.
|
||||
func (amc *AlertmanagerCluster) Reload() {
|
||||
for _, am := range amc.ams {
|
||||
am.Reload()
|
||||
}
|
||||
}
|
||||
|
||||
// Reload sends the reloading signal to the Alertmanager process.
|
||||
func (am *Alertmanager) Reload() {
|
||||
am.t.Helper()
|
||||
if am.cmd.Process != nil {
|
||||
if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
|
||||
am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (am *Alertmanager) cleanup() {
|
||||
am.t.Helper()
|
||||
if err := os.RemoveAll(am.confFile.Name()); err != nil {
|
||||
am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
|
||||
baseMembers := amc.AlertmanagerCluster.Members()
|
||||
wrapped := make([]*Alertmanager, len(baseMembers))
|
||||
for i, am := range baseMembers {
|
||||
wrapped[i] = &Alertmanager{Alertmanager: am}
|
||||
}
|
||||
return wrapped
|
||||
}
|
||||
|
||||
// Push declares alerts that are to be pushed to the Alertmanager
|
||||
// servers at a relative point in time.
|
||||
func (amc *AlertmanagerCluster) Push(at float64, alerts ...*TestAlert) {
|
||||
for _, am := range amc.ams {
|
||||
for _, am := range amc.Members() {
|
||||
am.Push(at, alerts...)
|
||||
}
|
||||
}
|
||||
@@ -428,10 +84,10 @@ func (amc *AlertmanagerCluster) Push(at float64, alerts ...*TestAlert) {
|
||||
// Push declares alerts that are to be pushed to the Alertmanager
|
||||
// server at a relative point in time.
|
||||
func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
|
||||
am.t.Do(at, func() {
|
||||
am.T.Do(at, func() {
|
||||
var cas models.PostableAlerts
|
||||
for i := range alerts {
|
||||
a := alerts[i].nativeAlert(am.opts)
|
||||
a := alerts[i].NativeAlert(am.Opts)
|
||||
alert := &models.PostableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: a.Labels,
|
||||
@@ -451,32 +107,32 @@ func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
|
||||
params := alert.PostAlertsParams{}
|
||||
params.WithContext(context.Background()).WithAlerts(cas)
|
||||
|
||||
_, err := am.clientV2.Alert.PostAlerts(¶ms)
|
||||
_, err := am.Client().Alert.PostAlerts(¶ms)
|
||||
if err != nil {
|
||||
am.t.Errorf("Error pushing %v: %v", cas, err)
|
||||
am.T.Errorf("Error pushing %v: %v", cas, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// SetSilence updates or creates the given Silence.
|
||||
func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
|
||||
for _, am := range amc.ams {
|
||||
for _, am := range amc.Members() {
|
||||
am.SetSilence(at, sil)
|
||||
}
|
||||
}
|
||||
|
||||
// SetSilence updates or creates the given Silence.
|
||||
func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
|
||||
am.t.Do(at, func() {
|
||||
resp, err := am.clientV2.Silence.PostSilences(
|
||||
am.T.Do(at, func() {
|
||||
resp, err := am.Client().Silence.PostSilences(
|
||||
silence.NewPostSilencesParams().WithSilence(
|
||||
&models.PostableSilence{
|
||||
Silence: *sil.nativeSilence(am.opts),
|
||||
Silence: *sil.nativeSilence(am.Opts),
|
||||
},
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
am.t.Errorf("Error setting silence %v: %s", sil, err)
|
||||
am.T.Errorf("Error setting silence %v: %s", sil, err)
|
||||
return
|
||||
}
|
||||
sil.SetID(resp.Payload.SilenceID)
|
||||
@@ -485,43 +141,19 @@ func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
|
||||
|
||||
// DelSilence deletes the silence with the sid at the given time.
|
||||
func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
|
||||
for _, am := range amc.ams {
|
||||
for _, am := range amc.Members() {
|
||||
am.DelSilence(at, sil)
|
||||
}
|
||||
}
|
||||
|
||||
// DelSilence deletes the silence with the sid at the given time.
|
||||
func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
|
||||
am.t.Do(at, func() {
|
||||
_, err := am.clientV2.Silence.DeleteSilence(
|
||||
am.T.Do(at, func() {
|
||||
_, err := am.Client().Silence.DeleteSilence(
|
||||
silence.NewDeleteSilenceParams().WithSilenceID(strfmt.UUID(sil.ID())),
|
||||
)
|
||||
if err != nil {
|
||||
am.t.Errorf("Error deleting silence %v: %s", sil, err)
|
||||
am.T.Errorf("Error deleting silence %v: %s", sil, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateConfig rewrites the configuration file for the Alertmanager cluster. It
|
||||
// does not initiate config reloading.
|
||||
func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
|
||||
for _, am := range amc.ams {
|
||||
am.UpdateConfig(conf)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig rewrites the configuration file for the Alertmanager. It does not
|
||||
// initiate config reloading.
|
||||
func (am *Alertmanager) UpdateConfig(conf string) {
|
||||
if _, err := am.confFile.WriteString(conf); err != nil {
|
||||
am.t.Fatal(err)
|
||||
}
|
||||
if err := am.confFile.Sync(); err != nil {
|
||||
am.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Client returns a client to interact with the API v2 endpoint.
|
||||
func (am *Alertmanager) Client() *apiclient.AlertmanagerAPI {
|
||||
return am.clientV2
|
||||
}
|
||||
|
||||
@@ -14,45 +14,27 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/notify/webhook"
|
||||
"github.com/prometheus/alertmanager/test/testutils"
|
||||
)
|
||||
|
||||
// At is a convenience method to allow for declarative syntax of Acceptance
|
||||
// test definitions.
|
||||
func At(ts float64) float64 {
|
||||
return ts
|
||||
}
|
||||
// Re-export common types and functions from testutils.
|
||||
type (
|
||||
Interval = testutils.Interval
|
||||
TestAlert = testutils.TestAlert
|
||||
MockWebhook = testutils.MockWebhook
|
||||
)
|
||||
|
||||
type Interval struct {
|
||||
start, end float64
|
||||
}
|
||||
|
||||
func (iv Interval) String() string {
|
||||
return fmt.Sprintf("[%v,%v]", iv.start, iv.end)
|
||||
}
|
||||
|
||||
func (iv Interval) contains(f float64) bool {
|
||||
return f >= iv.start && f <= iv.end
|
||||
}
|
||||
|
||||
// Between is a convenience constructor for an interval for declarative syntax
|
||||
// of Acceptance test definitions.
|
||||
func Between(start, end float64) Interval {
|
||||
return Interval{start: start, end: end}
|
||||
}
|
||||
var (
|
||||
At = testutils.At
|
||||
Between = testutils.Between
|
||||
Alert = testutils.Alert
|
||||
NewWebhook = testutils.NewWebhook
|
||||
)
|
||||
|
||||
// TestSilence models a model.Silence with relative times.
|
||||
type TestSilence struct {
|
||||
@@ -125,11 +107,11 @@ func (s *TestSilence) nativeSilence(opts *AcceptanceOpts) *models.Silence {
|
||||
}
|
||||
|
||||
if s.startsAt > 0 {
|
||||
start := strfmt.DateTime(opts.expandTime(s.startsAt))
|
||||
start := strfmt.DateTime(opts.ExpandTime(s.startsAt))
|
||||
nsil.StartsAt = &start
|
||||
}
|
||||
if s.endsAt > 0 {
|
||||
end := strfmt.DateTime(opts.expandTime(s.endsAt))
|
||||
end := strfmt.DateTime(opts.ExpandTime(s.endsAt))
|
||||
nsil.EndsAt = &end
|
||||
}
|
||||
comment := "some comment"
|
||||
@@ -139,194 +121,3 @@ func (s *TestSilence) nativeSilence(opts *AcceptanceOpts) *models.Silence {
|
||||
|
||||
return nsil
|
||||
}
|
||||
|
||||
// TestAlert models a model.Alert with relative times.
|
||||
type TestAlert struct {
|
||||
labels models.LabelSet
|
||||
annotations models.LabelSet
|
||||
startsAt, endsAt float64
|
||||
}
|
||||
|
||||
// Alert creates a new alert declaration with the given key/value pairs
|
||||
// as identifying labels.
|
||||
func Alert(keyval ...any) *TestAlert {
|
||||
if len(keyval)%2 == 1 {
|
||||
panic("bad key/values")
|
||||
}
|
||||
a := &TestAlert{
|
||||
labels: models.LabelSet{},
|
||||
annotations: models.LabelSet{},
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyval); i += 2 {
|
||||
ln := keyval[i].(string)
|
||||
lv := keyval[i+1].(string)
|
||||
|
||||
a.labels[ln] = lv
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// nativeAlert converts the declared test alert into a full alert based
|
||||
// on the given parameters.
|
||||
func (a *TestAlert) nativeAlert(opts *AcceptanceOpts) *models.GettableAlert {
|
||||
na := &models.GettableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: a.labels,
|
||||
},
|
||||
Annotations: a.annotations,
|
||||
StartsAt: &strfmt.DateTime{},
|
||||
EndsAt: &strfmt.DateTime{},
|
||||
}
|
||||
|
||||
if a.startsAt > 0 {
|
||||
start := strfmt.DateTime(opts.expandTime(a.startsAt))
|
||||
na.StartsAt = &start
|
||||
}
|
||||
if a.endsAt > 0 {
|
||||
end := strfmt.DateTime(opts.expandTime(a.endsAt))
|
||||
na.EndsAt = &end
|
||||
}
|
||||
|
||||
return na
|
||||
}
|
||||
|
||||
// Annotate the alert with the given key/value pairs.
|
||||
func (a *TestAlert) Annotate(keyval ...any) *TestAlert {
|
||||
if len(keyval)%2 == 1 {
|
||||
panic("bad key/values")
|
||||
}
|
||||
|
||||
for i := 0; i < len(keyval); i += 2 {
|
||||
ln := keyval[i].(string)
|
||||
lv := keyval[i+1].(string)
|
||||
|
||||
a.annotations[ln] = lv
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// Active declares the relative activity time for this alert. It
|
||||
// must be a single starting value or two values where the second value
|
||||
// declares the resolved time.
|
||||
func (a *TestAlert) Active(tss ...float64) *TestAlert {
|
||||
if len(tss) > 2 || len(tss) == 0 {
|
||||
panic("only one or two timestamps allowed")
|
||||
}
|
||||
if len(tss) == 2 {
|
||||
a.endsAt = tss[1]
|
||||
}
|
||||
a.startsAt = tss[0]
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func equalAlerts(a, b *models.GettableAlert, opts *AcceptanceOpts) bool {
|
||||
if !reflect.DeepEqual(a.Labels, b.Labels) {
|
||||
return false
|
||||
}
|
||||
if !reflect.DeepEqual(a.Annotations, b.Annotations) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !equalTime(time.Time(*a.StartsAt), time.Time(*b.StartsAt), opts) {
|
||||
return false
|
||||
}
|
||||
if (a.EndsAt == nil) != (b.EndsAt == nil) {
|
||||
return false
|
||||
}
|
||||
if (a.EndsAt != nil) && (b.EndsAt != nil) && !equalTime(time.Time(*a.EndsAt), time.Time(*b.EndsAt), opts) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func equalTime(a, b time.Time, opts *AcceptanceOpts) bool {
|
||||
if a.IsZero() != b.IsZero() {
|
||||
return false
|
||||
}
|
||||
|
||||
diff := a.Sub(b)
|
||||
if diff < 0 {
|
||||
diff = -diff
|
||||
}
|
||||
return diff <= opts.Tolerance
|
||||
}
|
||||
|
||||
type MockWebhook struct {
|
||||
opts *AcceptanceOpts
|
||||
collector *Collector
|
||||
addr string
|
||||
|
||||
// Func is called early on when retrieving a notification by an
|
||||
// Alertmanager. If Func returns true, the given notification is dropped.
|
||||
// See sample usage in `send_test.go/TestRetry()`.
|
||||
Func func(timestamp float64) bool
|
||||
}
|
||||
|
||||
func NewWebhook(t *testing.T, c *Collector) *MockWebhook {
|
||||
t.Helper()
|
||||
|
||||
wh := &MockWebhook{
|
||||
collector: c,
|
||||
opts: c.opts,
|
||||
}
|
||||
|
||||
server := httptest.NewServer(wh)
|
||||
wh.addr = server.Listener.Addr().String()
|
||||
|
||||
t.Cleanup(func() {
|
||||
server.Close()
|
||||
})
|
||||
|
||||
return wh
|
||||
}
|
||||
|
||||
func (ws *MockWebhook) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// Inject Func if it exists.
|
||||
if ws.Func != nil {
|
||||
if ws.Func(ws.opts.relativeTime(time.Now())) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(req.Body)
|
||||
defer req.Body.Close()
|
||||
|
||||
var v webhook.Message
|
||||
if err := dec.Decode(&v); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Transform the webhook message alerts back into model.Alerts.
|
||||
var alerts models.GettableAlerts
|
||||
for _, a := range v.Alerts {
|
||||
var (
|
||||
labels = models.LabelSet{}
|
||||
annotations = models.LabelSet{}
|
||||
)
|
||||
maps.Copy(labels, a.Labels)
|
||||
maps.Copy(annotations, a.Annotations)
|
||||
|
||||
start := strfmt.DateTime(a.StartsAt)
|
||||
end := strfmt.DateTime(a.EndsAt)
|
||||
|
||||
alerts = append(alerts, &models.GettableAlert{
|
||||
Alert: models.Alert{
|
||||
Labels: labels,
|
||||
GeneratorURL: strfmt.URI(a.GeneratorURL),
|
||||
},
|
||||
Annotations: annotations,
|
||||
StartsAt: &start,
|
||||
EndsAt: &end,
|
||||
})
|
||||
}
|
||||
|
||||
ws.collector.add(alerts...)
|
||||
}
|
||||
|
||||
func (ws *MockWebhook) Address() string {
|
||||
return ws.addr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user