mirror of
https://github.com/prometheus/alertmanager.git
synced 2026-02-05 06:45:45 +01:00
Fix silence import: also wait for the error collection goroutine to finish (#4735)
* Fix silence import: also wait for the error collection goroutine to finish.
  As noticed by George Robinson, the error collection goroutine in silence import is also not waited for, so we may get an incorrect count when we exit. This adds a done channel for that goroutine, and checks that the error count is correct with a new test.
  Signed-off-by: Guido Trotter <guido@hudson-trading.com>
* bulkImport: use sync.Once to close channels.
  Signed-off-by: Guido Trotter <guido@hudson-trading.com>
---------
Signed-off-by: Guido Trotter <guido@hudson-trading.com>
Co-authored-by: Guido Trotter <guido@hudson-trading.com>
This commit is contained in:
@@ -99,18 +99,19 @@ func (c *silenceImportCmd) bulkImport(ctx context.Context, _ *kingpin.ParseConte
|
||||
amclient := NewAlertmanagerClient(alertmanagerURL)
|
||||
silencec := make(chan *models.PostableSilence, 100)
|
||||
errc := make(chan error, 100)
|
||||
errDone := make(chan struct{})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
cleanupDone := false
|
||||
var once sync.Once
|
||||
|
||||
closeChannels := func() {
|
||||
if cleanupDone {
|
||||
return
|
||||
}
|
||||
close(silencec)
|
||||
wg.Wait()
|
||||
close(errc)
|
||||
cleanupDone = true
|
||||
once.Do(func() {
|
||||
close(silencec)
|
||||
wg.Wait()
|
||||
close(errc)
|
||||
<-errDone
|
||||
close(errDone)
|
||||
})
|
||||
}
|
||||
defer closeChannels()
|
||||
for w := 0; w < c.workers; w++ {
|
||||
@@ -128,6 +129,7 @@ func (c *silenceImportCmd) bulkImport(ctx context.Context, _ *kingpin.ParseConte
|
||||
errCount++
|
||||
}
|
||||
}
|
||||
errDone <- struct{}{}
|
||||
}()
|
||||
|
||||
count := 0
|
||||
|
||||
@@ -14,11 +14,13 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
@@ -397,3 +399,175 @@ receivers:
|
||||
require.Error(t, err, "import should fail with invalid JSON")
|
||||
require.Contains(t, string(out), "couldn't unmarshal", "error message should mention JSON parsing")
|
||||
}
|
||||
|
||||
func TestSilenceImportInvalidSilence(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
conf := `
|
||||
route:
|
||||
receiver: "default"
|
||||
group_by: [alertname]
|
||||
group_wait: 1s
|
||||
group_interval: 1s
|
||||
repeat_interval: 1ms
|
||||
|
||||
receivers:
|
||||
- name: "default"
|
||||
webhook_configs:
|
||||
- url: 'http://%s'
|
||||
send_resolved: true
|
||||
`
|
||||
|
||||
at := NewAcceptanceTest(t, &AcceptanceOpts{
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
defer amc.Terminate()
|
||||
|
||||
am := amc.Members()[0]
|
||||
|
||||
// Create file with valid JSON but invalid silence (zero timestamps)
|
||||
tmpDir := t.TempDir()
|
||||
invalidFile := tmpDir + "/invalid_silence.json"
|
||||
invalidSilence := `[
|
||||
{
|
||||
"matchers": [
|
||||
{"name": "alertname", "value": "test", "isRegex": false}
|
||||
],
|
||||
"startsAt": "0001-01-01T00:00:00.000Z",
|
||||
"endsAt": "0001-01-01T00:00:00.000Z",
|
||||
"createdBy": "test",
|
||||
"comment": "invalid silence with zero timestamps"
|
||||
}
|
||||
]`
|
||||
err := os.WriteFile(invalidFile, []byte(invalidSilence), 0o644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Try to import - should fail with error from addSilenceWorker
|
||||
out, err := am.ImportSilences(invalidFile)
|
||||
require.Error(t, err, "import should fail with invalid silence")
|
||||
require.Contains(t, string(out), "couldn't import 1 out of 1 silences", "error message should report exact count")
|
||||
}
|
||||
|
||||
func TestSilenceImportPartialFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
conf := `
|
||||
route:
|
||||
receiver: "default"
|
||||
group_by: [alertname]
|
||||
group_wait: 1s
|
||||
group_interval: 1s
|
||||
repeat_interval: 1ms
|
||||
|
||||
receivers:
|
||||
- name: "default"
|
||||
webhook_configs:
|
||||
- url: 'http://%s'
|
||||
send_resolved: true
|
||||
`
|
||||
|
||||
at := NewAcceptanceTest(t, &AcceptanceOpts{
|
||||
Tolerance: 1 * time.Second,
|
||||
})
|
||||
co := at.Collector("webhook")
|
||||
wh := NewWebhook(co)
|
||||
|
||||
amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
|
||||
require.NoError(t, amc.Start())
|
||||
defer amc.Terminate()
|
||||
|
||||
am := amc.Members()[0]
|
||||
|
||||
// Create array of PostableSilence directly
|
||||
now := time.Now()
|
||||
future := now.Add(4 * time.Hour)
|
||||
silences := []models.PostableSilence{
|
||||
// Valid silence 1
|
||||
{
|
||||
Silence: models.Silence{
|
||||
Matchers: models.Matchers{
|
||||
&models.Matcher{Name: ptrString("alertname"), Value: ptrString("test1"), IsRegex: ptrBool(false)},
|
||||
},
|
||||
StartsAt: ptrTime(now),
|
||||
EndsAt: ptrTime(future),
|
||||
CreatedBy: ptrString("test"),
|
||||
Comment: ptrString("valid silence 1"),
|
||||
},
|
||||
},
|
||||
// Invalid silence 2 (endsAt before startsAt)
|
||||
{
|
||||
Silence: models.Silence{
|
||||
Matchers: models.Matchers{
|
||||
&models.Matcher{Name: ptrString("alertname"), Value: ptrString("test2"), IsRegex: ptrBool(false)},
|
||||
},
|
||||
StartsAt: ptrTime(future), // Swapped!
|
||||
EndsAt: ptrTime(now), // Swapped!
|
||||
CreatedBy: ptrString("test"),
|
||||
Comment: ptrString("invalid silence 2"),
|
||||
},
|
||||
},
|
||||
// Valid silence 3
|
||||
{
|
||||
Silence: models.Silence{
|
||||
Matchers: models.Matchers{
|
||||
&models.Matcher{Name: ptrString("alertname"), Value: ptrString("test3"), IsRegex: ptrBool(false)},
|
||||
},
|
||||
StartsAt: ptrTime(now),
|
||||
EndsAt: ptrTime(future),
|
||||
CreatedBy: ptrString("test"),
|
||||
Comment: ptrString("valid silence 3"),
|
||||
},
|
||||
},
|
||||
// Invalid silence 4 (endsAt before startsAt)
|
||||
{
|
||||
Silence: models.Silence{
|
||||
Matchers: models.Matchers{
|
||||
&models.Matcher{Name: ptrString("alertname"), Value: ptrString("test4"), IsRegex: ptrBool(false)},
|
||||
},
|
||||
StartsAt: ptrTime(future), // Swapped!
|
||||
EndsAt: ptrTime(now), // Swapped!
|
||||
CreatedBy: ptrString("test"),
|
||||
Comment: ptrString("invalid silence 4"),
|
||||
},
|
||||
},
|
||||
// Valid silence 5
|
||||
{
|
||||
Silence: models.Silence{
|
||||
Matchers: models.Matchers{
|
||||
&models.Matcher{Name: ptrString("alertname"), Value: ptrString("test5"), IsRegex: ptrBool(false)},
|
||||
},
|
||||
StartsAt: ptrTime(now),
|
||||
EndsAt: ptrTime(future),
|
||||
CreatedBy: ptrString("test"),
|
||||
Comment: ptrString("valid silence 5"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Serialize to JSON
|
||||
jsonData, err := json.Marshal(silences)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write to file
|
||||
tmpDir := t.TempDir()
|
||||
mixedFile := tmpDir + "/mixed_silences.json"
|
||||
err = os.WriteFile(mixedFile, jsonData, 0o644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Try to import - should partially succeed
|
||||
out, err := am.ImportSilences(mixedFile)
|
||||
require.Error(t, err, "import should fail with partial import")
|
||||
require.Contains(t, string(out), "couldn't import 2 out of 5 silences", "error message should report 2 failures out of 5")
|
||||
}
|
||||
|
||||
// ptrString returns a pointer to a fresh copy of s.
func ptrString(s string) *string {
	v := s
	return &v
}
|
||||
// ptrBool returns a pointer to a fresh copy of b.
func ptrBool(b bool) *bool {
	v := b
	return &v
}
|
||||
func ptrTime(t time.Time) *strfmt.DateTime {
|
||||
st := strfmt.DateTime(t)
|
||||
return &st
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user