mirror of
https://github.com/prometheus/alertmanager.git
synced 2026-02-05 06:45:45 +01:00
Fix alertmanager test port allocation race conditions (#4768)
At the moment we try to allocate ports in the tests, the close them, and then start alertmanager on those ports. This is very brittle and often fails. Fix the race conditions by directly starting alertmanager on system-allocated free ports (using :0 in the address) and then detecting the ports used, and using those in the test. Signed-off-by: Guido Trotter <guido@hudson-trading.com> Co-authored-by: Guido Trotter <guido@hudson-trading.com>
This commit is contained in:
@@ -357,7 +357,7 @@ func (am *Alertmanager) testRouteCommand(labels ...string) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (am *Alertmanager) getURL(path string) string {
|
||||
return fmt.Sprintf("http://%s%s%s", am.APIAddr, am.Opts.RoutePrefix, path)
|
||||
return fmt.Sprintf("http://%s%s%s", am.APIAddr(), am.Opts.RoutePrefix, path)
|
||||
}
|
||||
|
||||
// Version runs the 'amtool' command with the --version option and checks
|
||||
|
||||
@@ -17,10 +17,10 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
@@ -68,23 +68,6 @@ func (opts *AcceptanceOpts) SetBaseTime(t time.Time) {
|
||||
opts.baseTime = t
|
||||
}
|
||||
|
||||
// FreeAddress returns a new listen address not currently in use.
|
||||
func FreeAddress() string {
|
||||
// Let the OS allocate a free address, close it and hope
|
||||
// it is still free when starting Alertmanager.
|
||||
l, err := net.Listen("tcp4", "localhost:0")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := l.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return l.Addr().String()
|
||||
}
|
||||
|
||||
// AcceptanceTest provides declarative definition of given inputs and expected
|
||||
// output of an Alertmanager setup.
|
||||
type AcceptanceTest struct {
|
||||
@@ -138,11 +121,8 @@ func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *Alertmanage
|
||||
am.confFile = cf
|
||||
am.UpdateConfig(conf)
|
||||
|
||||
am.APIAddr = FreeAddress()
|
||||
am.ClusterAddr = FreeAddress()
|
||||
|
||||
transport := httptransport.New(am.APIAddr, t.opts.RoutePrefix+"/api/v2/", nil)
|
||||
am.clientV2 = apiclient.New(transport, strfmt.Default)
|
||||
// apiAddr and clusterAddr will be discovered during Start()
|
||||
// clientV2 will be created during Start() with the discovered address
|
||||
|
||||
amc.ams = append(amc.ams, am)
|
||||
}
|
||||
@@ -244,8 +224,8 @@ type Alertmanager struct {
|
||||
T *AcceptanceTest
|
||||
Opts *AcceptanceOpts
|
||||
|
||||
APIAddr string
|
||||
ClusterAddr string
|
||||
apiAddr string // the API address of this instance, discovered after start
|
||||
clusterAddr string // the cluster address can be the address of any peer
|
||||
|
||||
clientV2 *apiclient.AlertmanagerAPI
|
||||
confFile *os.File
|
||||
@@ -255,6 +235,16 @@ type Alertmanager struct {
|
||||
errc chan<- error
|
||||
}
|
||||
|
||||
// ClusterAddr returns an address for the cluster.
|
||||
func (am *Alertmanager) ClusterAddr() string {
|
||||
return am.clusterAddr
|
||||
}
|
||||
|
||||
// APIAddr returns the API address for the instance.
|
||||
func (am *Alertmanager) APIAddr() string {
|
||||
return am.apiAddr
|
||||
}
|
||||
|
||||
// AlertmanagerCluster represents a group of Alertmanager instances
|
||||
// acting as a cluster.
|
||||
type AlertmanagerCluster struct {
|
||||
@@ -263,22 +253,30 @@ type AlertmanagerCluster struct {
|
||||
|
||||
// Start the Alertmanager cluster and wait until it is ready to receive.
|
||||
func (amc *AlertmanagerCluster) Start(additionalArgs ...string) error {
|
||||
args := additionalArgs
|
||||
for _, am := range amc.ams {
|
||||
args = append(args, "--cluster.peer="+am.ClusterAddr)
|
||||
}
|
||||
args := make([]string, 0, len(additionalArgs)+1)
|
||||
args = append(args, additionalArgs...)
|
||||
clusterAdded := false
|
||||
|
||||
for _, am := range amc.ams {
|
||||
err := am.Start(args)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start alertmanager cluster: %w", err)
|
||||
for i, am := range amc.ams {
|
||||
am.T.Logf("Starting cluster member %d/%d", i+1, len(amc.ams))
|
||||
|
||||
// Start this instance (it will discover its own ports)
|
||||
if err := am.Start(args); err != nil {
|
||||
return fmt.Errorf("starting cluster member %d: %w", i, err)
|
||||
}
|
||||
|
||||
// From the second instance onwards, append the cluster.peer argument
|
||||
// so the subsequent ones join up.
|
||||
if !clusterAdded {
|
||||
args = append(args, "--cluster.peer="+am.ClusterAddr())
|
||||
clusterAdded = true
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for cluster to converge
|
||||
for _, am := range amc.ams {
|
||||
err := am.WaitForCluster(len(amc.ams))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to wait for Alertmanager instance %q to join cluster: %w", am.ClusterAddr, err)
|
||||
if err := am.WaitForCluster(len(amc.ams)); err != nil {
|
||||
return fmt.Errorf("failed to wait for Alertmanager instance %q to join cluster: %w", am.APIAddr(), err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,15 +288,81 @@ func (amc *AlertmanagerCluster) Members() []*Alertmanager {
|
||||
return amc.ams
|
||||
}
|
||||
|
||||
// discoverWebAddress parses stderr for "Listening on" log message and updates am.apiAddr.
|
||||
func (am *Alertmanager) discoverWebAddress(timeout time.Duration) error {
|
||||
am.T.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
stderrBuf, ok := am.cmd.Stderr.(*buffer)
|
||||
if !ok {
|
||||
return fmt.Errorf("stderr is not a buffer")
|
||||
}
|
||||
|
||||
// Compile regex once outside the loop
|
||||
re := regexp.MustCompile(`address=([^\s]+)`)
|
||||
lastPos := 0
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
stderr := stderrBuf.String()
|
||||
// Only process new content since last check
|
||||
if len(stderr) <= lastPos {
|
||||
continue
|
||||
}
|
||||
newContent := stderr[lastPos:]
|
||||
lastPos = len(stderr)
|
||||
|
||||
// Look for: msg="Listening on" address=127.0.0.1:PORT
|
||||
for line := range strings.SplitSeq(newContent, "\n") {
|
||||
if !strings.Contains(line, "Listening on") {
|
||||
continue
|
||||
}
|
||||
// Extract address using regex: address=IP:PORT
|
||||
matches := re.FindStringSubmatch(line)
|
||||
if len(matches) == 2 {
|
||||
am.apiAddr = matches[1]
|
||||
am.T.Logf("Discovered web address: %s", am.apiAddr)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timeout waiting for web address in logs")
|
||||
}
|
||||
|
||||
// discoverClusterAddress queries /api/v2/status for cluster address and updates am.clusterAddr.
|
||||
func (am *Alertmanager) discoverClusterAddress(timeout time.Duration) error {
|
||||
am.T.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
params := general.NewGetStatusParams()
|
||||
params.WithContext(context.Background())
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
status, err := am.clientV2.General.GetStatus(params)
|
||||
if err != nil || status.Payload == nil || status.Payload.Cluster == nil {
|
||||
continue
|
||||
}
|
||||
if len(status.Payload.Cluster.Peers) == 0 {
|
||||
continue
|
||||
}
|
||||
peer := status.Payload.Cluster.Peers[0]
|
||||
if peer != nil && peer.Address != nil {
|
||||
am.clusterAddr = *peer.Address
|
||||
am.T.Logf("Discovered cluster address: %s", am.clusterAddr)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timeout waiting for cluster address from API")
|
||||
}
|
||||
|
||||
// Start the alertmanager and wait until it is ready to receive.
|
||||
func (am *Alertmanager) Start(additionalArg []string) error {
|
||||
am.T.Helper()
|
||||
args := []string{
|
||||
"--config.file", am.confFile.Name(),
|
||||
"--log.level", "debug",
|
||||
"--web.listen-address", am.APIAddr,
|
||||
"--web.listen-address", "127.0.0.1:0",
|
||||
"--storage.path", am.dir,
|
||||
"--cluster.listen-address", am.ClusterAddr,
|
||||
"--cluster.listen-address", "127.0.0.1:0",
|
||||
"--cluster.settle-timeout", "0s",
|
||||
}
|
||||
if len(am.Opts.FeatureFlags) > 0 {
|
||||
@@ -331,16 +395,22 @@ func (am *Alertmanager) Start(additionalArg []string) error {
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
var lastErr error
|
||||
for range 10 {
|
||||
_, lastErr = am.clientV2.General.GetStatus(nil)
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
// Discover web address from logs
|
||||
if err := am.discoverWebAddress(5 * time.Second); err != nil {
|
||||
return fmt.Errorf("failed to discover web address: %w", err)
|
||||
}
|
||||
return fmt.Errorf("unable to get a successful response from the Alertmanager: %w", lastErr)
|
||||
|
||||
// Update API client with discovered address
|
||||
transport := httptransport.New(am.apiAddr, am.Opts.RoutePrefix+"/api/v2/", nil)
|
||||
am.clientV2 = apiclient.New(transport, strfmt.Default)
|
||||
|
||||
// Discover cluster address from API (also serves as readiness check)
|
||||
if err := am.discoverClusterAddress(5 * time.Second); err != nil {
|
||||
return fmt.Errorf("failed to discover cluster address: %w", err)
|
||||
}
|
||||
|
||||
am.T.Logf("Alertmanager started - web: %s, cluster: %s", am.apiAddr, am.clusterAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForCluster waits for the Alertmanager instance to join a cluster with the
|
||||
@@ -437,5 +507,8 @@ func (am *Alertmanager) UpdateConfig(conf string) {
|
||||
|
||||
// Client returns a client to interact with the API v2 endpoint.
|
||||
func (am *Alertmanager) Client() *apiclient.AlertmanagerAPI {
|
||||
if am.clientV2 == nil {
|
||||
panic("Client not available. Start() was not called or failed.")
|
||||
}
|
||||
return am.clientV2
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user