From b5df726c8fa2b1571d3fb0cb15d6b98d08af7ad3 Mon Sep 17 00:00:00 2001 From: Ethan Hunter Date: Wed, 28 Jan 2026 20:18:56 +0000 Subject: [PATCH] Add arbitrary key/value data store to the nflog (#4899) * add arbitrary nflog data storage Signed-off-by: Ethan Hunter * improve isFirstNotification logic Signed-off-by: Ethan Hunter * add docstring and ensure input map not modified Signed-off-by: Ethan Hunter * appease linter Signed-off-by: Ethan Hunter --------- Signed-off-by: Ethan Hunter --- nflog/nflog.go | 102 +++++- nflog/nflog_test.go | 2 +- nflog/nflogpb/nflog.pb.go | 631 ++++++++++++++++++++++++++++++++++---- nflog/nflogpb/nflog.proto | 10 + notify/notify.go | 27 +- notify/notify_test.go | 333 +++++++++++++++++++- 6 files changed, 1038 insertions(+), 67 deletions(-) diff --git a/nflog/nflog.go b/nflog/nflog.go index 74d07cc1c..3d7176b4c 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -76,6 +76,100 @@ func QGroupKey(gk string) QueryParam { } } +// Store abstracts the NFLog's receiver data storage as a mutable key/value store. A store +// can be generated from a nflogpb.Entry and then written via the call to Log. +// +// Every key in the Store is associated with either an int, float, or string value. +type Store struct { + data map[string]*pb.ReceiverDataValue +} + +// NewStore creates a Store from the entry's receiver data. If entry is nil, the resulting +// Store is empty. +func NewStore(entry *pb.Entry) *Store { + var receiverData map[string]*pb.ReceiverDataValue + if entry != nil { + receiverData = maps.Clone(entry.ReceiverData) + } + if receiverData == nil { + receiverData = make(map[string]*pb.ReceiverDataValue) + } + return &Store{ + data: receiverData, + } +} + +// GetInt finds the integer value associated with the key, if any, and returns it. +func (s *Store) GetInt(key string) (int64, bool) { + dataValue, ok := s.data[key] + if !ok { + return 0, false + } + intVal, ok := dataValue.Value.(*pb.ReceiverDataValue_IntVal) + if !ok { + return 0, false + } + return intVal.IntVal, true +} + +// GetFloat finds the float value associated with the key, if any, and returns it. +func (s *Store) GetFloat(key string) (float64, bool) { + dataValue, ok := s.data[key] + if !ok { + return 0, false + } + floatVal, ok := dataValue.Value.(*pb.ReceiverDataValue_DoubleVal) + if !ok { + return 0, false + } + return floatVal.DoubleVal, true +} + +// GetFloat finds the string value associated with the key, if any, and returns it. +func (s *Store) GetStr(key string) (string, bool) { + dataValue, ok := s.data[key] + if !ok { + return "", false + } + strVal, ok := dataValue.Value.(*pb.ReceiverDataValue_StrVal) + if !ok { + return "", false + } + return strVal.StrVal, true +} + +// SetInt associates an integer value with the provided key, overwriting any existing value. +func (s *Store) SetInt(key string, v int64) { + s.data[key] = &pb.ReceiverDataValue{ + Value: &pb.ReceiverDataValue_IntVal{ + IntVal: v, + }, + } +} + +// SetFloat associates a float value with the provided key, overwriting any existing value. +func (s *Store) SetFloat(key string, v float64) { + s.data[key] = &pb.ReceiverDataValue{ + Value: &pb.ReceiverDataValue_DoubleVal{ + DoubleVal: v, + }, + } +} + +// SetStr associates a string value with the provided key, overwriting any existing value. +func (s *Store) SetStr(key, v string) { + s.data[key] = &pb.ReceiverDataValue{ + Value: &pb.ReceiverDataValue_StrVal{ + StrVal: v, + }, + } +} + +// Delete deletes any value associated with the key. +func (s *Store) Delete(key string) { + delete(s.data, key) +} + // Log holds the notification log state for alerts that have been notified. type Log struct { clock quartz.Clock @@ -368,7 +462,7 @@ func stateKey(k string, r *pb.Receiver) string { return fmt.Sprintf("%s:%s", k, receiverKey(r)) } -func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { +func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, store *Store, expiry time.Duration) error { // Write all st with the same timestamp. now := l.now() key := stateKey(gkey, r) @@ -389,6 +483,11 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui expiresAt = now.Add(expiry) } + var receiverData map[string]*pb.ReceiverDataValue + if store != nil { + receiverData = store.data + } + e := &pb.MeshEntry{ Entry: &pb.Entry{ Receiver: r, @@ -396,6 +495,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui Timestamp: now, FiringAlerts: firingAlerts, ResolvedAlerts: resolvedAlerts, + ReceiverData: receiverData, }, ExpiresAt: expiresAt, } diff --git a/nflog/nflog_test.go b/nflog/nflog_test.go index 08a4f3880..e21f0bb4c 100644 --- a/nflog/nflog_test.go +++ b/nflog/nflog_test.go @@ -355,7 +355,7 @@ func TestQuery(t *testing.T) { firingAlerts := []uint64{1, 2, 3} resolvedAlerts := []uint64{4, 5} - err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0) + err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, nil, 0) require.NoError(t, err, "logging notification failed") entries, err := nl.Query(QGroupKey("key"), QReceiver(recv)) diff --git a/nflog/nflogpb/nflog.pb.go b/nflog/nflogpb/nflog.pb.go index a5960171a..5de7db88b 100644 --- a/nflog/nflogpb/nflog.pb.go +++ b/nflog/nflogpb/nflog.pb.go @@ -4,6 +4,7 @@ package nflogpb import ( + encoding_binary "encoding/binary" fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" @@ -92,10 +93,12 @@ type Entry struct { // FiringAlerts list of hashes of firing alerts at the last notification time. FiringAlerts []uint64 `protobuf:"varint,6,rep,packed,name=firing_alerts,json=firingAlerts,proto3" json:"firing_alerts,omitempty"` // ResolvedAlerts list of hashes of resolved alerts at the last notification time. - ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"` + // Data specific to the receiver which sent the notification + ReceiverData map[string]*ReceiverDataValue `protobuf:"bytes,8,rep,name=receiver_data,json=receiverData,proto3" json:"receiver_data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Entry) Reset() { *m = Entry{} } @@ -177,41 +180,152 @@ func (m *MeshEntry) XXX_DiscardUnknown() { var xxx_messageInfo_MeshEntry proto.InternalMessageInfo +type ReceiverDataValue struct { + // Types that are valid to be assigned to Value: + // + // *ReceiverDataValue_StrVal + // *ReceiverDataValue_IntVal + // *ReceiverDataValue_DoubleVal + Value isReceiverDataValue_Value `protobuf_oneof:"value"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReceiverDataValue) Reset() { *m = ReceiverDataValue{} } +func (m *ReceiverDataValue) String() string { return proto.CompactTextString(m) } +func (*ReceiverDataValue) ProtoMessage() {} +func (*ReceiverDataValue) Descriptor() ([]byte, []int) { + return fileDescriptor_c2d9785ad9c3e602, []int{3} +} +func (m *ReceiverDataValue) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReceiverDataValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReceiverDataValue.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReceiverDataValue) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReceiverDataValue.Merge(m, src) +} +func (m *ReceiverDataValue) XXX_Size() int { + return m.Size() +} +func (m *ReceiverDataValue) XXX_DiscardUnknown() { + xxx_messageInfo_ReceiverDataValue.DiscardUnknown(m) +} + +var xxx_messageInfo_ReceiverDataValue proto.InternalMessageInfo + +type isReceiverDataValue_Value interface { + isReceiverDataValue_Value() + MarshalTo([]byte) (int, error) + Size() int +} + +type ReceiverDataValue_StrVal struct { + StrVal string `protobuf:"bytes,1,opt,name=str_val,json=strVal,proto3,oneof" json:"str_val,omitempty"` +} +type ReceiverDataValue_IntVal struct { + IntVal int64 `protobuf:"varint,2,opt,name=int_val,json=intVal,proto3,oneof" json:"int_val,omitempty"` +} +type ReceiverDataValue_DoubleVal struct { + DoubleVal float64 `protobuf:"fixed64,3,opt,name=double_val,json=doubleVal,proto3,oneof" json:"double_val,omitempty"` +} + +func (*ReceiverDataValue_StrVal) isReceiverDataValue_Value() {} +func (*ReceiverDataValue_IntVal) isReceiverDataValue_Value() {} +func (*ReceiverDataValue_DoubleVal) isReceiverDataValue_Value() {} + +func (m *ReceiverDataValue) GetValue() isReceiverDataValue_Value { + if m != nil { + return m.Value + } + return nil +} + +func (m *ReceiverDataValue) GetStrVal() string { + if x, ok := m.GetValue().(*ReceiverDataValue_StrVal); ok { + return x.StrVal + } + return "" +} + +func (m *ReceiverDataValue) GetIntVal() int64 { + if x, ok := m.GetValue().(*ReceiverDataValue_IntVal); ok { + return x.IntVal + } + return 0 +} + +func (m *ReceiverDataValue) GetDoubleVal() float64 { + if x, ok := m.GetValue().(*ReceiverDataValue_DoubleVal); ok { + return x.DoubleVal + } + return 0 +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*ReceiverDataValue) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*ReceiverDataValue_StrVal)(nil), + (*ReceiverDataValue_IntVal)(nil), + (*ReceiverDataValue_DoubleVal)(nil), + } +} + func init() { proto.RegisterType((*Receiver)(nil), "nflogpb.Receiver") proto.RegisterType((*Entry)(nil), "nflogpb.Entry") + proto.RegisterMapType((map[string]*ReceiverDataValue)(nil), "nflogpb.Entry.ReceiverDataEntry") proto.RegisterType((*MeshEntry)(nil), "nflogpb.MeshEntry") + proto.RegisterType((*ReceiverDataValue)(nil), "nflogpb.ReceiverDataValue") } func init() { proto.RegisterFile("nflog.proto", fileDescriptor_c2d9785ad9c3e602) } var fileDescriptor_c2d9785ad9c3e602 = []byte{ - // 385 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xbb, 0x4d, 0xd3, 0xda, 0xe3, 0xb4, 0x94, 0x15, 0x07, 0xcb, 0x08, 0xc7, 0x0a, 0x48, - 0xf8, 0x82, 0x23, 0x95, 0x27, 0x68, 0x10, 0x12, 0x12, 0x82, 0xc3, 0x8a, 0x2b, 0xb2, 0x36, 0x74, - 0xb2, 0x5e, 0x61, 0x7b, 0xad, 0xf5, 0x36, 0x6a, 0xde, 0x82, 0x47, 0xe0, 0x71, 0x72, 0xe4, 0x09, - 0xf8, 0x93, 0x27, 0x41, 0xde, 0xb5, 0x1d, 0x8e, 0xdc, 0x66, 0x7f, 0xf3, 0xcd, 0xcc, 0xb7, 0x1f, - 0x04, 0xf5, 0xa6, 0x54, 0x22, 0x6b, 0xb4, 0x32, 0x8a, 0x5e, 0xd8, 0x47, 0xb3, 0x8e, 0xe6, 0x42, - 0x29, 0x51, 0xe2, 0xd2, 0xe2, 0xf5, 0xfd, 0x66, 0x69, 0x64, 0x85, 0xad, 0xe1, 0x55, 0xe3, 0x94, - 0xd1, 0x13, 0xa1, 0x84, 0xb2, 0xe5, 0xb2, 0xab, 0x1c, 0x5d, 0x7c, 0x06, 0x8f, 0xe1, 0x17, 0x94, - 0x5b, 0xd4, 0xf4, 0x19, 0x80, 0xd0, 0xea, 0xbe, 0xc9, 0x6b, 0x5e, 0x61, 0x48, 0x12, 0x92, 0xfa, - 0xcc, 0xb7, 0xe4, 0x23, 0xaf, 0x90, 0x26, 0x10, 0xc8, 0xda, 0xa0, 0xd0, 0xdc, 0x48, 0x55, 0x87, - 0xa7, 0xb6, 0xff, 0x2f, 0xa2, 0xd7, 0x30, 0x91, 0x77, 0x0f, 0xe1, 0x24, 0x21, 0xe9, 0x25, 0xeb, - 0xca, 0xc5, 0xf7, 0x53, 0x98, 0xbe, 0xad, 0x8d, 0xde, 0xd1, 0xa7, 0xe0, 0x56, 0xe5, 0x5f, 0x71, - 0x67, 0x77, 0xcf, 0x98, 0x67, 0xc1, 0x7b, 0xdc, 0xd1, 0x57, 0xe0, 0xe9, 0xde, 0x85, 0xdd, 0x1b, - 0xdc, 0x3c, 0xce, 0xfa, 0x8f, 0x65, 0x83, 0x3d, 0x36, 0x4a, 0x8e, 0x46, 0x0b, 0xde, 0x16, 0xf6, - 0xdc, 0xac, 0x37, 0xfa, 0x8e, 0xb7, 0x05, 0x8d, 0xba, 0x6d, 0xad, 0x2a, 0xb7, 0x78, 0x17, 0x9e, - 0x25, 0x24, 0xf5, 0xd8, 0xf8, 0xa6, 0x2b, 0xf0, 0xc7, 0x60, 0xc2, 0xa9, 0x3d, 0x15, 0x65, 0x2e, - 0xba, 0x6c, 0x88, 0x2e, 0xfb, 0x34, 0x28, 0x56, 0xde, 0xfe, 0xe7, 0xfc, 0xe4, 0xdb, 0xaf, 0x39, - 0x61, 0xc7, 0x31, 0xfa, 0x1c, 0x2e, 0x37, 0x52, 0xcb, 0x5a, 0xe4, 0xbc, 0x44, 0x6d, 0xda, 0xf0, - 0x3c, 0x99, 0xa4, 0x67, 0x6c, 0xe6, 0xe0, 0xad, 0x65, 0xf4, 0x25, 0x3c, 0x1a, 0x8e, 0x0e, 0xb2, - 0x0b, 0x2b, 0xbb, 0x1a, 0xb0, 0x13, 0x2e, 0xb6, 0xe0, 0x7f, 0xc0, 0xb6, 0x70, 0x29, 0xbd, 0x80, - 0x29, 0x76, 0x85, 0x4d, 0x28, 0xb8, 0xb9, 0x1a, 0x53, 0xb0, 0x6d, 0xe6, 0x9a, 0xf4, 0x0d, 0x00, - 0x3e, 0x34, 0x52, 0x63, 0x9b, 0x73, 0xd3, 0x07, 0xf6, 0x9f, 0xbf, 0xe8, 0xe7, 0x6e, 0xcd, 0xea, - 0x7a, 0xff, 0x27, 0x3e, 0xd9, 0x1f, 0x62, 0xf2, 0xe3, 0x10, 0x93, 0xdf, 0x87, 0x98, 0xac, 0xcf, - 0xed, 0xe8, 0xeb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x49, 0xcd, 0xa7, 0x1e, 0x61, 0x02, 0x00, - 0x00, + // 509 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0x4d, 0x8f, 0xd3, 0x30, + 0x10, 0x6d, 0x36, 0xdb, 0x36, 0x99, 0xb6, 0xcb, 0xae, 0xc5, 0x21, 0x04, 0xd1, 0x46, 0x05, 0x89, + 0x5e, 0x48, 0x51, 0xb9, 0x20, 0x6e, 0x5b, 0x58, 0xa9, 0x12, 0x82, 0x83, 0x85, 0x7a, 0x41, 0x28, + 0x72, 0xa9, 0x9b, 0x5a, 0xa4, 0x71, 0x65, 0xbb, 0xd1, 0xf6, 0x5f, 0xf0, 0xa3, 0x38, 0xf4, 0xc8, + 0x2f, 0xe0, 0xa3, 0xbf, 0x04, 0xd9, 0x4e, 0xb2, 0xbb, 0xda, 0x0b, 0xb7, 0xf1, 0x7b, 0x6f, 0x66, + 0xde, 0xbc, 0x04, 0x3a, 0xf9, 0x2a, 0xe3, 0x69, 0xbc, 0x15, 0x5c, 0x71, 0xd4, 0x36, 0x8f, 0xed, + 0x22, 0x1c, 0xa4, 0x9c, 0xa7, 0x19, 0x1d, 0x1b, 0x78, 0xb1, 0x5b, 0x8d, 0x15, 0xdb, 0x50, 0xa9, + 0xc8, 0x66, 0x6b, 0x95, 0xe1, 0xc3, 0x94, 0xa7, 0xdc, 0x94, 0x63, 0x5d, 0x59, 0x74, 0xf8, 0x05, + 0x3c, 0x4c, 0xbf, 0x52, 0x56, 0x50, 0x81, 0x9e, 0x00, 0xa4, 0x82, 0xef, 0xb6, 0x49, 0x4e, 0x36, + 0x34, 0x70, 0x22, 0x67, 0xe4, 0x63, 0xdf, 0x20, 0x1f, 0xc9, 0x86, 0xa2, 0x08, 0x3a, 0x2c, 0x57, + 0x34, 0x15, 0x44, 0x31, 0x9e, 0x07, 0x27, 0x86, 0xbf, 0x0d, 0xa1, 0x73, 0x70, 0xd9, 0xf2, 0x3a, + 0x70, 0x23, 0x67, 0xd4, 0xc3, 0xba, 0x1c, 0xfe, 0x70, 0xa1, 0x79, 0x95, 0x2b, 0xb1, 0x47, 0x8f, + 0xc1, 0x8e, 0x4a, 0xbe, 0xd1, 0xbd, 0x99, 0xdd, 0xc5, 0x9e, 0x01, 0xde, 0xd3, 0x3d, 0x7a, 0x01, + 0x9e, 0x28, 0x5d, 0x98, 0xb9, 0x9d, 0xc9, 0x45, 0x5c, 0x1e, 0x16, 0x57, 0xf6, 0x70, 0x2d, 0xb9, + 0x31, 0xba, 0x26, 0x72, 0x6d, 0xd6, 0x75, 0x4b, 0xa3, 0x33, 0x22, 0xd7, 0x28, 0xd4, 0xd3, 0x24, + 0xcf, 0x0a, 0xba, 0x0c, 0x4e, 0x23, 0x67, 0xe4, 0xe1, 0xfa, 0x8d, 0xa6, 0xe0, 0xd7, 0xc1, 0x04, + 0x4d, 0xb3, 0x2a, 0x8c, 0x6d, 0x74, 0x71, 0x15, 0x5d, 0xfc, 0xa9, 0x52, 0x4c, 0xbd, 0xc3, 0xaf, + 0x41, 0xe3, 0xfb, 0xef, 0x81, 0x83, 0x6f, 0xda, 0xd0, 0x53, 0xe8, 0xad, 0x98, 0x60, 0x79, 0x9a, + 0x90, 0x8c, 0x0a, 0x25, 0x83, 0x56, 0xe4, 0x8e, 0x4e, 0x71, 0xd7, 0x82, 0x97, 0x06, 0x43, 0xcf, + 0xe1, 0x41, 0xb5, 0xb4, 0x92, 0xb5, 0x8d, 0xec, 0xac, 0x82, 0x4b, 0xe1, 0x15, 0xf4, 0xaa, 0xc3, + 0x92, 0x25, 0x51, 0x24, 0xf0, 0x22, 0x77, 0xd4, 0x99, 0x44, 0x75, 0x00, 0x26, 0xbf, 0x3a, 0x86, + 0x77, 0x44, 0x11, 0x83, 0xe0, 0xae, 0xb8, 0x05, 0x85, 0x9f, 0xe1, 0xe2, 0x9e, 0x44, 0x7f, 0x90, + 0x2a, 0x6e, 0x1f, 0xeb, 0x12, 0xbd, 0x84, 0x66, 0x41, 0xb2, 0x1d, 0x2d, 0x63, 0x0e, 0xef, 0xc5, + 0xac, 0x9b, 0xe7, 0x5a, 0x81, 0xad, 0xf0, 0xcd, 0xc9, 0x6b, 0x67, 0x58, 0x80, 0xff, 0x81, 0xca, + 0xb5, 0x1d, 0xfa, 0x0c, 0x9a, 0x54, 0x17, 0x66, 0x6c, 0x67, 0x72, 0x76, 0xd7, 0x28, 0xb6, 0x24, + 0x7a, 0x0b, 0x40, 0xaf, 0xb7, 0x4c, 0x50, 0x99, 0x10, 0x55, 0x6f, 0xfb, 0xaf, 0xa4, 0xcb, 0xbe, + 0x4b, 0x35, 0x94, 0x77, 0x8f, 0x32, 0xbe, 0xd0, 0x23, 0x68, 0x4b, 0x25, 0x92, 0x82, 0x64, 0xf6, + 0xb0, 0x59, 0x03, 0xb7, 0xa4, 0x12, 0x73, 0x92, 0x69, 0x8a, 0xe5, 0xca, 0x50, 0x7a, 0xa3, 0xab, + 0x29, 0x96, 0x2b, 0x4d, 0x0d, 0x00, 0x96, 0x7c, 0xb7, 0xc8, 0xa8, 0x61, 0xf5, 0x3f, 0xe3, 0xcc, + 0x1a, 0xd8, 0xb7, 0xd8, 0x9c, 0x64, 0xd3, 0x76, 0x99, 0xcc, 0xf4, 0xfc, 0xf0, 0xb7, 0xdf, 0x38, + 0x1c, 0xfb, 0xce, 0xcf, 0x63, 0xdf, 0xf9, 0x73, 0xec, 0x3b, 0x8b, 0x96, 0xf1, 0xfb, 0xea, 0x5f, + 0x00, 0x00, 0x00, 0xff, 0xff, 0x42, 0xe0, 0xe0, 0xa3, 0x7a, 0x03, 0x00, 0x00, } func (m *Receiver) Marshal() (dAtA []byte, err error) { @@ -284,48 +398,74 @@ func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.ReceiverData) > 0 { + for k := range m.ReceiverData { + v := m.ReceiverData[k] + baseI := i + if v != nil { + { + size, err := v.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintNflog(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + i -= len(k) + copy(dAtA[i:], k) + i = encodeVarintNflog(dAtA, i, uint64(len(k))) + i-- + dAtA[i] = 0xa + i = encodeVarintNflog(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x42 + } + } if len(m.ResolvedAlerts) > 0 { - dAtA2 := make([]byte, len(m.ResolvedAlerts)*10) - var j1 int + dAtA3 := make([]byte, len(m.ResolvedAlerts)*10) + var j2 int for _, num := range m.ResolvedAlerts { for num >= 1<<7 { - dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j1++ + j2++ } - dAtA2[j1] = uint8(num) - j1++ + dAtA3[j2] = uint8(num) + j2++ } - i -= j1 - copy(dAtA[i:], dAtA2[:j1]) - i = encodeVarintNflog(dAtA, i, uint64(j1)) + i -= j2 + copy(dAtA[i:], dAtA3[:j2]) + i = encodeVarintNflog(dAtA, i, uint64(j2)) i-- dAtA[i] = 0x3a } if len(m.FiringAlerts) > 0 { - dAtA4 := make([]byte, len(m.FiringAlerts)*10) - var j3 int + dAtA5 := make([]byte, len(m.FiringAlerts)*10) + var j4 int for _, num := range m.FiringAlerts { for num >= 1<<7 { - dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) + dAtA5[j4] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j3++ + j4++ } - dAtA4[j3] = uint8(num) - j3++ + dAtA5[j4] = uint8(num) + j4++ } - i -= j3 - copy(dAtA[i:], dAtA4[:j3]) - i = encodeVarintNflog(dAtA, i, uint64(j3)) + i -= j4 + copy(dAtA[i:], dAtA5[:j4]) + i = encodeVarintNflog(dAtA, i, uint64(j4)) i-- dAtA[i] = 0x32 } - n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):]) - if err5 != nil { - return 0, err5 + n6, err6 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):]) + if err6 != nil { + return 0, err6 } - i -= n5 - i = encodeVarintNflog(dAtA, i, uint64(n5)) + i -= n6 + i = encodeVarintNflog(dAtA, i, uint64(n6)) i-- dAtA[i] = 0x2a if m.Resolved { @@ -391,12 +531,12 @@ func (m *MeshEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } - n7, err7 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExpiresAt, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.ExpiresAt):]) - if err7 != nil { - return 0, err7 + n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExpiresAt, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.ExpiresAt):]) + if err8 != nil { + return 0, err8 } - i -= n7 - i = encodeVarintNflog(dAtA, i, uint64(n7)) + i -= n8 + i = encodeVarintNflog(dAtA, i, uint64(n8)) i-- dAtA[i] = 0x12 if m.Entry != nil { @@ -414,6 +554,81 @@ func (m *MeshEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *ReceiverDataValue) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReceiverDataValue) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReceiverDataValue) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Value != nil { + { + size := m.Value.Size() + i -= size + if _, err := m.Value.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *ReceiverDataValue_StrVal) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReceiverDataValue_StrVal) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.StrVal) + copy(dAtA[i:], m.StrVal) + i = encodeVarintNflog(dAtA, i, uint64(len(m.StrVal))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *ReceiverDataValue_IntVal) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReceiverDataValue_IntVal) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i = encodeVarintNflog(dAtA, i, uint64(m.IntVal)) + i-- + dAtA[i] = 0x10 + return len(dAtA) - i, nil +} +func (m *ReceiverDataValue_DoubleVal) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReceiverDataValue_DoubleVal) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.DoubleVal)))) + i-- + dAtA[i] = 0x19 + return len(dAtA) - i, nil +} func encodeVarintNflog(dAtA []byte, offset int, v uint64) int { offset -= sovNflog(v) base := offset @@ -485,6 +700,19 @@ func (m *Entry) Size() (n int) { } n += 1 + sovNflog(uint64(l)) + l } + if len(m.ReceiverData) > 0 { + for k, v := range m.ReceiverData { + _ = k + _ = v + l = 0 + if v != nil { + l = v.Size() + l += 1 + sovNflog(uint64(l)) + } + mapEntrySize := 1 + len(k) + sovNflog(uint64(len(k))) + l + n += mapEntrySize + 1 + sovNflog(uint64(mapEntrySize)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -509,6 +737,50 @@ func (m *MeshEntry) Size() (n int) { return n } +func (m *ReceiverDataValue) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Value != nil { + n += m.Value.Size() + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *ReceiverDataValue_StrVal) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.StrVal) + n += 1 + l + sovNflog(uint64(l)) + return n +} +func (m *ReceiverDataValue_IntVal) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 1 + sovNflog(uint64(m.IntVal)) + return n +} +func (m *ReceiverDataValue_DoubleVal) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += 9 + return n +} + func sovNflog(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -987,6 +1259,135 @@ func (m *Entry) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field ResolvedAlerts", wireType) } + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReceiverData", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNflog + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthNflog + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ReceiverData == nil { + m.ReceiverData = make(map[string]*ReceiverDataValue) + } + var mapkey string + var mapvalue *ReceiverDataValue + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthNflog + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthNflog + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthNflog + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthNflog + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &ReceiverDataValue{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipNflog(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthNflog + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.ReceiverData[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipNflog(dAtA[iNdEx:]) @@ -1129,6 +1530,120 @@ func (m *MeshEntry) Unmarshal(dAtA []byte) error { } return nil } +func (m *ReceiverDataValue) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReceiverDataValue: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReceiverDataValue: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StrVal", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthNflog + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthNflog + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = &ReceiverDataValue_StrVal{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IntVal", wireType) + } + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Value = &ReceiverDataValue_IntVal{v} + case 3: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field DoubleVal", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.Value = &ReceiverDataValue_DoubleVal{float64(math.Float64frombits(v))} + default: + iNdEx = preIndex + skippy, err := skipNflog(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthNflog + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipNflog(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/nflog/nflogpb/nflog.proto b/nflog/nflogpb/nflog.proto index eb4fd8ba9..ee7d40544 100644 --- a/nflog/nflogpb/nflog.proto +++ b/nflog/nflogpb/nflog.proto @@ -39,6 +39,8 @@ message Entry { repeated uint64 firing_alerts = 6; // ResolvedAlerts list of hashes of resolved alerts at the last notification time. repeated uint64 resolved_alerts = 7; + // Data specific to the receiver which sent the notification + map receiver_data = 8; } // MeshEntry is a wrapper message to communicate a notify log @@ -50,3 +52,11 @@ message MeshEntry { // the log entry from its state. google.protobuf.Timestamp expires_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; } + +message ReceiverDataValue { + oneof value { + string str_val = 1; + int64 int_val = 2; + double double_val = 3; + } +} diff --git a/notify/notify.go b/notify/notify.go index 03651af57..4de234f93 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -142,6 +142,7 @@ const ( keyMuteTimeIntervals keyActiveTimeIntervals keyRouteID + keyNflogStore ) // WithReceiverName populates a context with a receiver name. @@ -262,6 +263,15 @@ func RouteID(ctx context.Context) (string, bool) { return v, ok } +func WithNflogStore(ctx context.Context, store *nflog.Store) context.Context { + return context.WithValue(ctx, keyNflogStore, store) +} + +func NflogStore(ctx context.Context) (*nflog.Store, bool) { + v, ok := ctx.Value(keyNflogStore).(*nflog.Store) + return v, ok +} + // A Stage processes alerts under the constraints of the given context. type Stage interface { Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) @@ -276,7 +286,7 @@ func (f StageFunc) Exec(ctx context.Context, l *slog.Logger, alerts ...*types.Al } type NotificationLog interface { - Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error + Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, store *nflog.Store, expiry time.Duration) error Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error) } @@ -780,14 +790,25 @@ func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types. } var entry *nflogpb.Entry + var isFirstNotification bool switch len(entries) { case 0: + isFirstNotification = true case 1: entry = entries[0] + // if this condition true, we're sending a notification for a new alert group, but the nflog entry for the previous alert + // group is still in log + isFirstNotification = len(entry.FiringAlerts) == 0 && len(firing) > 0 default: return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries)) } + if isFirstNotification { + ctx = WithNflogStore(ctx, nflog.NewStore(nil)) + } else { + ctx = WithNflogStore(ctx, nflog.NewStore(entry)) + } + if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) { span.AddEvent("notify.DedupStage.Exec nflog needs update") return ctx, alerts, nil @@ -996,7 +1017,9 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*t attribute.Int("alerting.alerts.resolved.count", len(resolved)), ) - return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry) + // Extract receiver data from context if present (it's ok for it to be nil). + store, _ := NflogStore(ctx) + return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, store, expiry) } type timeStage struct { diff --git a/notify/notify_test.go b/notify/notify_test.go index e14ab9596..e99a6569f 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -62,15 +62,15 @@ type testNflog struct { qres []*nflogpb.Entry qerr error - logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error + logFunc func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error } func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) { return l.qres, l.qerr } -func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { - return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry) +func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error { + return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, receiverData, expiry) } func (l *testNflog) GC() (int, error) { @@ -632,7 +632,7 @@ func TestSetNotifiesStage(t *testing.T) { ctx = WithResolvedAlerts(ctx, []uint64{}) ctx = WithRepeatInterval(ctx, time.Hour) - tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { + tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error { require.Equal(t, s.recv, r) require.Equal(t, "1", gkey) require.Equal(t, []uint64{0, 1, 2}, firingAlerts) @@ -648,7 +648,7 @@ func TestSetNotifiesStage(t *testing.T) { ctx = WithFiringAlerts(ctx, []uint64{}) ctx = WithResolvedAlerts(ctx, []uint64{0, 1, 2}) - tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { + tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error { require.Equal(t, s.recv, r) require.Equal(t, "1", gkey) require.Equal(t, []uint64{}, firingAlerts) @@ -1070,6 +1070,329 @@ alertmanager_notifications_suppressed_total{reason="active_time_interval"} %d } } +func TestReceiverData_PreservationWhenNotifierDoesNotUpdate(t *testing.T) { + var storedData *nflog.Store + callCount := 0 + + tnflog := &testNflog{ + logFunc: func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error { + storedData = receiverData + return nil + }, + } + + tnflog.qres = []*nflogpb.Entry{} + + recv := &nflogpb.Receiver{GroupName: "test"} + dedupStage := NewDedupStage(sendResolved(true), tnflog, recv) + + notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + callCount++ + + if callCount == 1 { + // First call - store some data + if store, ok := NflogStore(ctx); ok { + store.SetStr("threadTs", "1234.5678") + } + return false, nil + } + // Second call - notifier doesn't update ReceiverData + // Does NOT call StoreStr - just returns success + return false, nil + }) + + integration := NewIntegration(notifier, sendResolved(true), "test", 0, "test-receiver") + retryStage := NewRetryStage(integration, "test", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + setNotifiesStage := NewSetNotifiesStage(tnflog, recv) + + ctx := context.Background() + ctx = WithGroupKey(ctx, "testkey") + ctx = WithRepeatInterval(ctx, time.Hour) + + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "test"}, + }, + }, + } + + // First notification + ctx, _, err := dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + _, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + // Verify first notification stored data + require.NotNil(t, storedData) + threadTs, found := storedData.GetStr("threadTs") + require.True(t, found, "threadTs should be in stored data") + require.Equal(t, "1234.5678", threadTs) + + firstReceiverData := map[string]*nflogpb.ReceiverDataValue{ + "threadTs": { + Value: &nflogpb.ReceiverDataValue_StrVal{StrVal: "1234.5678"}, + }, + } + + // Second notification - load previous state + tnflog.qres = []*nflogpb.Entry{ + { + Receiver: recv, + GroupKey: []byte("testkey"), + FiringAlerts: []uint64{1}, + ResolvedAlerts: []uint64{}, + ReceiverData: firstReceiverData, + }, + } + + ctx = context.Background() + ctx = WithGroupKey(ctx, "testkey") + ctx = WithRepeatInterval(ctx, time.Hour) + + ctx, _, err = dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + _, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + if storedData == nil { + t.Error("ReceiverData was lost! Second notification has nil data") + } else { + if threadTs, exists := storedData.GetStr("threadTs"); !exists { + t.Error("ReceiverData 'threadTs' was lost! Second notification doesn't have it") + } else { + t.Logf("threadTs value: %s", threadTs) + } + } +} + +func TestDedupStageExtractsReceiverData_DataPresent(t *testing.T) { + receiverData := map[string]*nflogpb.ReceiverDataValue{ + "threadTs": { + Value: &nflogpb.ReceiverDataValue_StrVal{StrVal: "1234.5678"}, + }, + "counter": { + Value: &nflogpb.ReceiverDataValue_IntVal{IntVal: 42}, + }, + } + + entry := &nflogpb.Entry{ + Receiver: &nflogpb.Receiver{GroupName: "test"}, + GroupKey: []byte("key"), + FiringAlerts: []uint64{1, 2, 3}, + ReceiverData: receiverData, + } + + tnflog := &testNflog{ + qres: []*nflogpb.Entry{entry}, + } + + stage := NewDedupStage(sendResolved(false), tnflog, &nflogpb.Receiver{GroupName: "test"}) + + ctx := context.Background() + ctx = WithGroupKey(ctx, "key") + ctx = WithRepeatInterval(ctx, time.Hour) + + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "test"}, + }, + }, + } + + resCtx, _, err := stage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + store, ok := NflogStore(resCtx) + require.True(t, ok, "NflogStore should be in context") + require.NotNil(t, store) + + threadTs, found := store.GetStr("threadTs") + require.True(t, found) + require.Equal(t, "1234.5678", threadTs) + + counter, found := store.GetInt("counter") + require.True(t, found) + require.Equal(t, int64(42), counter) +} + +func TestDedupStageExtractsReceiverData_NilReceiverData(t *testing.T) { + entry := &nflogpb.Entry{ + Receiver: &nflogpb.Receiver{GroupName: "test"}, + GroupKey: []byte("key"), + FiringAlerts: []uint64{1, 2, 3}, + ReceiverData: nil, + } + + tnflog := &testNflog{ + qres: []*nflogpb.Entry{entry}, + } + + stage := NewDedupStage(sendResolved(false), tnflog, &nflogpb.Receiver{GroupName: "test"}) + + ctx := context.Background() + ctx = WithGroupKey(ctx, "key") + ctx = WithRepeatInterval(ctx, time.Hour) + + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "test"}, + }, + }, + } + + resCtx, _, err := stage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + store, ok := NflogStore(resCtx) + require.True(t, ok, "NflogStore should be in context even when ReceiverData is nil") + require.NotNil(t, store) +} + +func TestDedupStageExtractsReceiverData_NoEntry(t *testing.T) { + tnflog := &testNflog{ + qres: []*nflogpb.Entry{}, + } + + stage := NewDedupStage(sendResolved(false), tnflog, &nflogpb.Receiver{GroupName: "test"}) + + ctx := context.Background() + ctx = WithGroupKey(ctx, "key") + ctx = WithRepeatInterval(ctx, time.Hour) + + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "test"}, + }, + }, + } + + resCtx, _, err := stage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + store, ok := NflogStore(resCtx) + require.True(t, ok, "NflogStore should be in context even when no entry exists") + require.NotNil(t, store) +} + +func TestNflogStore_NoLeakBetweenNotificationSequences(t *testing.T) { + var storedData *nflog.Store + callCount := 0 + var capturedStoreValues []map[string]string + + tnflog := &testNflog{ + logFunc: func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, receiverData *nflog.Store, expiry time.Duration) error { + storedData = receiverData + return nil + }, + } + + recv := &nflogpb.Receiver{GroupName: "test"} + dedupStage := NewDedupStage(sendResolved(true), tnflog, recv) + + notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + callCount++ + store, ok := NflogStore(ctx) + require.True(t, ok, "Store should be available in context") + + storeSnapshot := make(map[string]string) + if val, found := store.GetStr("session_data"); found { + storeSnapshot["session_data"] = val + } + capturedStoreValues = append(capturedStoreValues, storeSnapshot) + + store.SetStr("session_data", fmt.Sprintf("session_%d", callCount)) + return false, nil + }) + + integration := NewIntegration(notifier, sendResolved(true), "test", 0, "test-receiver") + retryStage := NewRetryStage(integration, "test", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + setNotifiesStage := NewSetNotifiesStage(tnflog, recv) + + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "test"}, + EndsAt: time.Now().Add(time.Hour), + }, + }, + } + + // Scenario 1: First notification ever (no previous nflog entry) + tnflog.qres = []*nflogpb.Entry{} + + ctx := context.Background() + ctx = WithGroupKey(ctx, "testkey") + ctx = WithRepeatInterval(ctx, time.Hour) + + ctx, _, err := dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + _, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + require.Equal(t, 1, callCount) + require.Empty(t, capturedStoreValues[0], "First notification should see empty Store") + + require.NotNil(t, storedData) + sessionData, found := storedData.GetStr("session_data") + require.True(t, found) + require.Equal(t, "session_1", sessionData) + + // Scenario 2: Alert resolves, then fires again (new firing sequence) + firstSessionData := map[string]*nflogpb.ReceiverDataValue{ + "session_data": { + Value: &nflogpb.ReceiverDataValue_StrVal{StrVal: "session_1"}, + }, + } + + tnflog.qres = []*nflogpb.Entry{ + { + Receiver: recv, + GroupKey: []byte("testkey"), + FiringAlerts: []uint64{}, + ResolvedAlerts: []uint64{1}, + ReceiverData: firstSessionData, + }, + } + + ctx = context.Background() + ctx = WithGroupKey(ctx, "testkey") + ctx = WithRepeatInterval(ctx, time.Hour) + + ctx, _, err = dedupStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + ctx, _, err = retryStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + _, _, err = setNotifiesStage.Exec(ctx, promslog.NewNopLogger(), alerts...) + require.NoError(t, err) + + require.Equal(t, 2, callCount) + require.Len(t, capturedStoreValues, 2) + require.Empty(t, capturedStoreValues[1], "New firing sequence should see empty Store (no leak from resolved entry)") + + require.NotNil(t, storedData) + sessionData, found = storedData.GetStr("session_data") + require.True(t, found) + require.Equal(t, "session_2", sessionData) +} + func BenchmarkHashAlert(b *testing.B) { alert := &types.Alert{ Alert: model.Alert{