1
0
mirror of https://github.com/etcd-io/etcd.git synced 2026-02-05 06:46:49 +01:00

enable range/prefix/fromKey key filtering

Signed-off-by: Peter Chang <peter.yaochen.chang@gmail.com>
This commit is contained in:
Peter Chang
2025-07-10 14:33:47 +00:00
parent f84fb66cbe
commit 792f763e83
4 changed files with 493 additions and 79 deletions

70
cache/cache.go vendored
View File

@@ -15,10 +15,10 @@
package cache
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
@@ -105,15 +105,15 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
op := clientv3.OpGet(key, opts...)
startRev := op.Rev()
// TODO: Support watch on subprefix and single key & arbitrary startRev support once we guarantee gap-free replay.
if key != c.prefix || !clientv3.IsOptsWithPrefix(opts) || startRev != 0 {
responseChan := make(chan clientv3.WatchResponse, 1)
responseChan <- clientv3.WatchResponse{Canceled: true}
close(responseChan)
return responseChan
pred, err := c.validateWatch(key, opts...)
if err != nil {
ch := make(chan clientv3.WatchResponse, 1)
ch <- clientv3.WatchResponse{Canceled: true}
close(ch)
return ch
}
w := newWatcher(c.cfg.PerWatcherBufferSize, func(k []byte) bool { return strings.HasPrefix(string(k), key) })
w := newWatcher(c.cfg.PerWatcherBufferSize, pred)
c.demux.Register(w, startRev)
responseChan := make(chan clientv3.WatchResponse)
@@ -226,3 +226,57 @@ func readWatchChannel(
}
return nil
}
func (c *Cache) validateWatch(key string, opts ...clientv3.OpOption) (pred KeyPredicate, err error) {
op := clientv3.OpGet(key, opts...)
startRev := op.Rev()
// TODO: Support watch on arbitrary startRev support once we guarantee gap-free replay.
if startRev != 0 {
return nil, ErrUnsupportedWatch
}
startKey := []byte(key)
endKey := op.RangeBytes() // nil = single key, {0}=FromKey, else explicit range
if err := c.validateWatchRange(startKey, endKey); err != nil {
return nil, err
}
return KeyPredForRange(startKey, endKey), nil
}
func (c *Cache) validateWatchRange(startKey, endKey []byte) error {
prefixStart := []byte(c.prefix)
prefixEnd := []byte(clientv3.GetPrefixRangeEnd(c.prefix))
isSingleKey := len(endKey) == 0
isFromKey := len(endKey) == 1 && endKey[0] == 0
switch {
case isSingleKey:
if c.prefix == "" {
return nil
}
if bytes.Compare(startKey, prefixStart) < 0 || bytes.Compare(startKey, prefixEnd) >= 0 {
return ErrUnsupportedWatch
}
return nil
case isFromKey:
if c.prefix != "" {
return ErrUnsupportedWatch
}
return nil
default:
if bytes.Compare(endKey, startKey) <= 0 {
return ErrUnsupportedWatch
}
if c.prefix == "" {
return nil
}
if bytes.Compare(startKey, prefixStart) < 0 || bytes.Compare(endKey, prefixEnd) > 0 {
return ErrUnsupportedWatch
}
return nil
}
}

190
cache/cache_test.go vendored Normal file
View File

@@ -0,0 +1,190 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"testing"
clientv3 "go.etcd.io/etcd/client/v3"
)
func TestValidateWatchRange(t *testing.T) {
type tc struct {
name string
watchKey string
opts []clientv3.OpOption
cachePrefix string
wantErr bool
}
tests := []tc{
{
name: "single key",
watchKey: "/a",
cachePrefix: "",
wantErr: false,
},
{
name: "prefix single key",
watchKey: "/foo/a",
cachePrefix: "/foo",
wantErr: false,
},
{
name: "single key outside prefix returns error",
watchKey: "/z",
cachePrefix: "/foo",
wantErr: true,
},
{
name: "explicit range",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithRange("/b")},
cachePrefix: "",
wantErr: false,
},
{
name: "exact prefix range",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithRange("/b")},
cachePrefix: "/a",
wantErr: false,
},
{
name: "prefix subrange",
watchKey: "/foo",
opts: []clientv3.OpOption{clientv3.WithRange("/foo/a")},
cachePrefix: "/foo",
wantErr: false,
},
{
name: "reverse range returns error",
watchKey: "/b",
opts: []clientv3.OpOption{clientv3.WithRange("/a")},
cachePrefix: "",
wantErr: true,
},
{
name: "empty range returns error",
watchKey: "/foo",
opts: []clientv3.OpOption{clientv3.WithRange("/foo")},
cachePrefix: "",
wantErr: true,
},
{
name: "range starting below cache prefix returns error",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithRange("/foo")},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "range encompassing cache prefix returns error",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithRange("/z")},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "range crossing prefixEnd returns error",
watchKey: "/foo",
opts: []clientv3.OpOption{clientv3.WithRange("/z")},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "empty prefix",
watchKey: "",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
cachePrefix: "",
wantErr: false,
},
{
name: "empty prefix with cachePrefix returns error",
watchKey: "",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "prefix watch matches cachePrefix exactly",
watchKey: "/foo",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
cachePrefix: "/foo",
wantErr: false,
},
{
name: "prefix watch inside cachePrefix",
watchKey: "/foo/bar",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
cachePrefix: "/foo",
wantErr: false,
},
{
name: "prefix starting below cachePrefix returns error",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "prefix starting above shard prefixEnd returns error",
watchKey: "/fop",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "fromKey openended",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
cachePrefix: "",
wantErr: false,
},
{
name: "fromKey starting at prefix start",
watchKey: "/foo",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "fromKey starting below prefixEnd",
watchKey: "/a",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
cachePrefix: "/foo",
wantErr: true,
},
{
name: "fromKey starting above prefixEnd returns error",
watchKey: "/fop",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
cachePrefix: "/foo",
wantErr: true,
},
}
for _, c := range tests {
t.Run(c.name, func(t *testing.T) {
dummyCache := &Cache{prefix: c.cachePrefix}
op := clientv3.OpGet(c.watchKey, c.opts...)
err := dummyCache.validateWatchRange([]byte(c.watchKey), op.RangeBytes())
if gotErr := err != nil; gotErr != c.wantErr {
t.Fatalf("validateWatchRange(%q, %q, %v) err=%v, wantErr=%v",
c.cachePrefix, c.watchKey, c.opts, err, c.wantErr)
}
})
}
}

27
cache/predicate.go vendored
View File

@@ -14,6 +14,8 @@
package cache
import "bytes"
type Prefix string
func (prefix Prefix) Match(key []byte) bool {
@@ -23,3 +25,28 @@ func (prefix Prefix) Match(key []byte) bool {
prefixLen := len(prefix)
return len(key) >= prefixLen && string(key[:prefixLen]) == string(prefix)
}
func ExactKey(key []byte) KeyPredicate {
return func(k []byte) bool { return bytes.Equal(k, key) }
}
func FromKey(start []byte) KeyPredicate {
return func(k []byte) bool { return bytes.Compare(k, start) >= 0 }
}
func Range(start, end []byte) KeyPredicate {
return func(k []byte) bool {
return bytes.Compare(k, start) >= 0 &&
bytes.Compare(k, end) < 0
}
}
func KeyPredForRange(start, end []byte) KeyPredicate {
if len(end) == 0 {
return ExactKey(start)
}
if len(end) == 1 && end[0] == 0 {
return FromKey(start)
}
return Range(start, end)
}

View File

@@ -34,7 +34,7 @@ func TestCacheWatch(t *testing.T) {
t.Cleanup(func() { clus.Terminate(t) })
client := clus.Client(0)
c, err := cache.New(client, "/", cache.WithHistoryWindowSize(32))
c, err := cache.New(client, "/foo", cache.WithHistoryWindowSize(32))
if err != nil {
t.Fatalf("New(...): %v", err)
}
@@ -56,50 +56,101 @@ func TestWatch(t *testing.T) {
func testWatch(t *testing.T, kv clientv3.KV, watcher Watcher) {
ctx := t.Context()
event1Put := &clientv3.Event{
event1PutFooA := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/a"),
Key: []byte("/foo/a"),
Value: []byte("1"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
},
}
event2Put := &clientv3.Event{
event2PutFooB := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/b"),
Key: []byte("/foo/b"),
Value: []byte("2"),
CreateRevision: 3,
ModRevision: 3,
Version: 1,
},
}
event3Delete := &clientv3.Event{
event3DeleteFooA := &clientv3.Event{
Type: clientv3.EventTypeDelete,
Kv: &mvccpb.KeyValue{
Key: []byte("/a"),
Key: []byte("/foo/a"),
ModRevision: 4,
},
}
event4Put := &clientv3.Event{
event4PutFooA := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/a"),
Key: []byte("/foo/a"),
Value: []byte("3"),
CreateRevision: 5,
ModRevision: 5,
Version: 1,
},
}
event5Delete := &clientv3.Event{
event5DeleteFooB := &clientv3.Event{
Type: clientv3.EventTypeDelete,
Kv: &mvccpb.KeyValue{
Key: []byte("/b"),
Key: []byte("/foo/b"),
ModRevision: 5,
},
}
event6PutFooC := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/foo/c"),
Value: []byte("x"),
CreateRevision: 6,
ModRevision: 6,
Version: 1,
},
}
event7PutFooBar := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/foo/bar"),
Value: []byte("y"),
CreateRevision: 7,
ModRevision: 7,
Version: 1,
},
}
event8PutFooBaz := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/foo/baz"),
Value: []byte("z"),
CreateRevision: 8,
ModRevision: 8,
Version: 1,
},
}
event9PutFooYoo := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/foo/yoo"),
Value: []byte("1"),
CreateRevision: 9,
ModRevision: 9,
Version: 1,
},
}
event10PutZoo := &clientv3.Event{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("/zoo"),
Value: []byte("1"),
CreateRevision: 10,
ModRevision: 10,
Version: 1,
},
}
tcs := []struct {
name string
key string
@@ -107,33 +158,79 @@ func testWatch(t *testing.T, kv clientv3.KV, watcher Watcher) {
wantEvents []*clientv3.Event
}{
{
name: "Watch all events",
key: "/",
name: "Watch single key existing /foo/c",
key: "/foo/c",
opts: nil,
wantEvents: []*clientv3.Event{event6PutFooC},
},
{
name: "Watch single key nonexistent /doesnotexist",
key: "/doesnotexist",
opts: nil,
wantEvents: nil,
},
{
name: "Watch range empty",
key: "",
opts: []clientv3.OpOption{clientv3.WithRange("")},
wantEvents: nil,
},
{
name: "Watch range [/foo/a, /foo/b)",
key: "/foo/a",
opts: []clientv3.OpOption{clientv3.WithRange("/foo/b")},
wantEvents: []*clientv3.Event{event1PutFooA, event3DeleteFooA, event4PutFooA},
},
{
name: "Watch with prefix /foo/b",
key: "/foo/b",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
wantEvents: []*clientv3.Event{event1Put, event2Put, event3Delete, event4Put, event5Delete},
wantEvents: []*clientv3.Event{event2PutFooB, event5DeleteFooB, event7PutFooBar, event8PutFooBaz},
},
{
name: "Watch with prefix non-existent /doesnotexist",
key: "/doesnotexist",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
wantEvents: nil,
},
}
t.Log("Open test watchers")
watches := make([]clientv3.WatchChan, len(tcs))
for i, tc := range tcs {
watches[i] = watcher.Watch(ctx, tc.key, tc.opts...)
}
t.Log("Setup data")
if _, err := kv.Put(ctx, string(event1Put.Kv.Key), string(event1Put.Kv.Value)); err != nil {
if _, err := kv.Put(ctx, string(event1PutFooA.Kv.Key), string(event1PutFooA.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := kv.Put(ctx, string(event2Put.Kv.Key), string(event2Put.Kv.Value)); err != nil {
if _, err := kv.Put(ctx, string(event2PutFooB.Kv.Key), string(event2PutFooB.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := kv.Delete(ctx, string(event3Delete.Kv.Key)); err != nil {
if _, err := kv.Delete(ctx, string(event3DeleteFooA.Kv.Key)); err != nil {
t.Fatalf("Delete: %v", err)
}
if _, err := kv.Txn(ctx).Then(clientv3.OpPut(string(event4Put.Kv.Key), string(event4Put.Kv.Value)), clientv3.OpDelete(string(event5Delete.Kv.Key))).Commit(); err != nil {
if _, err := kv.Txn(ctx).Then(clientv3.OpPut(string(event4PutFooA.Kv.Key), string(event4PutFooA.Kv.Value)), clientv3.OpDelete(string(event5DeleteFooB.Kv.Key))).Commit(); err != nil {
t.Fatalf("Txn: %v", err)
}
if _, err := kv.Put(ctx, string(event6PutFooC.Kv.Key), string(event6PutFooC.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := kv.Put(ctx, string(event7PutFooBar.Kv.Key), string(event7PutFooBar.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := kv.Put(ctx, string(event8PutFooBaz.Kv.Key), string(event8PutFooBaz.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := kv.Put(ctx, string(event9PutFooYoo.Kv.Key), string(event9PutFooYoo.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
if _, err := kv.Put(ctx, string(event10PutZoo.Kv.Key), string(event10PutZoo.Kv.Value)); err != nil {
t.Fatalf("Put: %v", err)
}
t.Log("Validate")
for i, tc := range tcs {
tc := tc
i, tc := i, tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
events, _ := readEvents(watches[i])
@@ -144,6 +241,104 @@ func testWatch(t *testing.T, kv clientv3.KV, watcher Watcher) {
}
}
func TestCacheRejectsInvalidWatch(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
t.Cleanup(func() { clus.Terminate(t) })
client := clus.Client(0)
ctx := t.Context()
tests := []struct {
name string
cachePrefix string
key string
opts []clientv3.OpOption
}{
{
name: "nonzero start revision",
cachePrefix: "",
key: "",
opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(123)},
},
{
name: "zero length range",
cachePrefix: "/foo",
key: "/foo/a",
opts: []clientv3.OpOption{clientv3.WithRange("/foo/a")},
},
{
name: "invalid range (start > end)",
cachePrefix: "/foo",
key: "/foo/b",
opts: []clientv3.OpOption{clientv3.WithRange("/foo/a")},
},
{
name: "range crosses cache prefix boundary",
cachePrefix: "/foo",
key: "/foo/a",
opts: []clientv3.OpOption{clientv3.WithRange("/zzz")},
},
{
name: "fromkey empty key",
cachePrefix: "/foo",
key: "",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
},
{
name: "fromkey within cache prefix",
cachePrefix: "/foo",
key: "/foo/a",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
},
{
name: "fromkey outside cache prefix",
cachePrefix: "/foo",
key: "/zzz",
opts: []clientv3.OpOption{clientv3.WithFromKey()},
},
{
name: "prefix() outside cache prefix",
cachePrefix: "/foo",
key: "/zzz",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
},
{
name: "prefix() with empty key",
cachePrefix: "/foo",
key: "",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
c, err := cache.New(client, tc.cachePrefix)
if err != nil {
t.Fatalf("New(...): %v", err)
}
defer c.Close()
if err := c.WaitReady(ctx); err != nil {
t.Fatal(err)
}
watchCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
ch := c.Watch(watchCtx, tc.key, tc.opts...)
select {
case resp, ok := <-ch:
if !ok || !resp.Canceled {
t.Fatalf("expected canceled watch, got %+v (closed=%v)", resp, !ok)
}
case <-watchCtx.Done():
t.Fatalf("watch did not cancel within timeout")
}
})
}
}
func TestCacheLaggingWatcher(t *testing.T) {
const prefix = "/test/"
integration.BeforeTest(t)
@@ -224,58 +419,6 @@ func TestCacheLaggingWatcher(t *testing.T) {
}
}
func TestCacheRejectsUnsupportedWatch(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
t.Cleanup(func() { clus.Terminate(t) })
client := clus.Client(0)
ctx := t.Context()
c, err := cache.New(client, "")
if err != nil {
t.Fatalf("New(...): %v", err)
}
t.Cleanup(c.Close)
if err := c.WaitReady(ctx); err != nil {
t.Fatal(err)
}
tests := []struct {
name string
key string
opts []clientv3.OpOption
}{
{
name: "non_zero_start_revision",
key: "",
opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(123)},
},
{
name: "subprefix_watch",
key: "foo/",
opts: []clientv3.OpOption{clientv3.WithPrefix()},
},
{
name: "single_key_watch",
key: "foo/bar",
opts: nil, // exact-key watch (no WithPrefix)
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
watchCh := c.Watch(ctx, tt.key, tt.opts...)
resp, ok := <-watchCh
if !ok || !resp.Canceled {
t.Errorf("expected canceled response, got %#v (closed=%v)", resp, !ok)
}
})
}
}
func generateEvents(t *testing.T, client *clientv3.Client, prefix string, n int) {
t.Helper()
for i := 0; i < n; i++ {