1
0
mirror of https://github.com/prometheus/alertmanager.git synced 2026-02-05 06:45:45 +01:00

Add arbitrary key/value data store to the nflog (#4899)

* add arbitrary nflog data storage

Signed-off-by: Ethan Hunter <ehunter@hudson-trading.com>

* improve isFirstNotification logic

Signed-off-by: Ethan Hunter <ehunter@hudson-trading.com>

* add docstring and ensure input map not modified

Signed-off-by: Ethan Hunter <ehunter@hudson-trading.com>

* appease linter

Signed-off-by: Ethan Hunter <ehunter@hudson-trading.com>

---------

Signed-off-by: Ethan Hunter <ehunter@hudson-trading.com>
This commit is contained in:
Ethan Hunter
2026-01-28 20:18:56 +00:00
committed by GitHub
parent ec88c5b5b7
commit b5df726c8f
6 changed files with 1038 additions and 67 deletions

View File

@@ -76,6 +76,100 @@ func QGroupKey(gk string) QueryParam {
}
}
// Store abstracts the NFLog's receiver data storage as a mutable key/value store. A store
// can be generated from a nflogpb.Entry and then written via the call to Log.
//
// Every key in the Store is associated with either an int, float, or string value.
type Store struct {
data map[string]*pb.ReceiverDataValue
}
// NewStore creates a Store from the entry's receiver data. If entry is nil, the resulting
// Store is empty.
func NewStore(entry *pb.Entry) *Store {
var receiverData map[string]*pb.ReceiverDataValue
if entry != nil {
receiverData = maps.Clone(entry.ReceiverData)
}
if receiverData == nil {
receiverData = make(map[string]*pb.ReceiverDataValue)
}
return &Store{
data: receiverData,
}
}
// GetInt finds the integer value associated with the key, if any, and returns it.
func (s *Store) GetInt(key string) (int64, bool) {
dataValue, ok := s.data[key]
if !ok {
return 0, false
}
intVal, ok := dataValue.Value.(*pb.ReceiverDataValue_IntVal)
if !ok {
return 0, false
}
return intVal.IntVal, true
}
// GetFloat finds the float value associated with the key, if any, and returns it.
func (s *Store) GetFloat(key string) (float64, bool) {
dataValue, ok := s.data[key]
if !ok {
return 0, false
}
floatVal, ok := dataValue.Value.(*pb.ReceiverDataValue_DoubleVal)
if !ok {
return 0, false
}
return floatVal.DoubleVal, true
}
// GetFloat finds the string value associated with the key, if any, and returns it.
func (s *Store) GetStr(key string) (string, bool) {
dataValue, ok := s.data[key]
if !ok {
return "", false
}
strVal, ok := dataValue.Value.(*pb.ReceiverDataValue_StrVal)
if !ok {
return "", false
}
return strVal.StrVal, true
}
// SetInt associates an integer value with the provided key, overwriting any existing value.
func (s *Store) SetInt(key string, v int64) {
s.data[key] = &pb.ReceiverDataValue{
Value: &pb.ReceiverDataValue_IntVal{
IntVal: v,
},
}
}
// SetFloat associates a float value with the provided key, overwriting any existing value.
func (s *Store) SetFloat(key string, v float64) {
s.data[key] = &pb.ReceiverDataValue{
Value: &pb.ReceiverDataValue_DoubleVal{
DoubleVal: v,
},
}
}
// SetStr associates a string value with the provided key, overwriting any existing value.
func (s *Store) SetStr(key, v string) {
s.data[key] = &pb.ReceiverDataValue{
Value: &pb.ReceiverDataValue_StrVal{
StrVal: v,
},
}
}
// Delete deletes any value associated with the key.
func (s *Store) Delete(key string) {
delete(s.data, key)
}
// Log holds the notification log state for alerts that have been notified.
type Log struct {
clock quartz.Clock
@@ -368,7 +462,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, store *Store, expiry time.Duration) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
@@ -389,6 +483,11 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
expiresAt = now.Add(expiry)
}
var receiverData map[string]*pb.ReceiverDataValue
if store != nil {
receiverData = store.data
}
e := &pb.MeshEntry{
Entry: &pb.Entry{
Receiver: r,
@@ -396,6 +495,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
Timestamp: now,
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
ReceiverData: receiverData,
},
ExpiresAt: expiresAt,
}

View File

@@ -355,7 +355,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, nil, 0)
require.NoError(t, err, "logging notification failed")
entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))

View File

@@ -4,6 +4,7 @@
package nflogpb
import (
encoding_binary "encoding/binary"
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
@@ -92,10 +93,12 @@ type Entry struct {
// FiringAlerts list of hashes of firing alerts at the last notification time.
FiringAlerts []uint64 `protobuf:"varint,6,rep,packed,name=firing_alerts,json=firingAlerts,proto3" json:"firing_alerts,omitempty"`
// ResolvedAlerts list of hashes of resolved alerts at the last notification time.
ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"`
// Data specific to the receiver which sent the notification
ReceiverData map[string]*ReceiverDataValue `protobuf:"bytes,8,rep,name=receiver_data,json=receiverData,proto3" json:"receiver_data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Entry) Reset() { *m = Entry{} }
@@ -177,41 +180,152 @@ func (m *MeshEntry) XXX_DiscardUnknown() {
var xxx_messageInfo_MeshEntry proto.InternalMessageInfo
type ReceiverDataValue struct {
// Types that are valid to be assigned to Value:
//
// *ReceiverDataValue_StrVal
// *ReceiverDataValue_IntVal
// *ReceiverDataValue_DoubleVal
Value isReceiverDataValue_Value `protobuf_oneof:"value"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReceiverDataValue) Reset() { *m = ReceiverDataValue{} }
func (m *ReceiverDataValue) String() string { return proto.CompactTextString(m) }
func (*ReceiverDataValue) ProtoMessage() {}
func (*ReceiverDataValue) Descriptor() ([]byte, []int) {
return fileDescriptor_c2d9785ad9c3e602, []int{3}
}
func (m *ReceiverDataValue) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ReceiverDataValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ReceiverDataValue.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ReceiverDataValue) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReceiverDataValue.Merge(m, src)
}
func (m *ReceiverDataValue) XXX_Size() int {
return m.Size()
}
func (m *ReceiverDataValue) XXX_DiscardUnknown() {
xxx_messageInfo_ReceiverDataValue.DiscardUnknown(m)
}
var xxx_messageInfo_ReceiverDataValue proto.InternalMessageInfo
type isReceiverDataValue_Value interface {
isReceiverDataValue_Value()
MarshalTo([]byte) (int, error)
Size() int
}
type ReceiverDataValue_StrVal struct {
StrVal string `protobuf:"bytes,1,opt,name=str_val,json=strVal,proto3,oneof" json:"str_val,omitempty"`
}
type ReceiverDataValue_IntVal struct {
IntVal int64 `protobuf:"varint,2,opt,name=int_val,json=intVal,proto3,oneof" json:"int_val,omitempty"`
}
type ReceiverDataValue_DoubleVal struct {
DoubleVal float64 `protobuf:"fixed64,3,opt,name=double_val,json=doubleVal,proto3,oneof" json:"double_val,omitempty"`
}
func (*ReceiverDataValue_StrVal) isReceiverDataValue_Value() {}
func (*ReceiverDataValue_IntVal) isReceiverDataValue_Value() {}
func (*ReceiverDataValue_DoubleVal) isReceiverDataValue_Value() {}
func (m *ReceiverDataValue) GetValue() isReceiverDataValue_Value {
if m != nil {
return m.Value
}
return nil
}
func (m *ReceiverDataValue) GetStrVal() string {
if x, ok := m.GetValue().(*ReceiverDataValue_StrVal); ok {
return x.StrVal
}
return ""
}
func (m *ReceiverDataValue) GetIntVal() int64 {
if x, ok := m.GetValue().(*ReceiverDataValue_IntVal); ok {
return x.IntVal
}
return 0
}
func (m *ReceiverDataValue) GetDoubleVal() float64 {
if x, ok := m.GetValue().(*ReceiverDataValue_DoubleVal); ok {
return x.DoubleVal
}
return 0
}
// XXX_OneofWrappers is for the internal use of the proto package.
func (*ReceiverDataValue) XXX_OneofWrappers() []interface{} {
return []interface{}{
(*ReceiverDataValue_StrVal)(nil),
(*ReceiverDataValue_IntVal)(nil),
(*ReceiverDataValue_DoubleVal)(nil),
}
}
func init() {
proto.RegisterType((*Receiver)(nil), "nflogpb.Receiver")
proto.RegisterType((*Entry)(nil), "nflogpb.Entry")
proto.RegisterMapType((map[string]*ReceiverDataValue)(nil), "nflogpb.Entry.ReceiverDataEntry")
proto.RegisterType((*MeshEntry)(nil), "nflogpb.MeshEntry")
proto.RegisterType((*ReceiverDataValue)(nil), "nflogpb.ReceiverDataValue")
}
func init() { proto.RegisterFile("nflog.proto", fileDescriptor_c2d9785ad9c3e602) }
var fileDescriptor_c2d9785ad9c3e602 = []byte{
// 385 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0xcf, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0xbb, 0x4d, 0xd3, 0xda, 0xe3, 0xb4, 0x94, 0x15, 0x07, 0xcb, 0x08, 0xc7, 0x0a, 0x48,
0xf8, 0x82, 0x23, 0x95, 0x27, 0x68, 0x10, 0x12, 0x12, 0x82, 0xc3, 0x8a, 0x2b, 0xb2, 0x36, 0x74,
0xb2, 0x5e, 0x61, 0x7b, 0xad, 0xf5, 0x36, 0x6a, 0xde, 0x82, 0x47, 0xe0, 0x71, 0x72, 0xe4, 0x09,
0xf8, 0x93, 0x27, 0x41, 0xde, 0xb5, 0x1d, 0x8e, 0xdc, 0x66, 0x7f, 0xf3, 0xcd, 0xcc, 0xb7, 0x1f,
0x04, 0xf5, 0xa6, 0x54, 0x22, 0x6b, 0xb4, 0x32, 0x8a, 0x5e, 0xd8, 0x47, 0xb3, 0x8e, 0xe6, 0x42,
0x29, 0x51, 0xe2, 0xd2, 0xe2, 0xf5, 0xfd, 0x66, 0x69, 0x64, 0x85, 0xad, 0xe1, 0x55, 0xe3, 0x94,
0xd1, 0x13, 0xa1, 0x84, 0xb2, 0xe5, 0xb2, 0xab, 0x1c, 0x5d, 0x7c, 0x06, 0x8f, 0xe1, 0x17, 0x94,
0x5b, 0xd4, 0xf4, 0x19, 0x80, 0xd0, 0xea, 0xbe, 0xc9, 0x6b, 0x5e, 0x61, 0x48, 0x12, 0x92, 0xfa,
0xcc, 0xb7, 0xe4, 0x23, 0xaf, 0x90, 0x26, 0x10, 0xc8, 0xda, 0xa0, 0xd0, 0xdc, 0x48, 0x55, 0x87,
0xa7, 0xb6, 0xff, 0x2f, 0xa2, 0xd7, 0x30, 0x91, 0x77, 0x0f, 0xe1, 0x24, 0x21, 0xe9, 0x25, 0xeb,
0xca, 0xc5, 0xf7, 0x53, 0x98, 0xbe, 0xad, 0x8d, 0xde, 0xd1, 0xa7, 0xe0, 0x56, 0xe5, 0x5f, 0x71,
0x67, 0x77, 0xcf, 0x98, 0x67, 0xc1, 0x7b, 0xdc, 0xd1, 0x57, 0xe0, 0xe9, 0xde, 0x85, 0xdd, 0x1b,
0xdc, 0x3c, 0xce, 0xfa, 0x8f, 0x65, 0x83, 0x3d, 0x36, 0x4a, 0x8e, 0x46, 0x0b, 0xde, 0x16, 0xf6,
0xdc, 0xac, 0x37, 0xfa, 0x8e, 0xb7, 0x05, 0x8d, 0xba, 0x6d, 0xad, 0x2a, 0xb7, 0x78, 0x17, 0x9e,
0x25, 0x24, 0xf5, 0xd8, 0xf8, 0xa6, 0x2b, 0xf0, 0xc7, 0x60, 0xc2, 0xa9, 0x3d, 0x15, 0x65, 0x2e,
0xba, 0x6c, 0x88, 0x2e, 0xfb, 0x34, 0x28, 0x56, 0xde, 0xfe, 0xe7, 0xfc, 0xe4, 0xdb, 0xaf, 0x39,
0x61, 0xc7, 0x31, 0xfa, 0x1c, 0x2e, 0x37, 0x52, 0xcb, 0x5a, 0xe4, 0xbc, 0x44, 0x6d, 0xda, 0xf0,
0x3c, 0x99, 0xa4, 0x67, 0x6c, 0xe6, 0xe0, 0xad, 0x65, 0xf4, 0x25, 0x3c, 0x1a, 0x8e, 0x0e, 0xb2,
0x0b, 0x2b, 0xbb, 0x1a, 0xb0, 0x13, 0x2e, 0xb6, 0xe0, 0x7f, 0xc0, 0xb6, 0x70, 0x29, 0xbd, 0x80,
0x29, 0x76, 0x85, 0x4d, 0x28, 0xb8, 0xb9, 0x1a, 0x53, 0xb0, 0x6d, 0xe6, 0x9a, 0xf4, 0x0d, 0x00,
0x3e, 0x34, 0x52, 0x63, 0x9b, 0x73, 0xd3, 0x07, 0xf6, 0x9f, 0xbf, 0xe8, 0xe7, 0x6e, 0xcd, 0xea,
0x7a, 0xff, 0x27, 0x3e, 0xd9, 0x1f, 0x62, 0xf2, 0xe3, 0x10, 0x93, 0xdf, 0x87, 0x98, 0xac, 0xcf,
0xed, 0xe8, 0xeb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x49, 0xcd, 0xa7, 0x1e, 0x61, 0x02, 0x00,
0x00,
// 509 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0x4d, 0x8f, 0xd3, 0x30,
0x10, 0x6d, 0x36, 0xdb, 0x36, 0x99, 0xb6, 0xcb, 0xae, 0xc5, 0x21, 0x04, 0xd1, 0x46, 0x05, 0x89,
0x5e, 0x48, 0x51, 0xb9, 0x20, 0x6e, 0x5b, 0x58, 0xa9, 0x12, 0x82, 0x83, 0x85, 0x7a, 0x41, 0x28,
0x72, 0xa9, 0x9b, 0x5a, 0xa4, 0x71, 0x65, 0xbb, 0xd1, 0xf6, 0x5f, 0xf0, 0xa3, 0x38, 0xf4, 0xc8,
0x2f, 0xe0, 0xa3, 0xbf, 0x04, 0xd9, 0x4e, 0xb2, 0xbb, 0xda, 0x0b, 0xb7, 0xf1, 0x7b, 0x6f, 0x66,
0xde, 0xbc, 0x04, 0x3a, 0xf9, 0x2a, 0xe3, 0x69, 0xbc, 0x15, 0x5c, 0x71, 0xd4, 0x36, 0x8f, 0xed,
0x22, 0x1c, 0xa4, 0x9c, 0xa7, 0x19, 0x1d, 0x1b, 0x78, 0xb1, 0x5b, 0x8d, 0x15, 0xdb, 0x50, 0xa9,
0xc8, 0x66, 0x6b, 0x95, 0xe1, 0xc3, 0x94, 0xa7, 0xdc, 0x94, 0x63, 0x5d, 0x59, 0x74, 0xf8, 0x05,
0x3c, 0x4c, 0xbf, 0x52, 0x56, 0x50, 0x81, 0x9e, 0x00, 0xa4, 0x82, 0xef, 0xb6, 0x49, 0x4e, 0x36,
0x34, 0x70, 0x22, 0x67, 0xe4, 0x63, 0xdf, 0x20, 0x1f, 0xc9, 0x86, 0xa2, 0x08, 0x3a, 0x2c, 0x57,
0x34, 0x15, 0x44, 0x31, 0x9e, 0x07, 0x27, 0x86, 0xbf, 0x0d, 0xa1, 0x73, 0x70, 0xd9, 0xf2, 0x3a,
0x70, 0x23, 0x67, 0xd4, 0xc3, 0xba, 0x1c, 0xfe, 0x70, 0xa1, 0x79, 0x95, 0x2b, 0xb1, 0x47, 0x8f,
0xc1, 0x8e, 0x4a, 0xbe, 0xd1, 0xbd, 0x99, 0xdd, 0xc5, 0x9e, 0x01, 0xde, 0xd3, 0x3d, 0x7a, 0x01,
0x9e, 0x28, 0x5d, 0x98, 0xb9, 0x9d, 0xc9, 0x45, 0x5c, 0x1e, 0x16, 0x57, 0xf6, 0x70, 0x2d, 0xb9,
0x31, 0xba, 0x26, 0x72, 0x6d, 0xd6, 0x75, 0x4b, 0xa3, 0x33, 0x22, 0xd7, 0x28, 0xd4, 0xd3, 0x24,
0xcf, 0x0a, 0xba, 0x0c, 0x4e, 0x23, 0x67, 0xe4, 0xe1, 0xfa, 0x8d, 0xa6, 0xe0, 0xd7, 0xc1, 0x04,
0x4d, 0xb3, 0x2a, 0x8c, 0x6d, 0x74, 0x71, 0x15, 0x5d, 0xfc, 0xa9, 0x52, 0x4c, 0xbd, 0xc3, 0xaf,
0x41, 0xe3, 0xfb, 0xef, 0x81, 0x83, 0x6f, 0xda, 0xd0, 0x53, 0xe8, 0xad, 0x98, 0x60, 0x79, 0x9a,
0x90, 0x8c, 0x0a, 0x25, 0x83, 0x56, 0xe4, 0x8e, 0x4e, 0x71, 0xd7, 0x82, 0x97, 0x06, 0x43, 0xcf,
0xe1, 0x41, 0xb5, 0xb4, 0x92, 0xb5, 0x8d, 0xec, 0xac, 0x82, 0x4b, 0xe1, 0x15, 0xf4, 0xaa, 0xc3,
0x92, 0x25, 0x51, 0x24, 0xf0, 0x22, 0x77, 0xd4, 0x99, 0x44, 0x75, 0x00, 0x26, 0xbf, 0x3a, 0x86,
0x77, 0x44, 0x11, 0x83, 0xe0, 0xae, 0xb8, 0x05, 0x85, 0x9f, 0xe1, 0xe2, 0x9e, 0x44, 0x7f, 0x90,
0x2a, 0x6e, 0x1f, 0xeb, 0x12, 0xbd, 0x84, 0x66, 0x41, 0xb2, 0x1d, 0x2d, 0x63, 0x0e, 0xef, 0xc5,
0xac, 0x9b, 0xe7, 0x5a, 0x81, 0xad, 0xf0, 0xcd, 0xc9, 0x6b, 0x67, 0x58, 0x80, 0xff, 0x81, 0xca,
0xb5, 0x1d, 0xfa, 0x0c, 0x9a, 0x54, 0x17, 0x66, 0x6c, 0x67, 0x72, 0x76, 0xd7, 0x28, 0xb6, 0x24,
0x7a, 0x0b, 0x40, 0xaf, 0xb7, 0x4c, 0x50, 0x99, 0x10, 0x55, 0x6f, 0xfb, 0xaf, 0xa4, 0xcb, 0xbe,
0x4b, 0x35, 0x94, 0x77, 0x8f, 0x32, 0xbe, 0xd0, 0x23, 0x68, 0x4b, 0x25, 0x92, 0x82, 0x64, 0xf6,
0xb0, 0x59, 0x03, 0xb7, 0xa4, 0x12, 0x73, 0x92, 0x69, 0x8a, 0xe5, 0xca, 0x50, 0x7a, 0xa3, 0xab,
0x29, 0x96, 0x2b, 0x4d, 0x0d, 0x00, 0x96, 0x7c, 0xb7, 0xc8, 0xa8, 0x61, 0xf5, 0x3f, 0xe3, 0xcc,
0x1a, 0xd8, 0xb7, 0xd8, 0x9c, 0x64, 0xd3, 0x76, 0x99, 0xcc, 0xf4, 0xfc, 0xf0, 0xb7, 0xdf, 0x38,
0x1c, 0xfb, 0xce, 0xcf, 0x63, 0xdf, 0xf9, 0x73, 0xec, 0x3b, 0x8b, 0x96, 0xf1, 0xfb, 0xea, 0x5f,
0x00, 0x00, 0x00, 0xff, 0xff, 0x42, 0xe0, 0xe0, 0xa3, 0x7a, 0x03, 0x00, 0x00,
}
func (m *Receiver) Marshal() (dAtA []byte, err error) {
@@ -284,48 +398,74 @@ func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.ReceiverData) > 0 {
for k := range m.ReceiverData {
v := m.ReceiverData[k]
baseI := i
if v != nil {
{
size, err := v.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintNflog(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
i -= len(k)
copy(dAtA[i:], k)
i = encodeVarintNflog(dAtA, i, uint64(len(k)))
i--
dAtA[i] = 0xa
i = encodeVarintNflog(dAtA, i, uint64(baseI-i))
i--
dAtA[i] = 0x42
}
}
if len(m.ResolvedAlerts) > 0 {
dAtA2 := make([]byte, len(m.ResolvedAlerts)*10)
var j1 int
dAtA3 := make([]byte, len(m.ResolvedAlerts)*10)
var j2 int
for _, num := range m.ResolvedAlerts {
for num >= 1<<7 {
dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80)
dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j1++
j2++
}
dAtA2[j1] = uint8(num)
j1++
dAtA3[j2] = uint8(num)
j2++
}
i -= j1
copy(dAtA[i:], dAtA2[:j1])
i = encodeVarintNflog(dAtA, i, uint64(j1))
i -= j2
copy(dAtA[i:], dAtA3[:j2])
i = encodeVarintNflog(dAtA, i, uint64(j2))
i--
dAtA[i] = 0x3a
}
if len(m.FiringAlerts) > 0 {
dAtA4 := make([]byte, len(m.FiringAlerts)*10)
var j3 int
dAtA5 := make([]byte, len(m.FiringAlerts)*10)
var j4 int
for _, num := range m.FiringAlerts {
for num >= 1<<7 {
dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80)
dAtA5[j4] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j3++
j4++
}
dAtA4[j3] = uint8(num)
j3++
dAtA5[j4] = uint8(num)
j4++
}
i -= j3
copy(dAtA[i:], dAtA4[:j3])
i = encodeVarintNflog(dAtA, i, uint64(j3))
i -= j4
copy(dAtA[i:], dAtA5[:j4])
i = encodeVarintNflog(dAtA, i, uint64(j4))
i--
dAtA[i] = 0x32
}
n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):])
if err5 != nil {
return 0, err5
n6, err6 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):])
if err6 != nil {
return 0, err6
}
i -= n5
i = encodeVarintNflog(dAtA, i, uint64(n5))
i -= n6
i = encodeVarintNflog(dAtA, i, uint64(n6))
i--
dAtA[i] = 0x2a
if m.Resolved {
@@ -391,12 +531,12 @@ func (m *MeshEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
n7, err7 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExpiresAt, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.ExpiresAt):])
if err7 != nil {
return 0, err7
n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExpiresAt, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.ExpiresAt):])
if err8 != nil {
return 0, err8
}
i -= n7
i = encodeVarintNflog(dAtA, i, uint64(n7))
i -= n8
i = encodeVarintNflog(dAtA, i, uint64(n8))
i--
dAtA[i] = 0x12
if m.Entry != nil {
@@ -414,6 +554,81 @@ func (m *MeshEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *ReceiverDataValue) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReceiverDataValue) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReceiverDataValue) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Value != nil {
{
size := m.Value.Size()
i -= size
if _, err := m.Value.MarshalTo(dAtA[i:]); err != nil {
return 0, err
}
}
}
return len(dAtA) - i, nil
}
func (m *ReceiverDataValue_StrVal) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReceiverDataValue_StrVal) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
i -= len(m.StrVal)
copy(dAtA[i:], m.StrVal)
i = encodeVarintNflog(dAtA, i, uint64(len(m.StrVal)))
i--
dAtA[i] = 0xa
return len(dAtA) - i, nil
}
func (m *ReceiverDataValue_IntVal) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReceiverDataValue_IntVal) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
i = encodeVarintNflog(dAtA, i, uint64(m.IntVal))
i--
dAtA[i] = 0x10
return len(dAtA) - i, nil
}
func (m *ReceiverDataValue_DoubleVal) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReceiverDataValue_DoubleVal) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.DoubleVal))))
i--
dAtA[i] = 0x19
return len(dAtA) - i, nil
}
func encodeVarintNflog(dAtA []byte, offset int, v uint64) int {
offset -= sovNflog(v)
base := offset
@@ -485,6 +700,19 @@ func (m *Entry) Size() (n int) {
}
n += 1 + sovNflog(uint64(l)) + l
}
if len(m.ReceiverData) > 0 {
for k, v := range m.ReceiverData {
_ = k
_ = v
l = 0
if v != nil {
l = v.Size()
l += 1 + sovNflog(uint64(l))
}
mapEntrySize := 1 + len(k) + sovNflog(uint64(len(k))) + l
n += mapEntrySize + 1 + sovNflog(uint64(mapEntrySize))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@@ -509,6 +737,50 @@ func (m *MeshEntry) Size() (n int) {
return n
}
func (m *ReceiverDataValue) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Value != nil {
n += m.Value.Size()
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *ReceiverDataValue_StrVal) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.StrVal)
n += 1 + l + sovNflog(uint64(l))
return n
}
func (m *ReceiverDataValue_IntVal) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
n += 1 + sovNflog(uint64(m.IntVal))
return n
}
func (m *ReceiverDataValue_DoubleVal) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
n += 9
return n
}
func sovNflog(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@@ -987,6 +1259,135 @@ func (m *Entry) Unmarshal(dAtA []byte) error {
} else {
return fmt.Errorf("proto: wrong wireType = %d for field ResolvedAlerts", wireType)
}
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ReceiverData", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthNflog
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthNflog
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.ReceiverData == nil {
m.ReceiverData = make(map[string]*ReceiverDataValue)
}
var mapkey string
var mapvalue *ReceiverDataValue
for iNdEx < postIndex {
entryPreIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
if fieldNum == 1 {
var stringLenmapkey uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLenmapkey |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLenmapkey := int(stringLenmapkey)
if intStringLenmapkey < 0 {
return ErrInvalidLengthNflog
}
postStringIndexmapkey := iNdEx + intStringLenmapkey
if postStringIndexmapkey < 0 {
return ErrInvalidLengthNflog
}
if postStringIndexmapkey > l {
return io.ErrUnexpectedEOF
}
mapkey = string(dAtA[iNdEx:postStringIndexmapkey])
iNdEx = postStringIndexmapkey
} else if fieldNum == 2 {
var mapmsglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
mapmsglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if mapmsglen < 0 {
return ErrInvalidLengthNflog
}
postmsgIndex := iNdEx + mapmsglen
if postmsgIndex < 0 {
return ErrInvalidLengthNflog
}
if postmsgIndex > l {
return io.ErrUnexpectedEOF
}
mapvalue = &ReceiverDataValue{}
if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil {
return err
}
iNdEx = postmsgIndex
} else {
iNdEx = entryPreIndex
skippy, err := skipNflog(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthNflog
}
if (iNdEx + skippy) > postIndex {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
m.ReceiverData[mapkey] = mapvalue
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipNflog(dAtA[iNdEx:])
@@ -1129,6 +1530,120 @@ func (m *MeshEntry) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *ReceiverDataValue) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReceiverDataValue: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReceiverDataValue: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field StrVal", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthNflog
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthNflog
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = &ReceiverDataValue_StrVal{string(dAtA[iNdEx:postIndex])}
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field IntVal", wireType)
}
var v int64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNflog
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Value = &ReceiverDataValue_IntVal{v}
case 3:
if wireType != 1 {
return fmt.Errorf("proto: wrong wireType = %d for field DoubleVal", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.Value = &ReceiverDataValue_DoubleVal{float64(math.Float64frombits(v))}
default:
iNdEx = preIndex
skippy, err := skipNflog(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthNflog
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipNflog(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0

View File

@@ -39,6 +39,8 @@ message Entry {
repeated uint64 firing_alerts = 6;
// ResolvedAlerts list of hashes of resolved alerts at the last notification time.
repeated uint64 resolved_alerts = 7;
// Data specific to the receiver which sent the notification
map<string, ReceiverDataValue> receiver_data = 8;
}
// MeshEntry is a wrapper message to communicate a notify log
@@ -50,3 +52,11 @@ message MeshEntry {
// the log entry from its state.
google.protobuf.Timestamp expires_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message ReceiverDataValue {
oneof value {
string str_val = 1;
int64 int_val = 2;
double double_val = 3;
}
}

View File

@@ -142,6 +142,7 @@ const (
keyMuteTimeIntervals
keyActiveTimeIntervals
keyRouteID
keyNflogStore
)
// WithReceiverName populates a context with a receiver name.
@@ -262,6 +263,15 @@ func RouteID(ctx context.Context) (string, bool) {
return v, ok
}
func WithNflogStore(ctx context.Context, store *nflog.Store) context.Context {
return context.WithValue(ctx, keyNflogStore, store)
}
func NflogStore(ctx context.Context) (*nflog.Store, bool) {
v, ok := ctx.Value(keyNflogStore).(*nflog.Store)
return v, ok
}
// A Stage processes alerts under the constraints of the given context.
type Stage interface {
Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
@@ -276,7 +286,7 @@ func (f StageFunc) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Al
}
type NotificationLog interface {
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, store *nflog.Store, expiry time.Duration) error
Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}
@@ -780,14 +790,25 @@ func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.
}
var entry *nflogpb.Entry
var isFirstNotification bool
switch len(entries) {
case 0:
isFirstNotification = true
case 1:
entry = entries[0]
// if this condition true, we're sending a notification for a new alert group, but the nflog entry for the previous alert
// group is still in log
isFirstNotification = len(entry.FiringAlerts) == 0 && len(firing) > 0
default:
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}
if isFirstNotification {
ctx = WithNflogStore(ctx, nflog.NewStore(nil))
} else {
ctx = WithNflogStore(ctx, nflog.NewStore(entry))
}
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
span.AddEvent("notify.DedupStage.Exec nflog needs update")
return ctx, alerts, nil
@@ -996,7 +1017,9 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*t
attribute.Int("alerting.alerts.resolved.count", len(resolved)),
)
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
// Extract receiver data from context if present (it's ok for it to be nil).
store, _ := NflogStore(ctx)
return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, store, expiry)
}
type timeStage struct {

View File

@@ -62,15 +62,15 @@ type testNflog struct {
qres []*nflogpb.Entry
qerr error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error
}
func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) {
return l.qres, l.qerr
}
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry)
func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error {
return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, receiverData, expiry)
}
func (l *testNflog) GC() (int, error) {
@@ -632,7 +632,7 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithResolvedAlerts(ctx, []uint64{})
ctx = WithRepeatInterval(ctx, time.Hour)
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{0, 1, 2}, firingAlerts)
@@ -648,7 +648,7 @@ func TestSetNotifiesStage(t *testing.T) {
ctx = WithFiringAlerts(ctx, []uint64{})
ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2})
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error {
require.Equal(t, s.recv, r)
require.Equal(t, "1", gkey)
require.Equal(t, []uint64{}, firingAlerts)
@@ -1070,6 +1070,329 @@ alertmanager_notifications_suppressed_total{reason="active_time_interval"} %d
}
}
func TestReceiverData_PreservationWhenNotifierDoesNotUpdate(t *testing.T) {
var storedData *nflog.Store
callCount := 0
tnflog := &testNflog{
logFunc: func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error {
storedData = receiverData
return nil
},
}
tnflog.qres = []*nflogpb.Entry{}
recv := &nflogpb.Receiver{GroupName: "test"}
dedupStage := NewDedupStage(sendResolved(true), tnflog, recv)
notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
callCount++
if callCount == 1 {
// First call - store some data
if store, ok := NflogStore(ctx); ok {
store.SetStr("threadTs", "1234.5678")
}
return false, nil
}
// Second call - notifier doesn't update ReceiverData
// Does NOT call StoreStr - just returns success
return false, nil
})
integration := NewIntegration(notifier, sendResolved(true), "test", 0, "test-receiver")
retryStage := NewRetryStage(integration, "test", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}))
setNotifiesStage := NewSetNotifiesStage(tnflog, recv)
ctx := context.Background()
ctx = WithGroupKey(ctx, "testkey")
ctx = WithRepeatInterval(ctx, time.Hour)
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "test"},
},
},
}
// First notification
ctx, _, err := dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
_, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
// Verify first notification stored data
require.NotNil(t, storedData)
threadTs, found := storedData.GetStr("threadTs")
require.True(t, found, "threadTs should be in stored data")
require.Equal(t, "1234.5678", threadTs)
firstReceiverData := map[string]*nflogpb.ReceiverDataValue{
"threadTs": {
Value: &nflogpb.ReceiverDataValue_StrVal{StrVal: "1234.5678"},
},
}
// Second notification - load previous state
tnflog.qres = []*nflogpb.Entry{
{
Receiver: recv,
GroupKey: []byte("testkey"),
FiringAlerts: []uint64{1},
ResolvedAlerts: []uint64{},
ReceiverData: firstReceiverData,
},
}
ctx = context.Background()
ctx = WithGroupKey(ctx, "testkey")
ctx = WithRepeatInterval(ctx, time.Hour)
ctx, _, err = dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
_, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
if storedData == nil {
t.Error("ReceiverData was lost! Second notification has nil data")
} else {
if threadTs, exists := storedData.GetStr("threadTs"); !exists {
t.Error("ReceiverData 'threadTs' was lost! Second notification doesn't have it")
} else {
t.Logf("threadTs value: %s", threadTs)
}
}
}
func TestDedupStageExtractsReceiverData_DataPresent(t *testing.T) {
receiverData := map[string]*nflogpb.ReceiverDataValue{
"threadTs": {
Value: &nflogpb.ReceiverDataValue_StrVal{StrVal: "1234.5678"},
},
"counter": {
Value: &nflogpb.ReceiverDataValue_IntVal{IntVal: 42},
},
}
entry := &nflogpb.Entry{
Receiver: &nflogpb.Receiver{GroupName: "test"},
GroupKey: []byte("key"),
FiringAlerts: []uint64{1, 2, 3},
ReceiverData: receiverData,
}
tnflog := &testNflog{
qres: []*nflogpb.Entry{entry},
}
stage := NewDedupStage(sendResolved(false), tnflog, &nflogpb.Receiver{GroupName: "test"})
ctx := context.Background()
ctx = WithGroupKey(ctx, "key")
ctx = WithRepeatInterval(ctx, time.Hour)
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "test"},
},
},
}
resCtx, _, err := stage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
store, ok := NflogStore(resCtx)
require.True(t, ok, "NflogStore should be in context")
require.NotNil(t, store)
threadTs, found := store.GetStr("threadTs")
require.True(t, found)
require.Equal(t, "1234.5678", threadTs)
counter, found := store.GetInt("counter")
require.True(t, found)
require.Equal(t, int64(42), counter)
}
func TestDedupStageExtractsReceiverData_NilReceiverData(t *testing.T) {
entry := &nflogpb.Entry{
Receiver: &nflogpb.Receiver{GroupName: "test"},
GroupKey: []byte("key"),
FiringAlerts: []uint64{1, 2, 3},
ReceiverData: nil,
}
tnflog := &testNflog{
qres: []*nflogpb.Entry{entry},
}
stage := NewDedupStage(sendResolved(false), tnflog, &nflogpb.Receiver{GroupName: "test"})
ctx := context.Background()
ctx = WithGroupKey(ctx, "key")
ctx = WithRepeatInterval(ctx, time.Hour)
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "test"},
},
},
}
resCtx, _, err := stage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
store, ok := NflogStore(resCtx)
require.True(t, ok, "NflogStore should be in context even when ReceiverData is nil")
require.NotNil(t, store)
}
func TestDedupStageExtractsReceiverData_NoEntry(t *testing.T) {
tnflog := &testNflog{
qres: []*nflogpb.Entry{},
}
stage := NewDedupStage(sendResolved(false), tnflog, &nflogpb.Receiver{GroupName: "test"})
ctx := context.Background()
ctx = WithGroupKey(ctx, "key")
ctx = WithRepeatInterval(ctx, time.Hour)
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "test"},
},
},
}
resCtx, _, err := stage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
store, ok := NflogStore(resCtx)
require.True(t, ok, "NflogStore should be in context even when no entry exists")
require.NotNil(t, store)
}
func TestNflogStore_NoLeakBetweenNotificationSequences(t *testing.T) {
var storedData *nflog.Store
callCount := 0
var capturedStoreValues []map[string]string
tnflog := &testNflog{
logFunc: func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error {
storedData = receiverData
return nil
},
}
recv := &nflogpb.Receiver{GroupName: "test"}
dedupStage := NewDedupStage(sendResolved(true), tnflog, recv)
notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) {
callCount++
store, ok := NflogStore(ctx)
require.True(t, ok, "Store should be available in context")
storeSnapshot := make(map[string]string)
if val, found := store.GetStr("session_data"); found {
storeSnapshot["session_data"] = val
}
capturedStoreValues = append(capturedStoreValues, storeSnapshot)
store.SetStr("session_data", fmt.Sprintf("session_%d", callCount))
return false, nil
})
integration := NewIntegration(notifier, sendResolved(true), "test", 0, "test-receiver")
retryStage := NewRetryStage(integration, "test", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}))
setNotifiesStage := NewSetNotifiesStage(tnflog, recv)
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "test"},
EndsAt: time.Now().Add(time.Hour),
},
},
}
// Scenario 1: First notification ever (no previous nflog entry)
tnflog.qres = []*nflogpb.Entry{}
ctx := context.Background()
ctx = WithGroupKey(ctx, "testkey")
ctx = WithRepeatInterval(ctx, time.Hour)
ctx, _, err := dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
_, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Equal(t, 1, callCount)
require.Empty(t, capturedStoreValues[0], "First notification should see empty Store")
require.NotNil(t, storedData)
sessionData, found := storedData.GetStr("session_data")
require.True(t, found)
require.Equal(t, "session_1", sessionData)
// Scenario 2: Alert resolves, then fires again (new firing sequence)
firstSessionData := map[string]*nflogpb.ReceiverDataValue{
"session_data": {
Value: &nflogpb.ReceiverDataValue_StrVal{StrVal: "session_1"},
},
}
tnflog.qres = []*nflogpb.Entry{
{
Receiver: recv,
GroupKey: []byte("testkey"),
FiringAlerts: []uint64{},
ResolvedAlerts: []uint64{1},
ReceiverData: firstSessionData,
},
}
ctx = context.Background()
ctx = WithGroupKey(ctx, "testkey")
ctx = WithRepeatInterval(ctx, time.Hour)
ctx, _, err = dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
_, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Equal(t, 2, callCount)
require.Len(t, capturedStoreValues, 2)
require.Empty(t, capturedStoreValues[1], "New firing sequence should see empty Store (no leak from resolved entry)")
require.NotNil(t, storedData)
sessionData, found = storedData.GetStr("session_data")
require.True(t, found)
require.Equal(t, "session_2", sessionData)
}
func BenchmarkHashAlert(b *testing.B) {
alert := &types.Alert{
Alert: model.Alert{