2025-07-03 02:05:33 +00:00
|
|
|
|
// Copyright 2025 The etcd Authors
|
2025-05-29 18:15:16 +02:00
|
|
|
|
//
|
|
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
|
|
//
|
|
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
//
|
|
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
|
|
|
|
package cache
|
2025-07-03 02:05:33 +00:00
|
|
|
|
|
|
|
|
|
|
import (
|
2025-07-10 14:33:47 +00:00
|
|
|
|
"bytes"
|
2025-07-03 02:05:33 +00:00
|
|
|
|
"context"
|
|
|
|
|
|
"errors"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
|
2025-07-21 15:24:24 +00:00
|
|
|
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
2025-07-03 02:05:33 +00:00
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2025-07-23 19:16:14 +00:00
|
|
|
|
// Sentinel errors returned by the cache; callers can match them with errors.Is.
var (
	// ErrUnsupportedRequest is returned when a request uses an option combination
	// the cache does not handle yet (e.g. WithPrevKV or WithProgressNotify for
	// Watch(), WithCountOnly for Get()).
	ErrUnsupportedRequest = errors.New("cache: unsupported request parameters")

	// ErrKeyRangeInvalid is returned when the requested key or key range is
	// invalid (empty or reversed) or lies outside c.prefix.
	ErrKeyRangeInvalid = errors.New("cache: invalid or out‑of‑range key range")
)
|
2025-07-03 02:05:33 +00:00
|
|
|
|
|
|
|
|
|
|
// Cache buffers a single etcd Watch for a given key‐prefix and fan‑outs local watchers.
|
|
|
|
|
|
type Cache struct {
|
2025-07-08 15:22:34 +00:00
|
|
|
|
prefix string // prefix is the key-prefix this shard is responsible for ("" = root).
|
|
|
|
|
|
cfg Config // immutable runtime configuration
|
|
|
|
|
|
watcher clientv3.Watcher
|
2025-07-21 15:24:24 +00:00
|
|
|
|
kv clientv3.KV
|
2025-07-08 15:22:34 +00:00
|
|
|
|
demux *demux // demux fans incoming events out to active watchers and manages resync.
|
2025-07-21 15:24:24 +00:00
|
|
|
|
store *store // last‑observed snapshot
|
2025-08-07 22:18:58 +00:00
|
|
|
|
ready *ready
|
2025-07-08 15:22:34 +00:00
|
|
|
|
stop context.CancelFunc
|
|
|
|
|
|
waitGroup sync.WaitGroup
|
|
|
|
|
|
internalCtx context.Context
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// New builds a cache shard that watches only the requested prefix.
|
|
|
|
|
|
// For the root cache pass "".
|
2025-07-21 15:24:24 +00:00
|
|
|
|
func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) {
|
2025-07-03 02:05:33 +00:00
|
|
|
|
cfg := defaultConfig()
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
|
|
opt(&cfg)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if cfg.HistoryWindowSize <= 0 {
|
|
|
|
|
|
return nil, fmt.Errorf("invalid HistoryWindowSize %d (must be > 0)", cfg.HistoryWindowSize)
|
|
|
|
|
|
}
|
2025-08-18 15:43:06 +00:00
|
|
|
|
if cfg.BTreeDegree < 2 {
|
|
|
|
|
|
return nil, fmt.Errorf("invalid BTreeDegree %d (must be >= 2)", cfg.BTreeDegree)
|
|
|
|
|
|
}
|
2025-07-03 02:05:33 +00:00
|
|
|
|
|
|
|
|
|
|
internalCtx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
|
|
cache := &Cache{
|
2025-07-08 15:22:34 +00:00
|
|
|
|
prefix: prefix,
|
|
|
|
|
|
cfg: cfg,
|
2025-07-21 15:24:24 +00:00
|
|
|
|
watcher: client.Watcher,
|
|
|
|
|
|
kv: client.KV,
|
2025-08-26 13:37:33 +00:00
|
|
|
|
store: newStore(cfg.BTreeDegree, cfg.HistoryWindowSize),
|
2025-08-07 22:18:58 +00:00
|
|
|
|
ready: newReady(),
|
2025-07-08 15:22:34 +00:00
|
|
|
|
stop: cancel,
|
|
|
|
|
|
internalCtx: internalCtx,
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-04 00:00:47 +00:00
|
|
|
|
cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval)
|
2025-07-03 02:05:33 +00:00
|
|
|
|
|
|
|
|
|
|
cache.waitGroup.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
defer cache.waitGroup.Done()
|
2025-08-13 14:33:01 +00:00
|
|
|
|
cache.getWatchLoop()
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
return cache, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Watch registers a cache-backed watcher for a given key or prefix.
|
|
|
|
|
|
// It returns a WatchChan that streams WatchResponses containing events.
|
|
|
|
|
|
func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
|
2025-07-21 15:24:24 +00:00
|
|
|
|
if err := c.WaitReady(ctx); err != nil {
|
2025-07-03 02:05:33 +00:00
|
|
|
|
emptyWatchChan := make(chan clientv3.WatchResponse)
|
|
|
|
|
|
close(emptyWatchChan)
|
|
|
|
|
|
return emptyWatchChan
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-23 17:28:36 +02:00
|
|
|
|
op := clientv3.OpWatch(key, opts...)
|
2025-07-08 15:22:34 +00:00
|
|
|
|
startRev := op.Rev()
|
2025-07-03 02:05:33 +00:00
|
|
|
|
|
2025-07-23 17:28:36 +02:00
|
|
|
|
pred, err := c.validateWatch(key, op)
|
2025-07-10 14:33:47 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
ch := make(chan clientv3.WatchResponse, 1)
|
2025-07-23 17:28:36 +02:00
|
|
|
|
ch <- clientv3.WatchResponse{Canceled: true, CancelReason: err.Error()}
|
2025-07-10 14:33:47 +00:00
|
|
|
|
close(ch)
|
|
|
|
|
|
return ch
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-10 14:33:47 +00:00
|
|
|
|
w := newWatcher(c.cfg.PerWatcherBufferSize, pred)
|
2025-07-08 15:22:34 +00:00
|
|
|
|
c.demux.Register(w, startRev)
|
2025-07-03 02:05:33 +00:00
|
|
|
|
|
|
|
|
|
|
responseChan := make(chan clientv3.WatchResponse)
|
2025-07-08 15:22:34 +00:00
|
|
|
|
c.waitGroup.Add(1)
|
2025-07-03 02:05:33 +00:00
|
|
|
|
go func() {
|
2025-07-08 15:22:34 +00:00
|
|
|
|
defer c.waitGroup.Done()
|
2025-07-03 02:05:33 +00:00
|
|
|
|
defer close(responseChan)
|
2025-07-08 15:22:34 +00:00
|
|
|
|
defer c.demux.Unregister(w)
|
|
|
|
|
|
for {
|
2025-07-03 02:05:33 +00:00
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
return
|
2025-07-08 15:22:34 +00:00
|
|
|
|
case <-c.internalCtx.Done():
|
|
|
|
|
|
return
|
2025-08-22 13:21:06 +00:00
|
|
|
|
case resp, ok := <-w.respCh:
|
2025-07-08 15:22:34 +00:00
|
|
|
|
if !ok {
|
2025-08-15 15:05:30 +00:00
|
|
|
|
if w.cancelResp != nil {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
case <-c.internalCtx.Done():
|
|
|
|
|
|
case responseChan <- *w.cancelResp:
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-07-08 15:22:34 +00:00
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-c.internalCtx.Done():
|
|
|
|
|
|
return
|
2025-08-22 13:21:06 +00:00
|
|
|
|
case responseChan <- resp:
|
2025-07-08 15:22:34 +00:00
|
|
|
|
}
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
return responseChan
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-21 15:24:24 +00:00
|
|
|
|
func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
2025-08-12 11:35:12 +00:00
|
|
|
|
if c.store.LatestRev() == 0 {
|
|
|
|
|
|
if err := c.WaitReady(ctx); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2025-07-21 15:24:24 +00:00
|
|
|
|
}
|
|
|
|
|
|
op := clientv3.OpGet(key, opts...)
|
|
|
|
|
|
|
|
|
|
|
|
if _, err := c.validateGet(key, op); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
startKey := []byte(key)
|
|
|
|
|
|
endKey := op.RangeBytes()
|
2025-08-26 13:37:33 +00:00
|
|
|
|
requestedRev := op.Rev()
|
|
|
|
|
|
|
|
|
|
|
|
kvs, latestRev, err := c.store.Get(startKey, endKey, requestedRev)
|
2025-07-21 15:24:24 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return &clientv3.GetResponse{
|
2025-08-26 13:37:33 +00:00
|
|
|
|
Header: &pb.ResponseHeader{Revision: latestRev},
|
2025-07-21 15:24:24 +00:00
|
|
|
|
Kvs: kvs,
|
|
|
|
|
|
Count: int64(len(kvs)),
|
|
|
|
|
|
}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-08-07 22:18:58 +00:00
|
|
|
|
// Ready returns true if the snapshot has been loaded and the first watch has been confirmed.
|
2025-07-03 02:05:33 +00:00
|
|
|
|
func (c *Cache) Ready() bool {
|
2025-08-07 22:18:58 +00:00
|
|
|
|
return c.ready.Ready()
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// WaitReady blocks until the cache is ready or the ctx is cancelled.
|
|
|
|
|
|
func (c *Cache) WaitReady(ctx context.Context) error {
|
2025-08-07 22:18:58 +00:00
|
|
|
|
return c.ready.WaitReady(ctx)
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-21 15:24:24 +00:00
|
|
|
|
func (c *Cache) WaitForRevision(ctx context.Context, rev int64) error {
|
|
|
|
|
|
for {
|
|
|
|
|
|
if c.store.LatestRev() >= rev {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-time.After(10 * time.Millisecond):
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-03 02:05:33 +00:00
|
|
|
|
// Close cancels the private context and blocks until all goroutines return.
|
|
|
|
|
|
func (c *Cache) Close() {
|
|
|
|
|
|
c.stop()
|
|
|
|
|
|
c.waitGroup.Wait()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-08-13 14:33:01 +00:00
|
|
|
|
func (c *Cache) getWatchLoop() {
|
2025-07-21 15:24:24 +00:00
|
|
|
|
cfg := defaultConfig()
|
2025-08-13 14:33:01 +00:00
|
|
|
|
ctx := c.internalCtx
|
2025-07-21 15:24:24 +00:00
|
|
|
|
backoff := cfg.InitialBackoff
|
2025-07-03 02:05:33 +00:00
|
|
|
|
for {
|
2025-07-21 15:24:24 +00:00
|
|
|
|
if err := ctx.Err(); err != nil {
|
2025-07-03 02:05:33 +00:00
|
|
|
|
return
|
|
|
|
|
|
}
|
2025-08-13 14:33:01 +00:00
|
|
|
|
if err := c.getWatch(); err != nil {
|
2025-07-21 15:24:24 +00:00
|
|
|
|
fmt.Printf("getWatch failed, will retry after %v: %v\n", backoff, err)
|
|
|
|
|
|
}
|
2025-07-03 02:05:33 +00:00
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-time.After(backoff):
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-08-13 14:33:01 +00:00
|
|
|
|
func (c *Cache) getWatch() error {
|
|
|
|
|
|
getResp, err := c.get(c.internalCtx)
|
2025-07-21 15:24:24 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
2025-08-13 14:33:01 +00:00
|
|
|
|
return c.watch(getResp.Header.Revision + 1)
|
2025-07-21 15:24:24 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *Cache) get(ctx context.Context) (*clientv3.GetResponse, error) {
|
|
|
|
|
|
resp, err := c.kv.Get(ctx, c.prefix, clientv3.WithPrefix())
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
c.store.Restore(resp.Kvs, resp.Header.Revision)
|
|
|
|
|
|
return resp, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-08-13 14:33:01 +00:00
|
|
|
|
func (c *Cache) watch(rev int64) error {
|
2025-07-21 15:24:24 +00:00
|
|
|
|
readyOnce := sync.Once{}
|
|
|
|
|
|
for {
|
2025-08-13 14:33:01 +00:00
|
|
|
|
storeW := newWatcher(c.cfg.PerWatcherBufferSize, nil)
|
|
|
|
|
|
c.demux.Register(storeW, rev)
|
|
|
|
|
|
applyErr := make(chan error, 1)
|
|
|
|
|
|
c.waitGroup.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
defer c.waitGroup.Done()
|
|
|
|
|
|
if err := c.applyStorage(storeW); err != nil {
|
|
|
|
|
|
applyErr <- err
|
|
|
|
|
|
}
|
|
|
|
|
|
close(applyErr)
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
2025-09-19 15:05:41 +00:00
|
|
|
|
err := c.watchEvents(rev, applyErr, &readyOnce)
|
2025-08-13 14:33:01 +00:00
|
|
|
|
c.demux.Unregister(storeW)
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *Cache) applyStorage(storeW *watcher) error {
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-c.internalCtx.Done():
|
|
|
|
|
|
return nil
|
2025-08-22 13:21:06 +00:00
|
|
|
|
case resp, ok := <-storeW.respCh:
|
2025-08-13 14:33:01 +00:00
|
|
|
|
if !ok {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2025-09-05 13:48:54 +00:00
|
|
|
|
if err := c.store.Apply(resp); err != nil {
|
2025-07-21 15:24:24 +00:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
2025-08-13 14:33:01 +00:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-07-21 15:24:24 +00:00
|
|
|
|
|
2025-09-19 15:05:41 +00:00
|
|
|
|
func (c *Cache) watchEvents(rev int64, applyErr <-chan error, readyOnce *sync.Once) error {
|
|
|
|
|
|
watchCh := c.watcher.Watch(
|
|
|
|
|
|
c.internalCtx,
|
|
|
|
|
|
c.prefix,
|
|
|
|
|
|
clientv3.WithPrefix(),
|
|
|
|
|
|
clientv3.WithRev(rev),
|
|
|
|
|
|
clientv3.WithProgressNotify(),
|
|
|
|
|
|
clientv3.WithCreatedNotify(),
|
|
|
|
|
|
)
|
2025-08-13 14:33:01 +00:00
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-c.internalCtx.Done():
|
|
|
|
|
|
return c.internalCtx.Err()
|
|
|
|
|
|
case resp, ok := <-watchCh:
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2025-09-19 15:05:41 +00:00
|
|
|
|
readyOnce.Do(func() {
|
|
|
|
|
|
c.demux.Init(rev)
|
|
|
|
|
|
c.ready.Set()
|
|
|
|
|
|
})
|
2025-08-13 14:33:01 +00:00
|
|
|
|
if err := resp.Err(); err != nil {
|
2025-08-07 22:18:58 +00:00
|
|
|
|
c.ready.Reset()
|
2025-07-21 15:24:24 +00:00
|
|
|
|
return err
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
2025-09-19 15:05:41 +00:00
|
|
|
|
err := c.demux.Broadcast(resp)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
c.ready.Reset()
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
2025-08-13 14:33:01 +00:00
|
|
|
|
case err := <-applyErr:
|
|
|
|
|
|
c.ready.Reset()
|
|
|
|
|
|
return err
|
2025-07-03 02:05:33 +00:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-07-21 15:24:24 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-23 17:28:36 +02:00
|
|
|
|
func (c *Cache) validateWatch(key string, op clientv3.Op) (pred KeyPredicate, err error) {
|
2025-07-29 17:59:07 +00:00
|
|
|
|
switch {
|
|
|
|
|
|
case op.IsPrevKV():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: PrevKV not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsFragment():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: Fragment not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsProgressNotify():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: ProgressNotify not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsCreatedNotify():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: CreatedNotify not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsFilterPut():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: FilterPut not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsFilterDelete():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: FilterDelete not supported", ErrUnsupportedRequest)
|
2025-07-23 17:28:36 +02:00
|
|
|
|
}
|
2025-07-10 14:33:47 +00:00
|
|
|
|
|
|
|
|
|
|
startKey := []byte(key)
|
|
|
|
|
|
endKey := op.RangeBytes() // nil = single key, {0}=FromKey, else explicit range
|
|
|
|
|
|
|
2025-07-24 16:11:47 +00:00
|
|
|
|
if err := c.validateRange(startKey, endKey); err != nil {
|
2025-07-10 14:33:47 +00:00
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return KeyPredForRange(startKey, endKey), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-21 15:24:24 +00:00
|
|
|
|
func (c *Cache) validateGet(key string, op clientv3.Op) (KeyPredicate, error) {
|
|
|
|
|
|
switch {
|
|
|
|
|
|
case op.IsCountOnly():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: CountOnly not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsPrevKV():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: PrevKV not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.IsSortSet():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: SortSet not supported", ErrUnsupportedRequest)
|
|
|
|
|
|
case op.Limit() != 0:
|
|
|
|
|
|
return nil, fmt.Errorf("%w: Limit(%d) not supported", ErrUnsupportedRequest, op.Limit())
|
|
|
|
|
|
case op.MinModRev() != 0:
|
|
|
|
|
|
return nil, fmt.Errorf("%w: MinModRev(%d) not supported", ErrUnsupportedRequest, op.MinModRev())
|
|
|
|
|
|
case op.MaxModRev() != 0:
|
|
|
|
|
|
return nil, fmt.Errorf("%w: MaxModRev(%d) not supported", ErrUnsupportedRequest, op.MaxModRev())
|
|
|
|
|
|
case op.MinCreateRev() != 0:
|
|
|
|
|
|
return nil, fmt.Errorf("%w: MinCreateRev(%d) not supported", ErrUnsupportedRequest, op.MinCreateRev())
|
|
|
|
|
|
case op.MaxCreateRev() != 0:
|
|
|
|
|
|
return nil, fmt.Errorf("%w: MaxCreateRev(%d) not supported", ErrUnsupportedRequest, op.MaxCreateRev())
|
|
|
|
|
|
// cache now only serves serializable reads of the latest revision (rev == 0).
|
|
|
|
|
|
case !op.IsSerializable():
|
|
|
|
|
|
return nil, fmt.Errorf("%w: non-serializable request", ErrUnsupportedRequest)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
startKey := []byte(key)
|
|
|
|
|
|
endKey := op.RangeBytes()
|
|
|
|
|
|
|
|
|
|
|
|
if err := c.validateRange(startKey, endKey); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return KeyPredForRange(startKey, endKey), nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-24 16:11:47 +00:00
|
|
|
|
func (c *Cache) validateRange(startKey, endKey []byte) error {
|
2025-07-10 14:33:47 +00:00
|
|
|
|
prefixStart := []byte(c.prefix)
|
|
|
|
|
|
prefixEnd := []byte(clientv3.GetPrefixRangeEnd(c.prefix))
|
|
|
|
|
|
|
|
|
|
|
|
isSingleKey := len(endKey) == 0
|
|
|
|
|
|
isFromKey := len(endKey) == 1 && endKey[0] == 0
|
|
|
|
|
|
|
|
|
|
|
|
switch {
|
|
|
|
|
|
case isSingleKey:
|
|
|
|
|
|
if c.prefix == "" {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if bytes.Compare(startKey, prefixStart) < 0 || bytes.Compare(startKey, prefixEnd) >= 0 {
|
2025-07-23 19:16:14 +00:00
|
|
|
|
return ErrKeyRangeInvalid
|
2025-07-10 14:33:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
|
|
case isFromKey:
|
|
|
|
|
|
if c.prefix != "" {
|
2025-07-23 19:16:14 +00:00
|
|
|
|
return ErrKeyRangeInvalid
|
2025-07-10 14:33:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
|
if bytes.Compare(endKey, startKey) <= 0 {
|
2025-07-23 19:16:14 +00:00
|
|
|
|
return ErrKeyRangeInvalid
|
2025-07-10 14:33:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
if c.prefix == "" {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if bytes.Compare(startKey, prefixStart) < 0 || bytes.Compare(endKey, prefixEnd) > 0 {
|
2025-07-23 19:16:14 +00:00
|
|
|
|
return ErrKeyRangeInvalid
|
2025-07-10 14:33:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|