
cache: refactor demux to maintain min/max revision range

Signed-off-by: Peter Chang <peter.yaochen.chang@gmail.com>
Author: Peter Chang
Date: 2025-09-19 15:05:41 +00:00
parent a86d0496b4
commit 19f41708e2
6 changed files with 352 additions and 55 deletions
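In short, the demux now records an explicit inclusive revision range [minRev, maxRev] instead of deriving its bounds from the history ring buffer: Init establishes or resets minRev when a watch starts, Broadcast rejects stale revisions and advances maxRev, and Compact/Purge clear the range. The standalone sketch below illustrates only that bookkeeping; rangeDemux is a simplified stand-in for the real demux (no watchers, no history, and Broadcast takes bare revisions rather than a clientv3.WatchResponse).

// Sketch only: rangeDemux is a simplified stand-in for the demux in this
// commit, keeping just the [minRev, maxRev] bookkeeping.
package main

import (
	"errors"
	"fmt"
)

type rangeDemux struct {
	minRev, maxRev int64 // inclusive revision range; 0 means "unset"
}

// Init mirrors demux.Init in the diff: a fresh or event-less demux adopts the
// watch start revision as minRev; a watch resuming exactly at maxRev+1 keeps
// its state; anything else resets the range (the real code also purges
// watchers and history here).
func (d *rangeDemux) Init(minRev int64) {
	switch {
	case d.minRev == 0: // never initialized
		d.minRev = minRev
	case d.maxRev == 0: // initialized, but no events broadcast yet
		d.minRev = minRev
	case minRev == d.maxRev+1: // continuation of the previous watch
	default: // gap or overlap: restart the range
		d.minRev, d.maxRev = minRev, 0
	}
}

// Broadcast accepts only revisions strictly above maxRev, as the new
// validateRevisions check enforces, and advances maxRev.
func (d *rangeDemux) Broadcast(revs ...int64) error {
	for _, rev := range revs {
		if rev <= d.maxRev {
			return errors.New("revision not newer than maxRev")
		}
		d.maxRev = rev
	}
	return nil
}

func main() {
	d := &rangeDemux{}
	d.Init(5)
	fmt.Println(d.Broadcast(7, 9, 13)) // <nil>; range is now [5, 13]
	d.Init(14)                         // 14 == maxRev+1: continuation, range kept
	fmt.Println(d.Broadcast(13))       // error: 13 is not newer than maxRev
	fmt.Println(d.minRev, d.maxRev)    // 5 13
}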

cache/cache.go

@@ -23,7 +23,6 @@ import (
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -254,16 +253,7 @@ func (c *Cache) watch(rev int64) error {
close(applyErr)
}()
watchCh := c.watcher.Watch(
c.internalCtx,
c.prefix,
clientv3.WithPrefix(),
clientv3.WithRev(rev),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
)
err := c.watchEvents(watchCh, applyErr, &readyOnce)
err := c.watchEvents(rev, applyErr, &readyOnce)
c.demux.Unregister(storeW)
if err != nil {
@@ -294,7 +284,15 @@ func (c *Cache) applyStorage(storeW *watcher) error {
}
}
func (c *Cache) watchEvents(watchCh clientv3.WatchChan, applyErr <-chan error, readyOnce *sync.Once) error {
func (c *Cache) watchEvents(rev int64, applyErr <-chan error, readyOnce *sync.Once) error {
watchCh := c.watcher.Watch(
c.internalCtx,
c.prefix,
clientv3.WithPrefix(),
clientv3.WithRev(rev),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
)
for {
select {
case <-c.internalCtx.Done():
@@ -303,20 +301,21 @@ func (c *Cache) watchEvents(watchCh clientv3.WatchChan, applyErr <-chan error, r
if !ok {
return nil
}
readyOnce.Do(func() { c.ready.Set() })
readyOnce.Do(func() {
c.demux.Init(rev)
c.ready.Set()
})
if err := resp.Err(); err != nil {
c.ready.Reset()
if errors.Is(err, rpctypes.ErrCompacted) || resp.CompactRevision > 0 {
c.demux.Compact(resp.CompactRevision)
} else {
c.demux.Purge()
}
return err
}
c.demux.Broadcast(resp)
err := c.demux.Broadcast(resp)
if err != nil {
c.ready.Reset()
return err
}
case err := <-applyErr:
c.ready.Reset()
c.demux.Purge()
return err
}
}

cache/cache_test.go

@@ -440,7 +440,6 @@ func TestCacheCompactionResync(t *testing.T) {
mw.errorCompacted(10)
waitUntil(t, time.Second, 10*time.Millisecond, func() bool { return !cache.Ready() })
start := time.Now()
ctxGet, cancelGet := context.WithTimeout(t.Context(), 100*time.Millisecond)
defer cancelGet()
@@ -457,24 +456,16 @@ func TestCacheCompactionResync(t *testing.T) {
t.Log("Phase 3: resync after compaction")
mw.triggerCreatedNotify()
if err = cache.WaitReady(t.Context()); err != nil {
t.Fatalf("second WaitReady: %v", err)
}
elapsed := time.Since(start)
if elapsed > time.Second {
t.Fatalf("cache was unready for %v; want: < 1 s", elapsed)
}
expectSnapshotRev := int64(20)
expectedWatchStart := secondSnapshot.Header.Revision + 1
if gotWatchStart := mw.lastStartRev; gotWatchStart != expectedWatchStart {
t.Errorf("Watch started at rev=%d; want %d", gotWatchStart, expectedWatchStart)
ctxResync, cancelResync := context.WithTimeout(t.Context(), time.Second)
defer cancelResync()
if err = cache.WaitForRevision(ctxResync, expectSnapshotRev); err != nil {
t.Fatalf("cache failed to resync to rev=%d within 1s: %v", expectSnapshotRev, err)
}
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
defer cancel()
if err = cache.WaitForRevision(ctx, expectSnapshotRev); err != nil {
t.Fatalf("cache never reached rev=%d: %v", expectSnapshotRev, err)
expectedWatchStart := secondSnapshot.Header.Revision + 1
if gotWatchStart := mw.getLastStartRev(); gotWatchStart != expectedWatchStart {
t.Errorf("Watch started at rev=%d; want %d", gotWatchStart, expectedWatchStart)
}
gotSnapshot, err := cache.Get(t.Context(), "foo", clientv3.WithSerializable())
@@ -560,6 +551,12 @@ func (m *mockWatcher) recordStartRev(rev int64) {
m.lastStartRev = rev
}
func (m *mockWatcher) getLastStartRev() int64 {
m.mu.Lock()
defer m.mu.Unlock()
return m.lastStartRev
}
func (m *mockWatcher) signalRegistration() {
select {
case <-m.registered:

cache/demux.go

@@ -27,8 +27,12 @@ type demux struct {
// activeWatchers & laggingWatchers hold the first revision the watcher still needs (nextRev).
activeWatchers map[*watcher]int64
laggingWatchers map[*watcher]int64
history ringBuffer[[]*clientv3.Event]
resyncInterval time.Duration
// Range of revisions maintained by the demux, inclusive. It can be broader than the history window because event revisions are not contiguous.
// maxRev tracks the highest revision seen; minRev is the watcher compaction threshold (advanced to evictedRev+1 when history overflows).
minRev, maxRev int64
// History stores events within [minRev, maxRev].
history ringBuffer[[]*clientv3.Event]
}
func NewDemux(ctx context.Context, wg *sync.WaitGroup, historyWindowSize int, resyncInterval time.Duration) *demux {
@@ -70,8 +74,7 @@ func (d *demux) Register(w *watcher, startingRev int64) {
d.mu.Lock()
defer d.mu.Unlock()
latestRev := d.history.PeekLatest()
if latestRev == 0 {
if d.maxRev == 0 {
if startingRev == 0 {
d.activeWatchers[w] = 0
} else {
@@ -82,10 +85,10 @@ func (d *demux) Register(w *watcher, startingRev int64) {
// Special case: 0 means “newest”.
if startingRev == 0 {
startingRev = latestRev + 1
startingRev = d.maxRev + 1
}
if startingRev <= latestRev {
if startingRev <= d.maxRev {
d.laggingWatchers[w] = startingRev
} else {
d.activeWatchers[w] = startingRev
@@ -102,16 +105,44 @@ func (d *demux) Unregister(w *watcher) {
w.Stop()
}
func (d *demux) Broadcast(resp clientv3.WatchResponse) {
func (d *demux) Init(minRev int64) {
d.mu.Lock()
defer d.mu.Unlock()
if d.minRev == 0 {
// Watch started for empty demux
d.minRev = minRev
return
}
if d.maxRev == 0 {
// Watch started on initialized demux that never got any event.
d.purge()
d.minRev = minRev
return
}
if minRev == d.maxRev+1 {
// Watch continuing from last revision it observed.
return
}
// Watch opened at a revision that does not match the demux's last observed revision.
d.purge()
d.minRev = minRev
}
func (d *demux) Broadcast(resp clientv3.WatchResponse) error {
events := resp.Events
if len(events) == 0 {
return
return nil
}
d.mu.Lock()
defer d.mu.Unlock()
err := validateRevisions(events, d.maxRev)
if err != nil {
return err
}
d.updateStoreLocked(events)
d.broadcastLocked(events)
return nil
}
func (d *demux) updateStoreLocked(events []*clientv3.Event) {
@@ -119,14 +150,21 @@ func (d *demux) updateStoreLocked(events []*clientv3.Event) {
for end := 1; end < len(events); end++ {
if events[end].Kv.ModRevision != events[batchStart].Kv.ModRevision {
if end > batchStart {
if end+1 == len(events) && d.history.full() {
d.minRev = d.history.PeekOldest() + 1
}
d.history.Append(events[batchStart:end])
}
batchStart = end
}
}
if batchStart < len(events) {
if d.history.full() {
d.minRev = d.history.PeekOldest() + 1
}
d.history.Append(events[batchStart:])
}
d.maxRev = events[len(events)-1].Kv.ModRevision
}
func (d *demux) broadcastLocked(events []*clientv3.Event) {
@@ -163,6 +201,12 @@ func (d *demux) broadcastLocked(events []*clientv3.Event) {
func (d *demux) Purge() {
d.mu.Lock()
defer d.mu.Unlock()
d.purge()
}
func (d *demux) purge() {
d.maxRev = 0
d.minRev = 0
d.history.RebaseHistory()
for w := range d.activeWatchers {
w.Stop()
@@ -179,27 +223,19 @@ func (d *demux) Purge() {
func (d *demux) Compact(compactRev int64) {
d.mu.Lock()
defer d.mu.Unlock()
d.history.RebaseHistory()
for w, next := range d.activeWatchers {
if next != 0 && next <= compactRev {
delete(d.activeWatchers, w)
d.laggingWatchers[w] = next
}
}
d.purge()
}
func (d *demux) resyncLaggingWatchers() {
d.mu.Lock()
defer d.mu.Unlock()
oldestRev := d.history.PeekOldest()
if oldestRev == 0 {
if d.minRev == 0 {
return
}
for w, nextRev := range d.laggingWatchers {
if nextRev < oldestRev {
if nextRev < d.minRev {
w.Compact(nextRev)
delete(d.laggingWatchers, w)
continue

cache/demux_test.go

@@ -24,6 +24,213 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)
func TestInit(t *testing.T) {
type want struct {
min int64
max int64
historyRevs []int64
}
tests := []struct {
name string
capacity int
initRev int64
eventRevs []int64
shouldReinit bool
reinitRev int64
want want
}{
{
name: "first init sets only min",
capacity: 8,
initRev: 5,
eventRevs: nil,
shouldReinit: false,
want: want{min: 5, max: 0, historyRevs: nil},
},
{
name: "init on empty demux with events",
capacity: 8,
initRev: 5,
eventRevs: []int64{7, 9, 13},
shouldReinit: false,
want: want{min: 5, max: 13, historyRevs: []int64{7, 9, 13}},
},
{
name: "continuation at max+1 preserves range and history",
capacity: 8,
initRev: 10,
eventRevs: []int64{13, 15, 21},
shouldReinit: true,
reinitRev: 22,
want: want{min: 10, max: 21, historyRevs: []int64{13, 15, 21}},
},
{
name: "gap from max triggers purge and clears history",
capacity: 8,
initRev: 10,
eventRevs: []int64{13, 15, 21},
shouldReinit: true,
reinitRev: 30,
want: want{min: 30, max: 0, historyRevs: nil},
},
{
name: "idempotent reinit at same revision clears history",
capacity: 8,
initRev: 7,
eventRevs: []int64{8, 9, 10},
shouldReinit: true,
reinitRev: 7,
want: want{min: 7, max: 0, historyRevs: nil},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
d := newDemux(tt.capacity, 10*time.Millisecond)
d.Init(tt.initRev)
if len(tt.eventRevs) > 0 {
if err := d.Broadcast(respWithEventRevs(tt.eventRevs...)); err != nil {
t.Fatalf("Broadcast(%v) failed: %v", tt.eventRevs, err)
}
}
if tt.shouldReinit {
d.Init(tt.reinitRev)
}
if d.minRev != tt.want.min || d.maxRev != tt.want.max {
t.Fatalf("revision range: got(min=%d, max=%d), want(min=%d, max=%d)",
d.minRev, d.maxRev, tt.want.min, tt.want.max)
}
var actualHistoryRevs []int64
d.history.AscendGreaterOrEqual(0, func(rev int64, events []*clientv3.Event) bool {
actualHistoryRevs = append(actualHistoryRevs, rev)
return true
})
if diff := cmp.Diff(tt.want.historyRevs, actualHistoryRevs); diff != "" {
t.Fatalf("history validation failed (-want +got):\n%s", diff)
}
})
}
}
func TestBroadcast(t *testing.T) {
type want struct {
min int64
max int64
shouldError bool
}
tests := []struct {
name string
capacity int
initRev int64
initialRevs []int64
followupRevs []int64
want want
}{
{
name: "history not full",
capacity: 2,
initRev: 1,
initialRevs: []int64{2},
want: want{min: 1, max: 2, shouldError: false},
},
{
name: "history at exact capacity",
capacity: 2,
initRev: 1,
initialRevs: []int64{2, 3},
want: want{min: 1, max: 3, shouldError: false},
},
{
name: "history overflow with eviction",
capacity: 2,
initRev: 1,
initialRevs: []int64{2, 3, 4},
want: want{min: 3, max: 4, shouldError: false},
},
{
name: "history overflow not continuous",
capacity: 2,
initRev: 2,
initialRevs: []int64{4, 8, 16},
want: want{min: 5, max: 16, shouldError: false},
},
{
name: "empty broadcast is no-op",
capacity: 8,
initRev: 10,
initialRevs: []int64{},
want: want{min: 10, max: 0, shouldError: false},
},
{
name: "revisions below maxRev are rejected",
capacity: 8,
initRev: 4,
initialRevs: []int64{5, 6},
followupRevs: []int64{4},
want: want{shouldError: true},
},
{
name: "revisions equal to maxRev are rejected",
capacity: 8,
initRev: 4,
initialRevs: []int64{5, 6},
followupRevs: []int64{6},
want: want{shouldError: true},
},
{
name: "revisions above maxRev are accepted",
capacity: 8,
initRev: 4,
initialRevs: []int64{5, 6},
followupRevs: []int64{9, 14, 17},
want: want{min: 4, max: 17, shouldError: false},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
d := newDemux(tt.capacity, 10*time.Millisecond)
d.Init(tt.initRev)
if len(tt.initialRevs) > 0 {
if err := d.Broadcast(respWithEventRevs(tt.initialRevs...)); err != nil {
t.Fatalf("unexpected error broadcasting initial revisions %v: %v", tt.initialRevs, err)
}
}
if len(tt.followupRevs) > 0 {
err := d.Broadcast(respWithEventRevs(tt.followupRevs...))
if tt.want.shouldError {
if err == nil {
t.Errorf("expected error for revisions %v after maxRev %d; got nil",
tt.followupRevs, tt.initialRevs[len(tt.initialRevs)-1])
}
return
}
if err != nil {
t.Errorf("unexpected error for valid revisions %v: %v", tt.followupRevs, err)
return
}
}
if d.minRev != tt.want.min || d.maxRev != tt.want.max {
t.Fatalf("revision range: got(min=%d, max=%d), want(min=%d, max=%d)",
d.minRev, d.maxRev, tt.want.min, tt.want.max)
}
})
}
}
func TestBroadcastBatching(t *testing.T) {
tests := []struct {
name string

cache/ringbuffer.go

@@ -42,7 +42,7 @@ func newRingBuffer[T any](capacity int, revisionOf RevisionOf[T]) *ringBuffer[T]
func (r *ringBuffer[T]) Append(item T) {
entry := entry[T]{revision: r.revisionOf(item), item: item}
if r.size == len(r.buffer) {
if r.full() {
r.tail = (r.tail + 1) % len(r.buffer)
} else {
r.size++
@@ -51,6 +51,10 @@ func (r *ringBuffer[T]) Append(item T) {
r.head = (r.head + 1) % len(r.buffer)
}
func (r *ringBuffer[T]) full() bool {
return r.size == len(r.buffer)
}
// AscendGreaterOrEqual iterates through entries in ascending order starting from the first entry with revision >= pivot.
// TODO: use binary search on the ring buffer to locate the first entry >= nextRev instead of a full scan
func (r *ringBuffer[T]) AscendGreaterOrEqual(pivot int64, iter IterFunc[T]) {
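The new full() helper lets updateStoreLocked advance minRev to the evicted revision plus one just before an Append overwrites the oldest batch; because event revisions are not contiguous, evictedRev+1 (not the revision of the new oldest entry) is the right compaction threshold, since no events exist in the gap between them. A self-contained toy illustration of that bookkeeping, with a plain slice standing in for the real ringBuffer:

// Toy illustration: a plain slice stands in for the ringBuffer; revisions of
// stored batches are kept oldest-first.
package main

import "fmt"

type ring struct {
	revs []int64
	cap  int
}

func (r *ring) full() bool { return len(r.revs) == r.cap }

// append evicts the oldest batch when the buffer is at capacity.
func (r *ring) append(rev int64) {
	if r.full() {
		r.revs = r.revs[1:]
	}
	r.revs = append(r.revs, rev)
}

func main() {
	r := &ring{cap: 2}
	minRev := int64(2) // the watch start revision passed to Init
	for _, rev := range []int64{4, 8, 16} {
		if r.full() {
			minRev = r.revs[0] + 1 // mirrors d.minRev = d.history.PeekOldest() + 1
		}
		r.append(rev)
	}
	fmt.Println(minRev, r.revs) // 5 [8 16], as in the "history overflow not continuous" test case
}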

cache/ringbuffer_test.go

@@ -484,6 +484,60 @@ func TestRebaseHistory(t *testing.T) {
}
}
func TestFull(t *testing.T) {
tests := []struct {
name string
capacity int
numAppends int
expectedFull bool
}{
{
name: "empty_buffer",
capacity: 3,
numAppends: 0,
expectedFull: false,
},
{
name: "partially_filled",
capacity: 5,
numAppends: 3,
expectedFull: false,
},
{
name: "exactly_at_capacity",
capacity: 3,
numAppends: 3,
expectedFull: true,
},
{
name: "beyond_capacity_wrapping",
capacity: 3,
numAppends: 5,
expectedFull: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
rb := newRingBuffer(tt.capacity, func(batch []*clientv3.Event) int64 { return batch[0].Kv.ModRevision })
for i := 1; i <= tt.numAppends; i++ {
batch, err := makeEventBatch(int64(i), "k", 1)
if err != nil {
t.Fatalf("makeEventBatch(%d, k, 1) failed: %v", i, err)
}
rb.Append(batch)
}
if got := rb.full(); got != tt.expectedFull {
t.Fatalf("full()=%t, want=%t (capacity=%d, appends=%d)",
got, tt.expectedFull, tt.capacity, tt.numAppends)
}
})
}
}
func setupRingBuffer(t *testing.T, capacity int, revs []int64) *ringBuffer[[]*clientv3.Event] {
rb := newRingBuffer(capacity, func(batch []*clientv3.Event) int64 { return batch[0].Kv.ModRevision })
for _, r := range revs {