1
0
mirror of https://github.com/etcd-io/etcd.git synced 2026-02-05 15:46:51 +01:00

Merge pull request #20285 from apullo777/remove-afterRev

cache: remove AfterRev entry predicate
This commit is contained in:
Marek Siarkowicz
2025-07-05 00:21:15 +02:00
committed by GitHub
4 changed files with 14 additions and 35 deletions

2
cache/demux.go vendored
View File

@@ -140,7 +140,7 @@ func (d *demux) resyncLaggingWatchers() {
continue
}
// TODO: re-enable keypredicate in Filter when nonzero startRev or performance tuning is needed
missedEvents := d.history.Filter(AfterRev(nextRev))
missedEvents := d.history.Filter(nextRev)
for _, event := range missedEvents {
if !w.enqueueEvent(event) { // buffer overflow: watcher still lagging

9
cache/predicate.go vendored
View File

@@ -14,8 +14,6 @@
package cache
import clientv3 "go.etcd.io/etcd/client/v3"
type Prefix string
func (prefix Prefix) Match(key []byte) bool {
@@ -25,10 +23,3 @@ func (prefix Prefix) Match(key []byte) bool {
prefixLen := len(prefix)
return len(key) >= prefixLen && string(key[:prefixLen]) == string(prefix)
}
// AfterRev builds an EntryPredicate that matches events whose ModRevision ≥ rev.
func AfterRev(rev int64) EntryPredicate {
return func(ev *clientv3.Event) bool {
return ev.Kv.ModRevision >= rev
}
}

12
cache/ringbuffer.go vendored
View File

@@ -26,11 +26,7 @@ type ringBuffer struct {
head, tail, size int
}
// EntryPredicate lets callers decide which entries to keep (e.g. “after revision X”)
type (
EntryPredicate func(*clientv3.Event) bool
KeyPredicate = func([]byte) bool
)
type KeyPredicate = func([]byte) bool
func newRingBuffer(capacity int) *ringBuffer {
// assume capacity > 0 validated by Cache
@@ -50,9 +46,9 @@ func (r *ringBuffer) Append(event *clientv3.Event) {
r.head = (r.head + 1) % len(r.buffer)
}
// Filter returns the events that satisfy every predicate
// Filter returns all events in the buffer whose ModRevision is >= minRev.
// TODO: use binary search on the ring buffer to locate the first entry >= nextRev instead of a full scan
func (r *ringBuffer) Filter(entryPred EntryPredicate) (events []*clientv3.Event) {
func (r *ringBuffer) Filter(minRev int64) (events []*clientv3.Event) {
events = make([]*clientv3.Event, 0, r.size)
for n, i := 0, r.tail; n < r.size; n, i = n+1, (i+1)%len(r.buffer) {
@@ -60,7 +56,7 @@ func (r *ringBuffer) Filter(entryPred EntryPredicate) (events []*clientv3.Event)
if entry == nil {
panic(fmt.Sprintf("ringBuffer.Filter: unexpected nil entry at index %d", i))
}
if entryPred == nil || entryPred(entry) {
if entry.Kv.ModRevision >= minRev {
events = append(events, entry)
}
}

View File

@@ -91,7 +91,7 @@ func TestFilter(t *testing.T) {
name string
capacity int
revs []int64
predicate EntryPredicate
minRev int64
wantFilteredRevs []int64
wantLatestRev int64
}{
@@ -99,7 +99,7 @@ func TestFilter(t *testing.T) {
name: "no_filter",
capacity: 5,
revs: []int64{1, 2, 3},
predicate: AfterRev(0),
minRev: 0,
wantFilteredRevs: []int64{1, 2, 3},
wantLatestRev: 3,
},
@@ -107,7 +107,7 @@ func TestFilter(t *testing.T) {
name: "partial_match",
capacity: 5,
revs: []int64{10, 11, 12, 13},
predicate: AfterRev(12),
minRev: 12,
wantFilteredRevs: []int64{12, 13},
wantLatestRev: 13,
},
@@ -115,7 +115,7 @@ func TestFilter(t *testing.T) {
name: "filter_when_full",
capacity: 3,
revs: []int64{20, 21, 22, 23, 24},
predicate: AfterRev(23),
minRev: 23,
wantFilteredRevs: []int64{23, 24},
wantLatestRev: 24,
},
@@ -123,23 +123,15 @@ func TestFilter(t *testing.T) {
name: "none_match",
capacity: 4,
revs: []int64{30, 31},
predicate: AfterRev(100),
minRev: 100,
wantFilteredRevs: []int64{},
wantLatestRev: 31,
},
{
name: "nil_predicate",
capacity: 4,
revs: []int64{1, 2, 3},
predicate: nil,
wantFilteredRevs: []int64{1, 2, 3},
wantLatestRev: 3,
},
{
name: "empty_buffer_nil_predicate",
name: "empty_buffer",
capacity: 3,
revs: nil,
predicate: nil,
minRev: 0,
wantFilteredRevs: []int64{},
wantLatestRev: 0,
},
@@ -154,7 +146,7 @@ func TestFilter(t *testing.T) {
rb.Append(event(r, "k"))
}
gotEvents := rb.Filter(tt.predicate)
gotEvents := rb.Filter(tt.minRev)
gotRevs := make([]int64, len(gotEvents))
for i, ev := range gotEvents {
gotRevs[i] = ev.Kv.ModRevision
@@ -204,7 +196,7 @@ func TestRebaseHistory(t *testing.T) {
t.Fatalf("PeekLatest()=%d, want=%d", latestRev, 0)
}
events := rb.Filter(nil)
events := rb.Filter(0)
if len(events) != 0 {
t.Fatalf("Filter() len(events)=%d, want=%d", len(events), 0)
}