From 857b36c84f15ca58449b1384ca7349a86faebdb8 Mon Sep 17 00:00:00 2001 From: Peter Chang Date: Mon, 18 Aug 2025 15:43:06 +0000 Subject: [PATCH] cache: migrate storage layer to B-tree Signed-off-by: Peter Chang --- cache/cache.go | 5 ++- cache/config.go | 7 ++++ cache/go.mod | 1 + cache/go.sum | 2 + cache/store.go | 94 +++++++++++++++++++++------------------------ cache/store_test.go | 6 +-- 6 files changed, 61 insertions(+), 54 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 567cdbf57..9fd960d8f 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -59,6 +59,9 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) if cfg.HistoryWindowSize <= 0 { return nil, fmt.Errorf("invalid HistoryWindowSize %d (must be > 0)", cfg.HistoryWindowSize) } + if cfg.BTreeDegree < 2 { + return nil, fmt.Errorf("invalid BTreeDegree %d (must be >= 2)", cfg.BTreeDegree) + } internalCtx, cancel := context.WithCancel(context.Background()) @@ -67,7 +70,7 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) cfg: cfg, watcher: client.Watcher, kv: client.KV, - store: newStore(), + store: newStore(cfg.BTreeDegree), ready: newReady(), stop: cancel, internalCtx: internalCtx, diff --git a/cache/config.go b/cache/config.go index b51a52a70..cf18ffcc7 100644 --- a/cache/config.go +++ b/cache/config.go @@ -31,6 +31,8 @@ type Config struct { MaxBackoff time.Duration // GetTimeout is the timeout applied to the first Get() used to bootstrap the cache. GetTimeout time.Duration + // BTreeDegree controls the degree (branching factor) of the in-memory B-tree store. + BTreeDegree int } // TODO: tune via performance/load tests. @@ -42,6 +44,7 @@ func defaultConfig() Config { InitialBackoff: 50 * time.Millisecond, MaxBackoff: 2 * time.Second, GetTimeout: 5 * time.Second, + BTreeDegree: 32, } } @@ -70,3 +73,7 @@ func WithMaxBackoff(d time.Duration) Option { func WithGetTimeout(d time.Duration) Option { return func(c *Config) { c.GetTimeout = d } } + +func WithBTreeDegree(n int) Option { + return func(c *Config) { c.BTreeDegree = n } +} diff --git a/cache/go.mod b/cache/go.mod index 2c63bd726..b835c077d 100644 --- a/cache/go.mod +++ b/cache/go.mod @@ -5,6 +5,7 @@ go 1.24 toolchain go1.24.6 require ( + github.com/google/btree v1.1.3 github.com/google/go-cmp v0.7.0 go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 diff --git a/cache/go.sum b/cache/go.sum index 53b6764c2..888b46148 100644 --- a/cache/go.sum +++ b/cache/go.sum @@ -17,6 +17,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= diff --git a/cache/store.go b/cache/store.go index fa1c1035e..13d140cfa 100644 --- a/cache/store.go +++ b/cache/store.go @@ -15,11 +15,11 @@ package cache import ( - "bytes" "fmt" - "sort" "sync" + "github.com/google/btree" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -28,12 +28,29 @@ var ErrNotReady = fmt.Errorf("cache: store not ready") type store struct { mu sync.RWMutex - kvs map[string]*mvccpb.KeyValue + tree *btree.BTree + degree int latestRev int64 } -func newStore() *store { - return &store{kvs: make(map[string]*mvccpb.KeyValue)} +func newStore(degree int) *store { + return &store{ + tree: btree.New(degree), + degree: degree, + } +} + +type kvItem struct { + key string + kv *mvccpb.KeyValue +} + +func newKVItem(kv *mvccpb.KeyValue) *kvItem { + return &kvItem{key: string(kv.Key), kv: kv} +} + +func (a *kvItem) Less(b btree.Item) bool { + return a.key < b.(*kvItem).key } func (s *store) Get(startKey, endKey []byte) ([]*mvccpb.KeyValue, int64, error) { @@ -47,16 +64,22 @@ func (s *store) Get(startKey, endKey []byte) ([]*mvccpb.KeyValue, int64, error) var out []*mvccpb.KeyValue switch { case len(endKey) == 0: - out = s.getSingle(startKey) - case isPrefixScan(endKey): - out = s.scanPrefix(startKey) - default: - out = s.scanRange(startKey, endKey) - } + if item := s.tree.Get(probeItemFromBytes(startKey)); item != nil { + out = append(out, item.(*kvItem).kv) + } - sort.Slice(out, func(i, j int) bool { - return bytes.Compare(out[i].Key, out[j].Key) < 0 // default: lexicographical, ascending‐by‐key sort - }) + case isPrefixScan(endKey): + s.tree.AscendGreaterOrEqual(probeItemFromBytes(startKey), func(item btree.Item) bool { + out = append(out, item.(*kvItem).kv) + return true + }) + + default: + s.tree.AscendRange(probeItemFromBytes(startKey), probeItemFromBytes(endKey), func(item btree.Item) bool { + out = append(out, item.(*kvItem).kv) + return true + }) + } return out, s.latestRev, nil } @@ -64,9 +87,9 @@ func (s *store) Restore(kvs []*mvccpb.KeyValue, rev int64) { s.mu.Lock() defer s.mu.Unlock() - s.kvs = make(map[string]*mvccpb.KeyValue, len(kvs)) + s.tree = btree.New(s.degree) for _, kv := range kvs { - s.kvs[string(kv.Key)] = kv + s.tree.ReplaceOrInsert(newKVItem(kv)) } s.latestRev = rev } @@ -82,12 +105,11 @@ func (s *store) Apply(events []*clientv3.Event) error { for _, ev := range events { switch ev.Type { case clientv3.EventTypeDelete: - if _, ok := s.kvs[string(ev.Kv.Key)]; !ok { - return fmt.Errorf("cache: delete non-existent key %s)", string(ev.Kv.Key)) + if removed := s.tree.Delete(&kvItem{key: string(ev.Kv.Key)}); removed == nil { + return fmt.Errorf("cache: delete non-existent key %s", string(ev.Kv.Key)) } - delete(s.kvs, string(ev.Kv.Key)) case clientv3.EventTypePut: - s.kvs[string(ev.Kv.Key)] = ev.Kv + s.tree.ReplaceOrInsert(newKVItem(ev.Kv)) } if ev.Kv.ModRevision > s.latestRev { s.latestRev = ev.Kv.ModRevision @@ -102,36 +124,6 @@ func (s *store) LatestRev() int64 { return s.latestRev } -// getSingle fetches one key or nil -func (s *store) getSingle(key []byte) []*mvccpb.KeyValue { - if kv, ok := s.kvs[string(key)]; ok { - return []*mvccpb.KeyValue{kv} - } - return nil -} - -// scanPrefix returns all keys >= startKey -func (s *store) scanPrefix(startKey []byte) []*mvccpb.KeyValue { - var res []*mvccpb.KeyValue - for _, kv := range s.kvs { - if bytes.Compare(kv.Key, startKey) >= 0 { - res = append(res, kv) - } - } - return res -} - -// scanRange returns all keys in [startKey, endKey) -func (s *store) scanRange(startKey, endKey []byte) []*mvccpb.KeyValue { - var res []*mvccpb.KeyValue - for _, kv := range s.kvs { - if bytes.Compare(kv.Key, startKey) >= 0 && bytes.Compare(kv.Key, endKey) < 0 { - res = append(res, kv) - } - } - return res -} - // isPrefixScan detects endKey=={0} semantics func isPrefixScan(endKey []byte) bool { return len(endKey) == 1 && endKey[0] == 0 @@ -152,3 +144,5 @@ func validateRevisions(events []*clientv3.Event, latestRev int64) error { } return nil } + +func probeItemFromBytes(b []byte) *kvItem { return &kvItem{key: string(b)} } diff --git a/cache/store_test.go b/cache/store_test.go index d76737c39..e1820985c 100644 --- a/cache/store_test.go +++ b/cache/store_test.go @@ -118,7 +118,7 @@ func TestStoreGet(t *testing.T) { for _, tt := range tests { test := tt t.Run(test.name, func(t *testing.T) { - s := newStore() + s := newStore(8) if test.initialKVs != nil { s.Restore(test.initialKVs, test.initialRev) } @@ -276,7 +276,7 @@ func TestStoreApply(t *testing.T) { for _, tt := range tests { test := tt t.Run(test.name, func(t *testing.T) { - s := newStore() + s := newStore(4) s.Restore(test.initialKVs, test.initialRev) var gotErr error @@ -339,7 +339,7 @@ func TestStoreRestore(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := newStore() + s := newStore(8) for _, step := range tt.seq { s.Restore(step.kvs, step.rev) }