diff --git a/cache/demux.go b/cache/demux.go index c46018ef2..d8c813dd4 100644 --- a/cache/demux.go +++ b/cache/demux.go @@ -140,7 +140,7 @@ func (d *demux) resyncLaggingWatchers() { continue } // TODO: re-enable key‐predicate in Filter when non‐zero startRev or performance tuning is needed - missedEvents := d.history.Filter(AfterRev(nextRev)) + missedEvents := d.history.Filter(nextRev) for _, event := range missedEvents { if !w.enqueueEvent(event) { // buffer overflow: watcher still lagging diff --git a/cache/predicate.go b/cache/predicate.go index 404dfbb01..a9138bfc8 100644 --- a/cache/predicate.go +++ b/cache/predicate.go @@ -14,8 +14,6 @@ package cache -import clientv3 "go.etcd.io/etcd/client/v3" - type Prefix string func (prefix Prefix) Match(key []byte) bool { @@ -25,10 +23,3 @@ func (prefix Prefix) Match(key []byte) bool { prefixLen := len(prefix) return len(key) >= prefixLen && string(key[:prefixLen]) == string(prefix) } - -// AfterRev builds an EntryPredicate that matches events whose ModRevision ≥ rev. -func AfterRev(rev int64) EntryPredicate { - return func(ev *clientv3.Event) bool { - return ev.Kv.ModRevision >= rev - } -} diff --git a/cache/ringbuffer.go b/cache/ringbuffer.go index e6cf55a8e..f682efd9a 100644 --- a/cache/ringbuffer.go +++ b/cache/ringbuffer.go @@ -26,11 +26,7 @@ type ringBuffer struct { head, tail, size int } -// EntryPredicate lets callers decide which entries to keep (e.g. “after revision X”) -type ( - EntryPredicate func(*clientv3.Event) bool - KeyPredicate = func([]byte) bool -) +type KeyPredicate = func([]byte) bool func newRingBuffer(capacity int) *ringBuffer { // assume capacity > 0 – validated by Cache @@ -50,9 +46,9 @@ func (r *ringBuffer) Append(event *clientv3.Event) { r.head = (r.head + 1) % len(r.buffer) } -// Filter returns the events that satisfy every predicate +// Filter returns all events in the buffer whose ModRevision is >= minRev. // TODO: use binary search on the ring buffer to locate the first entry >= nextRev instead of a full scan -func (r *ringBuffer) Filter(entryPred EntryPredicate) (events []*clientv3.Event) { +func (r *ringBuffer) Filter(minRev int64) (events []*clientv3.Event) { events = make([]*clientv3.Event, 0, r.size) for n, i := 0, r.tail; n < r.size; n, i = n+1, (i+1)%len(r.buffer) { @@ -60,7 +56,7 @@ func (r *ringBuffer) Filter(entryPred EntryPredicate) (events []*clientv3.Event) if entry == nil { panic(fmt.Sprintf("ringBuffer.Filter: unexpected nil entry at index %d", i)) } - if entryPred == nil || entryPred(entry) { + if entry.Kv.ModRevision >= minRev { events = append(events, entry) } } diff --git a/cache/ringbuffer_test.go b/cache/ringbuffer_test.go index 8dbab5e89..3bbe58c41 100644 --- a/cache/ringbuffer_test.go +++ b/cache/ringbuffer_test.go @@ -91,7 +91,7 @@ func TestFilter(t *testing.T) { name string capacity int revs []int64 - predicate EntryPredicate + minRev int64 wantFilteredRevs []int64 wantLatestRev int64 }{ @@ -99,7 +99,7 @@ func TestFilter(t *testing.T) { name: "no_filter", capacity: 5, revs: []int64{1, 2, 3}, - predicate: AfterRev(0), + minRev: 0, wantFilteredRevs: []int64{1, 2, 3}, wantLatestRev: 3, }, @@ -107,7 +107,7 @@ func TestFilter(t *testing.T) { name: "partial_match", capacity: 5, revs: []int64{10, 11, 12, 13}, - predicate: AfterRev(12), + minRev: 12, wantFilteredRevs: []int64{12, 13}, wantLatestRev: 13, }, @@ -115,7 +115,7 @@ func TestFilter(t *testing.T) { name: "filter_when_full", capacity: 3, revs: []int64{20, 21, 22, 23, 24}, - predicate: AfterRev(23), + minRev: 23, wantFilteredRevs: []int64{23, 24}, wantLatestRev: 24, }, @@ -123,23 +123,15 @@ func TestFilter(t *testing.T) { name: "none_match", capacity: 4, revs: []int64{30, 31}, - predicate: AfterRev(100), + minRev: 100, wantFilteredRevs: []int64{}, wantLatestRev: 31, }, { - name: "nil_predicate", - capacity: 4, - revs: []int64{1, 2, 3}, - predicate: nil, - wantFilteredRevs: []int64{1, 2, 3}, - wantLatestRev: 3, - }, - { - name: "empty_buffer_nil_predicate", + name: "empty_buffer", capacity: 3, revs: nil, - predicate: nil, + minRev: 0, wantFilteredRevs: []int64{}, wantLatestRev: 0, }, @@ -154,7 +146,7 @@ func TestFilter(t *testing.T) { rb.Append(event(r, "k")) } - gotEvents := rb.Filter(tt.predicate) + gotEvents := rb.Filter(tt.minRev) gotRevs := make([]int64, len(gotEvents)) for i, ev := range gotEvents { gotRevs[i] = ev.Kv.ModRevision @@ -204,7 +196,7 @@ func TestRebaseHistory(t *testing.T) { t.Fatalf("PeekLatest()=%d, want=%d", latestRev, 0) } - events := rb.Filter(nil) + events := rb.Filter(0) if len(events) != 0 { t.Fatalf("Filter() len(events)=%d, want=%d", len(events), 0) }