diff --git a/balancer/rls/internal/cache.go b/balancer/rls/internal/cache.go new file mode 100644 index 00000000000..527b9b278a1 --- /dev/null +++ b/balancer/rls/internal/cache.go @@ -0,0 +1,404 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package rls + +import ( + "container/list" + "time" + + "google.golang.org/grpc/internal/backoff" + internalgrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" +) + +// TODO(easwars): Remove this once all RLS code is merged. +//lint:file-ignore U1000 Ignore all unused code, not all code is merged yet. + +// cacheKey represents the key used to uniquely identify an entry in the data +// cache and in the pending requests map. +type cacheKey struct { + // path is the full path of the incoming RPC request. + path string + // keys is a stringified version of the RLS request key map built using the + // RLS keyBuilder. Since maps are not a type which is comparable in Go, it + // cannot be part of the key for another map (entries in the data cache and + // pending requests map are stored in maps). + keys string +} + +// cacheEntry wraps all the data to be stored in a data cache entry. +type cacheEntry struct { + // childPolicyWrappers contains the list of child policy wrappers + // corresponding to the targets returned by the RLS server for this entry. + childPolicyWrappers []*childPolicyWrapper + // headerData is received in the RLS response and is to be sent in the + // X-Google-RLS-Data header for matching RPCs. + headerData string + // expiryTime is the absolute time at which this cache entry entry stops + // being valid. When an RLS request succeeds, this is set to the current + // time plus the max_age field from the LB policy config. + expiryTime time.Time + // staleTime is the absolute time after which this cache entry will be + // proactively refreshed if an incoming RPC matches this entry. When an RLS + // request succeeds, this is set to the current time plus the stale_age from + // the LB policy config. + staleTime time.Time + // earliestEvictTime is the absolute time before which this entry should not + // be evicted from the cache. When a cache entry is created, this is set to + // the current time plus a default value of 5 seconds. This is required to + // make sure that a new entry added to the cache is not evicted before the + // RLS response arrives (usually when the cache is too small). + earliestEvictTime time.Time + + // status stores the RPC status of the previous RLS request for this + // entry. Picks for entries with a non-nil value for this field are failed + // with the error stored here. + status error + // backoffState contains all backoff related state. When an RLS request + // succeeds, backoffState is reset. This state moves between the data cache + // and the pending requests map. + backoffState *backoffState + // backoffTime is the absolute time at which the backoff period for this + // entry ends. 
When an RLS request fails, this is set to the current time + // plus the backoff value returned by the backoffState. The backoff timer is + // also setup with this value. No new RLS requests are sent out for this + // entry until the backoff period ends. + // + // Set to zero time instant upon a successful RLS response. + backoffTime time.Time + // backoffExpiryTime is the absolute time at which an entry which has gone + // through backoff stops being valid. When an RLS request fails, this is + // set to the current time plus twice the backoff time. The cache expiry + // timer will only delete entries for which both expiryTime and + // backoffExpiryTime are in the past. + // + // Set to zero time instant upon a successful RLS response. + backoffExpiryTime time.Time + + // size stores the size of this cache entry. Used to enforce the cache size + // specified in the LB policy configuration. + size int64 + // onEvict is the callback to be invoked when this cache entry is evicted. + onEvict func() +} + +// backoffState wraps all backoff related state associated with a cache entry. +type backoffState struct { + // retries keeps track of the number of RLS failures, to be able to + // determine the amount of time to backoff before the next attempt. + retries int + // bs is the exponential backoff implementation which returns the amount of + // time to backoff, given the number of retries. + bs backoff.Strategy + // timer fires when the backoff period ends and incoming requests after this + // will trigger a new RLS request. + timer *time.Timer +} + +// lru is a cache implementation with a least recently used eviction policy. +// Internally it uses a doubly linked list, with the least recently used element +// at the front of the list and the most recently used element at the back of +// the list. The value stored in this cache will be of type `cacheKey`. +// +// It is not safe for concurrent access. +type lru struct { + ll *list.List + + // A map from the value stored in the lru to its underlying list element is + // maintained to have a clean API. Without this, a subset of the lru's API + // would accept/return cacheKey while another subset would accept/return + // list elements. + m map[cacheKey]*list.Element +} + +// newLRU creates a new cache with a least recently used eviction policy. +func newLRU() *lru { + return &lru{ + ll: list.New(), + m: make(map[cacheKey]*list.Element), + } +} + +func (l *lru) addEntry(key cacheKey) { + e := l.ll.PushBack(key) + l.m[key] = e +} + +func (l *lru) makeRecent(key cacheKey) { + e := l.m[key] + l.ll.MoveToBack(e) +} + +func (l *lru) removeEntry(key cacheKey) { + e := l.m[key] + l.ll.Remove(e) + delete(l.m, key) +} + +func (l *lru) getLeastRecentlyUsed() cacheKey { + e := l.ll.Front() + if e == nil { + return cacheKey{} + } + return e.Value.(cacheKey) +} + +// iterateAndRun traverses the lru in least-recently-used order and calls the +// provided function for every element. +// +// Callers may delete the cache entry associated with the cacheKey passed into +// f, but they may not perform any other operation which reorders the elements +// in the lru. +func (l *lru) iterateAndRun(f func(cacheKey)) { + var next *list.Element + for e := l.ll.Front(); e != nil; e = next { + next = e.Next() + f(e.Value.(cacheKey)) + } +} + +// dataCache contains a cache of RLS data used by the LB policy to make routing +// decisions. +// +// The dataCache will be keyed by the request's path and keys, represented by +// the `cacheKey` type. 
It will maintain the cache keys in an `lru` and the +// cache data, represented by the `cacheEntry` type, in a native map. +// +// It is not safe for concurrent access. +type dataCache struct { + maxSize int64 // Maximum allowed size. + currentSize int64 // Current size. + keys *lru // Cache keys maintained in lru order. + entries map[cacheKey]*cacheEntry + logger *internalgrpclog.PrefixLogger + shutdown *grpcsync.Event +} + +func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache { + return &dataCache{ + maxSize: size, + keys: newLRU(), + entries: make(map[cacheKey]*cacheEntry), + logger: logger, + shutdown: grpcsync.NewEvent(), + } +} + +// resize changes the maximum allowed size of the data cache. +// +// The return value indicates if an entry with a valid backoff timer was +// evicted. This is important to the RLS LB policy which would send a new picker +// on the channel to re-process any RPCs queued as a result of this backoff +// timer. +func (dc *dataCache) resize(size int64) (backoffCancelled bool) { + if dc.shutdown.HasFired() { + return false + } + + backoffCancelled = false + for dc.currentSize > size { + key := dc.keys.getLeastRecentlyUsed() + entry, ok := dc.entries[key] + if !ok { + // This should never happen. + dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to resize it", key) + break + } + + // When we encounter a cache entry whose minimum expiration time is in + // the future, we abort the LRU pass, which may temporarily leave the + // cache being too large. This is necessary to ensure that in cases + // where the cache is too small, when we receive an RLS Response, we + // keep the resulting cache entry around long enough for the pending + // incoming requests to be re-processed through the new Picker. If we + // didn't do this, then we'd risk throwing away each RLS response as we + // receive it, in which case we would fail to actually route any of our + // incoming requests. + if entry.earliestEvictTime.After(time.Now()) { + dc.logger.Warningf("cachekey %+v is too recent to be evicted. Stopping cache resizing for now", key) + break + } + + // Stop the backoff timer before evicting the entry. + if entry.backoffState != nil && entry.backoffState.timer != nil { + if entry.backoffState.timer.Stop() { + entry.backoffState.timer = nil + backoffCancelled = true + } + } + dc.deleteAndcleanup(key, entry) + } + dc.maxSize = size + return backoffCancelled +} + +// evictExpiredEntries sweeps through the cache and deletes expired entries. An +// expired entry is one for which both the `expiryTime` and `backoffExpiryTime` +// fields are in the past. +// +// The return value indicates if any expired entries were evicted. +// +// The LB policy invokes this method periodically to purge expired entries. +func (dc *dataCache) evictExpiredEntries() (evicted bool) { + if dc.shutdown.HasFired() { + return false + } + + evicted = false + dc.keys.iterateAndRun(func(key cacheKey) { + entry, ok := dc.entries[key] + if !ok { + // This should never happen. + dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key) + return + } + + // Only evict entries for which both the data expiration time and + // backoff expiration time fields are in the past. 
+ now := time.Now() + if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) { + return + } + evicted = true + dc.deleteAndcleanup(key, entry) + }) + return evicted +} + +// resetBackoffState sweeps through the cache and for entries with a backoff +// state, the backoff timer is cancelled and the backoff state is reset. The +// return value indicates if any entries were mutated in this fashion. +// +// The LB policy invokes this method when the control channel moves from READY +// to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on +// the `controlChannel` type for more details. +func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffReset bool) { + if dc.shutdown.HasFired() { + return false + } + + backoffReset = false + dc.keys.iterateAndRun(func(key cacheKey) { + entry, ok := dc.entries[key] + if !ok { + // This should never happen. + dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key) + return + } + + if entry.backoffState == nil { + return + } + if entry.backoffState.timer != nil { + entry.backoffState.timer.Stop() + entry.backoffState.timer = nil + } + entry.backoffState = &backoffState{bs: newBackoffState.bs} + entry.backoffTime = time.Time{} + entry.backoffExpiryTime = time.Time{} + backoffReset = true + }) + return backoffReset +} + +// addEntry adds a cache entry for the given key. +// +// Return value backoffCancelled indicates if a cache entry with a valid backoff +// timer was evicted to make space for the current entry. This is important to +// the RLS LB policy which would send a new picker on the channel to re-process +// any RPCs queued as a result of this backoff timer. +// +// Return value ok indicates if entry was successfully added to the cache. +func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled bool, ok bool) { + if dc.shutdown.HasFired() { + return false, false + } + + // Handle the extremely unlikely case that a single entry is bigger than the + // size of the cache. + if entry.size > dc.maxSize { + return false, false + } + dc.entries[key] = entry + dc.currentSize += entry.size + dc.keys.addEntry(key) + // If the new entry makes the cache go over its configured size, remove some + // old entries. + if dc.currentSize > dc.maxSize { + backoffCancelled = dc.resize(dc.maxSize) + } + return backoffCancelled, true +} + +// updateEntrySize updates the size of a cache entry and the current size of the +// data cache. An entry's size can change upon receipt of an RLS response. +func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) { + dc.currentSize -= entry.size + entry.size = newSize + dc.currentSize += entry.size +} + +func (dc *dataCache) getEntry(key cacheKey) *cacheEntry { + if dc.shutdown.HasFired() { + return nil + } + + entry, ok := dc.entries[key] + if !ok { + return nil + } + dc.keys.makeRecent(key) + return entry +} + +func (dc *dataCache) removeEntryForTesting(key cacheKey) { + entry, ok := dc.entries[key] + if !ok { + return + } + dc.deleteAndcleanup(key, entry) +} + +// deleteAndCleanup performs actions required at the time of deleting an entry +// from the data cache. 
+// - the entry is removed from the map of entries +// - current size of the data cache is update +// - the key is removed from the LRU +// - onEvict is invoked in a separate goroutine +func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) { + delete(dc.entries, key) + dc.currentSize -= entry.size + dc.keys.removeEntry(key) + if entry.onEvict != nil { + go entry.onEvict() + } +} + +func (dc *dataCache) stop() { + dc.keys.iterateAndRun(func(key cacheKey) { + entry, ok := dc.entries[key] + if !ok { + // This should never happen. + dc.logger.Errorf("cacheKey %+v not found in the cache while shutting down", key) + return + } + dc.deleteAndcleanup(key, entry) + }) + dc.shutdown.Fire() +} diff --git a/balancer/rls/internal/cache/cache.go b/balancer/rls/internal/cache/cache.go deleted file mode 100644 index b975c3078fd..00000000000 --- a/balancer/rls/internal/cache/cache.go +++ /dev/null @@ -1,244 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package cache provides an LRU cache implementation to be used by the RLS LB -// policy to cache RLS response data. -package cache - -import ( - "container/list" - "sync" - "time" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal/backoff" -) - -var logger = grpclog.Component("rls") - -// Key represents the cache key used to uniquely identify a cache entry. -type Key struct { - // Path is the full path of the incoming RPC request. - Path string - // KeyMap is a stringified version of the RLS request keys built using the - // RLS keyBuilder. Since map is not a Type which is comparable in Go, it - // cannot be part of the key for another map (the LRU cache is implemented - // using a native map type). - KeyMap string -} - -// Entry wraps all the data to be stored in a cache entry. -type Entry struct { - // Mu synchronizes access to this particular cache entry. The LB policy - // will also hold another mutex to synchronize access to the cache as a - // whole. To avoid holding the top-level mutex for the whole duration for - // which one particular cache entry is acted upon, we use this entry mutex. - Mu sync.Mutex - // ExpiryTime is the absolute time at which the data cached as part of this - // entry stops being valid. When an RLS request succeeds, this is set to - // the current time plus the max_age field from the LB policy config. An - // entry with this field in the past is not used to process picks. - ExpiryTime time.Time - // BackoffExpiryTime is the absolute time at which an entry which has gone - // through backoff stops being valid. When an RLS request fails, this is - // set to the current time plus twice the backoff time. The cache expiry - // timer will only delete entries for which both ExpiryTime and - // BackoffExpiryTime are in the past. - BackoffExpiryTime time.Time - // StaleTime is the absolute time after which this entry will be - // proactively refreshed if we receive a request for it. 
When an RLS - // request succeeds, this is set to the current time plus the stale_age - // from the LB policy config. - StaleTime time.Time - // BackoffTime is the absolute time at which the backoff period for this - // entry ends. The backoff timer is setup with this value. No new RLS - // requests are sent out for this entry until the backoff period ends. - BackoffTime time.Time - // EarliestEvictTime is the absolute time before which this entry should - // not be evicted from the cache. This is set to a default value of 5 - // seconds when the entry is created. This is required to make sure that a - // new entry added to the cache is not evicted before the RLS response - // arrives (usually when the cache is too small). - EarliestEvictTime time.Time - // CallStatus stores the RPC status of the previous RLS request for this - // entry. Picks for entries with a non-nil value for this field are failed - // with the error stored here. - CallStatus error - // Backoff contains all backoff related state. When an RLS request - // succeeds, backoff state is reset. - Backoff BackoffState - // HeaderData is received in an RLS response and is to be sent in the - // X-Google-RLS-Data header for matching RPCs. - HeaderData string - // ChildPicker is a very thin wrapper around the child policy wrapper. - // The type is declared as a Picker interface since the users of - // the cache only care about the picker provided by the child policy, and - // this makes it easy for testing. - ChildPicker balancer.Picker - - // size stores the size of this cache entry. Uses only a subset of the - // fields. See `entrySize` for this is computed. - size int64 - // key contains the cache key corresponding to this entry. This is required - // from methods like `removeElement` which only have a pointer to the - // list.Element which contains a reference to the cache.Entry. But these - // methods need the cache.Key to be able to remove the entry from the - // underlying map. - key Key -} - -// BackoffState wraps all backoff related state associated with a cache entry. -type BackoffState struct { - // Retries keeps track of the number of RLS failures, to be able to - // determine the amount of time to backoff before the next attempt. - Retries int - // Backoff is an exponential backoff implementation which returns the - // amount of time to backoff, given the number of retries. - Backoff backoff.Strategy - // Timer fires when the backoff period ends and incoming requests after - // this will trigger a new RLS request. - Timer *time.Timer - // Callback provided by the LB policy to be notified when the backoff timer - // expires. This will trigger a new picker to be returned to the - // ClientConn, to force queued up RPCs to be retried. - Callback func() -} - -// LRU is a cache with a least recently used eviction policy. It is not safe -// for concurrent access. -type LRU struct { - maxSize int64 - usedSize int64 - onEvicted func(Key, *Entry) - - ll *list.List - cache map[Key]*list.Element -} - -// NewLRU creates a cache.LRU with a size limit of maxSize and the provided -// eviction callback. -// -// Currently, only the cache.Key and the HeaderData field from cache.Entry -// count towards the size of the cache (other overhead per cache entry is not -// counted). 
The cache could temporarily exceed the configured maxSize because -// we want the entries to spend a configured minimum amount of time in the -// cache before they are LRU evicted (so that all the work performed in sending -// an RLS request and caching the response is not a total waste). -// -// The provided onEvited callback must not attempt to re-add the entry inline -// and the RLS LB policy does not have a need to do that. -// -// The cache package trusts the RLS policy (its only user) to supply a default -// minimum non-zero maxSize, in the event that the ServiceConfig does not -// provide a value for it. -func NewLRU(maxSize int64, onEvicted func(Key, *Entry)) *LRU { - return &LRU{ - maxSize: maxSize, - onEvicted: onEvicted, - ll: list.New(), - cache: make(map[Key]*list.Element), - } -} - -// Resize sets the size limit of the LRU to newMaxSize and removes older -// entries, if required, to comply with the new limit. -func (lru *LRU) Resize(newMaxSize int64) { - lru.maxSize = newMaxSize - lru.removeToFit(0) -} - -// TODO(easwars): If required, make this function more sophisticated. -func entrySize(key Key, value *Entry) int64 { - return int64(len(key.Path) + len(key.KeyMap) + len(value.HeaderData)) -} - -// removeToFit removes older entries from the cache to make room for a new -// entry of size newSize. -func (lru *LRU) removeToFit(newSize int64) { - now := time.Now() - for lru.usedSize+newSize > lru.maxSize { - elem := lru.ll.Back() - if elem == nil { - // This is a corner case where the cache is empty, but the new entry - // to be added is bigger than maxSize. - logger.Info("rls: newly added cache entry exceeds cache maxSize") - return - } - - entry := elem.Value.(*Entry) - if t := entry.EarliestEvictTime; !t.IsZero() && t.Before(now) { - // When the oldest entry is too new (it hasn't even spent a default - // minimum amount of time in the cache), we abort and allow the - // cache to grow bigger than the configured maxSize. - logger.Info("rls: LRU eviction finds oldest entry to be too new. Allowing cache to exceed maxSize momentarily") - return - } - lru.removeElement(elem) - } -} - -// Add adds a new entry to the cache. -func (lru *LRU) Add(key Key, value *Entry) { - size := entrySize(key, value) - elem, ok := lru.cache[key] - if !ok { - lru.removeToFit(size) - lru.usedSize += size - value.size = size - value.key = key - elem := lru.ll.PushFront(value) - lru.cache[key] = elem - return - } - - existing := elem.Value.(*Entry) - sizeDiff := size - existing.size - lru.removeToFit(sizeDiff) - value.size = size - elem.Value = value - lru.ll.MoveToFront(elem) - lru.usedSize += sizeDiff -} - -// Remove removes a cache entry wth key key, if one exists. -func (lru *LRU) Remove(key Key) { - if elem, ok := lru.cache[key]; ok { - lru.removeElement(elem) - } -} - -func (lru *LRU) removeElement(e *list.Element) { - entry := e.Value.(*Entry) - lru.ll.Remove(e) - delete(lru.cache, entry.key) - lru.usedSize -= entry.size - if lru.onEvicted != nil { - lru.onEvicted(entry.key, entry) - } -} - -// Get returns a cache entry with key key. -func (lru *LRU) Get(key Key) *Entry { - elem, ok := lru.cache[key] - if !ok { - return nil - } - lru.ll.MoveToFront(elem) - return elem.Value.(*Entry) -} diff --git a/balancer/rls/internal/cache/cache_test.go b/balancer/rls/internal/cache/cache_test.go deleted file mode 100644 index 7c480b64621..00000000000 --- a/balancer/rls/internal/cache/cache_test.go +++ /dev/null @@ -1,262 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package cache - -import ( - "sync" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" -) - -const ( - defaultTestCacheSize = 5 - defaultTestCacheMaxSize = 1000000 - defaultTestTimeout = 1 * time.Second -) - -// TestGet verifies the Add and Get methods of cache.LRU. -func TestGet(t *testing.T) { - key1 := Key{Path: "/service1/method1", KeyMap: "k1=v1,k2=v2"} - key2 := Key{Path: "/service2/method2", KeyMap: "k1=v1,k2=v2"} - val1 := Entry{HeaderData: "h1=v1"} - val2 := Entry{HeaderData: "h2=v2"} - - tests := []struct { - desc string - keysToAdd []Key - valsToAdd []*Entry - keyToGet Key - wantEntry *Entry - }{ - { - desc: "Empty cache", - keyToGet: Key{}, - }, - { - desc: "Single entry miss", - keysToAdd: []Key{key1}, - valsToAdd: []*Entry{&val1}, - keyToGet: Key{}, - }, - { - desc: "Single entry hit", - keysToAdd: []Key{key1}, - valsToAdd: []*Entry{&val1}, - keyToGet: key1, - wantEntry: &val1, - }, - { - desc: "Multi entry miss", - keysToAdd: []Key{key1, key2}, - valsToAdd: []*Entry{&val1, &val2}, - keyToGet: Key{}, - }, - { - desc: "Multi entry hit", - keysToAdd: []Key{key1, key2}, - valsToAdd: []*Entry{&val1, &val2}, - keyToGet: key1, - wantEntry: &val1, - }, - } - - for _, test := range tests { - t.Run(test.desc, func(t *testing.T) { - lru := NewLRU(defaultTestCacheMaxSize, nil) - for i, key := range test.keysToAdd { - lru.Add(key, test.valsToAdd[i]) - } - opts := []cmp.Option{ - cmpopts.IgnoreInterfaces(struct{ sync.Locker }{}), - cmpopts.IgnoreUnexported(Entry{}), - } - if gotEntry := lru.Get(test.keyToGet); !cmp.Equal(gotEntry, test.wantEntry, opts...) { - t.Errorf("lru.Get(%+v) = %+v, want %+v", test.keyToGet, gotEntry, test.wantEntry) - } - }) - } -} - -// TestRemove verifies the Add and Remove methods of cache.LRU. -func TestRemove(t *testing.T) { - keys := []Key{ - {Path: "/service1/method1", KeyMap: "k1=v1,k2=v2"}, - {Path: "/service2/method2", KeyMap: "k1=v1,k2=v2"}, - {Path: "/service3/method3", KeyMap: "k1=v1,k2=v2"}, - } - - lru := NewLRU(defaultTestCacheMaxSize, nil) - for _, k := range keys { - lru.Add(k, &Entry{}) - } - for _, k := range keys { - lru.Remove(k) - if entry := lru.Get(k); entry != nil { - t.Fatalf("lru.Get(%+v) after a call to lru.Remove succeeds, should have failed", k) - } - } -} - -// TestExceedingSizeCausesEviction verifies the case where adding a new entry -// to the cache leads to eviction of old entries to make space for the new one. 
-func TestExceedingSizeCausesEviction(t *testing.T) { - evictCh := make(chan Key, defaultTestCacheSize) - onEvicted := func(k Key, _ *Entry) { - t.Logf("evicted key {%+v} from cache", k) - evictCh <- k - } - - keysToFill := []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}} - keysCausingEviction := []Key{{Path: "f"}, {Path: "g"}, {Path: "h"}, {Path: "i"}, {Path: "j"}} - - lru := NewLRU(defaultTestCacheSize, onEvicted) - for _, key := range keysToFill { - lru.Add(key, &Entry{}) - } - - for i, key := range keysCausingEviction { - lru.Add(key, &Entry{}) - - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Test timeout waiting for eviction") - case k := <-evictCh: - timer.Stop() - if !cmp.Equal(k, keysToFill[i]) { - t.Fatalf("Evicted key %+v, wanted %+v", k, keysToFill[i]) - } - } - } -} - -// TestAddCausesMultipleEvictions verifies the case where adding one new entry -// causes the eviction of multiple old entries to make space for the new one. -func TestAddCausesMultipleEvictions(t *testing.T) { - evictCh := make(chan Key, defaultTestCacheSize) - onEvicted := func(k Key, _ *Entry) { - evictCh <- k - } - - keysToFill := []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}} - keyCausingEviction := Key{Path: "abcde"} - - lru := NewLRU(defaultTestCacheSize, onEvicted) - for _, key := range keysToFill { - lru.Add(key, &Entry{}) - } - - lru.Add(keyCausingEviction, &Entry{}) - - for i := range keysToFill { - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Test timeout waiting for eviction") - case k := <-evictCh: - timer.Stop() - if !cmp.Equal(k, keysToFill[i]) { - t.Fatalf("Evicted key %+v, wanted %+v", k, keysToFill[i]) - } - } - } -} - -// TestModifyCausesMultipleEvictions verifies the case where mofiying an -// existing entry to increase its size leads to the eviction of older entries -// to make space for the new one. 
-func TestModifyCausesMultipleEvictions(t *testing.T) { - evictCh := make(chan Key, defaultTestCacheSize) - onEvicted := func(k Key, _ *Entry) { - evictCh <- k - } - - keysToFill := []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}} - lru := NewLRU(defaultTestCacheSize, onEvicted) - for _, key := range keysToFill { - lru.Add(key, &Entry{}) - } - - lru.Add(keysToFill[len(keysToFill)-1], &Entry{HeaderData: "xxxx"}) - for i := range keysToFill[:len(keysToFill)-1] { - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Test timeout waiting for eviction") - case k := <-evictCh: - timer.Stop() - if !cmp.Equal(k, keysToFill[i]) { - t.Fatalf("Evicted key %+v, wanted %+v", k, keysToFill[i]) - } - } - } -} - -func TestLRUResize(t *testing.T) { - tests := []struct { - desc string - maxSize int64 - keysToFill []Key - newMaxSize int64 - wantEvictedKeys []Key - }{ - { - desc: "resize causes multiple evictions", - maxSize: 5, - keysToFill: []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}}, - newMaxSize: 3, - wantEvictedKeys: []Key{{Path: "a"}, {Path: "b"}}, - }, - { - desc: "resize causes no evictions", - maxSize: 50, - keysToFill: []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}}, - newMaxSize: 10, - wantEvictedKeys: []Key{}, - }, - { - desc: "resize to higher value", - maxSize: 5, - keysToFill: []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}}, - newMaxSize: 10, - wantEvictedKeys: []Key{}, - }, - } - - for _, test := range tests { - t.Run(test.desc, func(t *testing.T) { - var evictedKeys []Key - onEvicted := func(k Key, _ *Entry) { - evictedKeys = append(evictedKeys, k) - } - - lru := NewLRU(test.maxSize, onEvicted) - for _, key := range test.keysToFill { - lru.Add(key, &Entry{}) - } - lru.Resize(test.newMaxSize) - if !cmp.Equal(evictedKeys, test.wantEvictedKeys, cmpopts.EquateEmpty()) { - t.Fatalf("lru.Resize evicted keys {%v}, should have evicted {%v}", evictedKeys, test.wantEvictedKeys) - } - }) - } -} diff --git a/balancer/rls/internal/cache_test.go b/balancer/rls/internal/cache_test.go new file mode 100644 index 00000000000..cb9b060b59a --- /dev/null +++ b/balancer/rls/internal/cache_test.go @@ -0,0 +1,276 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package rls + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/internal/backoff" +) + +var ( + cacheKeys = []cacheKey{ + {path: "0", keys: "a"}, + {path: "1", keys: "b"}, + {path: "2", keys: "c"}, + {path: "3", keys: "d"}, + {path: "4", keys: "e"}, + } + + longDuration = 10 * time.Minute + shortDuration = 1 * time.Millisecond + cacheEntries []*cacheEntry +) + +func initCacheEntries() { + // All entries have a dummy size of 1 to simplify resize operations. + cacheEntries = []*cacheEntry{ + { + // Entry is valid and minimum expiry time has not expired. 
+ expiryTime: time.Now().Add(longDuration), + earliestEvictTime: time.Now().Add(longDuration), + size: 1, + }, + { + // Entry is valid and is in backoff. + expiryTime: time.Now().Add(longDuration), + backoffTime: time.Now().Add(longDuration), + backoffState: &backoffState{timer: time.NewTimer(longDuration)}, + size: 1, + }, + { + // Entry is valid, and not in backoff. + expiryTime: time.Now().Add(longDuration), + size: 1, + }, + { + // Entry is invalid. + expiryTime: time.Time{}.Add(shortDuration), + size: 1, + }, + { + // Entry is invalid valid and backoff has expired. + expiryTime: time.Time{}.Add(shortDuration), + backoffExpiryTime: time.Time{}.Add(shortDuration), + size: 1, + }, + } +} + +func (s) TestLRU_BasicOperations(t *testing.T) { + initCacheEntries() + // Create an LRU and add some entries to it. + lru := newLRU() + for _, k := range cacheKeys { + lru.addEntry(k) + } + + // Get the least recent entry. This should be the first entry we added. + if got, want := lru.getLeastRecentlyUsed(), cacheKeys[0]; got != want { + t.Fatalf("lru.getLeastRecentlyUsed() = %v, want %v", got, want) + } + + // Iterate through the slice of keys we added earlier, making them the most + // recent entry, one at a time. The least recent entry at that point should + // be the next entry from our slice of keys. + for i, k := range cacheKeys { + lru.makeRecent(k) + + lruIndex := (i + 1) % len(cacheKeys) + if got, want := lru.getLeastRecentlyUsed(), cacheKeys[lruIndex]; got != want { + t.Fatalf("lru.getLeastRecentlyUsed() = %v, want %v", got, want) + } + } + + // Iterate through the slice of keys we added earlier, removing them one at + // a time The least recent entry at that point should be the next entry from + // our slice of keys, except for the last one because the lru will be empty. + for i, k := range cacheKeys { + lru.removeEntry(k) + + var want cacheKey + if i < len(cacheKeys)-1 { + want = cacheKeys[i+1] + } + if got := lru.getLeastRecentlyUsed(); got != want { + t.Fatalf("lru.getLeastRecentlyUsed() = %v, want %v", got, want) + } + } +} + +func (s) TestLRU_IterateAndRun(t *testing.T) { + initCacheEntries() + // Create an LRU and add some entries to it. + lru := newLRU() + for _, k := range cacheKeys { + lru.addEntry(k) + } + + // Iterate through the lru to make sure that entries are returned in the + // least recently used order. + var gotKeys []cacheKey + lru.iterateAndRun(func(key cacheKey) { + gotKeys = append(gotKeys, key) + }) + if !cmp.Equal(gotKeys, cacheKeys, cmp.AllowUnexported(cacheKey{})) { + t.Fatalf("lru.iterateAndRun returned %v, want %v", gotKeys, cacheKeys) + } + + // Make sure that removing entries from the lru while iterating through it + // is a safe operation. + lru.iterateAndRun(func(key cacheKey) { + lru.removeEntry(key) + }) + + // Check the lru internals to make sure we freed up all the memory. 
+ if len := lru.ll.Len(); len != 0 { + t.Fatalf("Number of entries in the lru's underlying list is %d, want 0", len) + } + if len := len(lru.m); len != 0 { + t.Fatalf("Number of entries in the lru's underlying map is %d, want 0", len) + } +} + +func (s) TestDataCache_BasicOperations(t *testing.T) { + initCacheEntries() + dc := newDataCache(5, nil) + for i, k := range cacheKeys { + dc.addEntry(k, cacheEntries[i]) + } + for i, k := range cacheKeys { + entry := dc.getEntry(k) + if !cmp.Equal(entry, cacheEntries[i], cmp.AllowUnexported(cacheEntry{}, backoffState{}), cmpopts.IgnoreUnexported(time.Timer{})) { + t.Fatalf("Data cache lookup for key %v returned entry %v, want %v", k, entry, cacheEntries[i]) + } + } +} + +func (s) TestDataCache_AddForcesResize(t *testing.T) { + initCacheEntries() + dc := newDataCache(1, nil) + + // The first entry in cacheEntries has a minimum expiry time in the future. + // This entry would stop the resize operation since we do not evict entries + // whose minimum expiration time is in the future. So, we do not use that + // entry in this test. The entry being added has a running backoff timer. + evicted, ok := dc.addEntry(cacheKeys[1], cacheEntries[1]) + if evicted || !ok { + t.Fatalf("dataCache.addEntry() returned (%v, %v) want (false, true)", evicted, ok) + } + + // Add another entry leading to the eviction of the above entry which has a + // running backoff timer. The first return value is expected to be true. + backoffCancelled, ok := dc.addEntry(cacheKeys[2], cacheEntries[2]) + if !backoffCancelled || !ok { + t.Fatalf("dataCache.addEntry() returned (%v, %v) want (true, true)", backoffCancelled, ok) + } + + // Add another entry leading to the eviction of the above entry which does not + // have a running backoff timer. This should evict the above entry, but the + // first return value is expected to be false. + backoffCancelled, ok = dc.addEntry(cacheKeys[3], cacheEntries[3]) + if backoffCancelled || !ok { + t.Fatalf("dataCache.addEntry() returned (%v, %v) want (false, true)", backoffCancelled, ok) + } +} + +func (s) TestDataCache_Resize(t *testing.T) { + initCacheEntries() + dc := newDataCache(5, nil) + for i, k := range cacheKeys { + dc.addEntry(k, cacheEntries[i]) + } + + // The first cache entry (with a key of cacheKeys[0]) that we added has an + // earliestEvictTime in the future. As part of the resize operation, we + // traverse the cache in least recently used order, and this will be first + // entry that we will encounter. And since the earliestEvictTime is in the + // future, the resize operation will stop, leaving the cache bigger than + // what was asked for. + if dc.resize(1) { + t.Fatalf("dataCache.resize() returned true, want false") + } + if dc.currentSize != 5 { + t.Fatalf("dataCache.size is %d, want 5", dc.currentSize) + } + + // Remove the entry with earliestEvictTime in the future and retry the + // resize operation. + dc.removeEntryForTesting(cacheKeys[0]) + if !dc.resize(1) { + t.Fatalf("dataCache.resize() returned false, want true") + } + if dc.currentSize != 1 { + t.Fatalf("dataCache.size is %d, want 1", dc.currentSize) + } +} + +func (s) TestDataCache_EvictExpiredEntries(t *testing.T) { + initCacheEntries() + dc := newDataCache(5, nil) + for i, k := range cacheKeys { + dc.addEntry(k, cacheEntries[i]) + } + + // The last two entries in the cacheEntries list have expired, and will be + // evicted. The first three should still remain in the cache. 
+ if !dc.evictExpiredEntries() { + t.Fatal("dataCache.evictExpiredEntries() returned false, want true") + } + if dc.currentSize != 3 { + t.Fatalf("dataCache.size is %d, want 3", dc.currentSize) + } + for i := 0; i < 3; i++ { + entry := dc.getEntry(cacheKeys[i]) + if !cmp.Equal(entry, cacheEntries[i], cmp.AllowUnexported(cacheEntry{}, backoffState{}), cmpopts.IgnoreUnexported(time.Timer{})) { + t.Fatalf("Data cache lookup for key %v returned entry %v, want %v", cacheKeys[i], entry, cacheEntries[i]) + } + } +} + +func (s) TestDataCache_ResetBackoffState(t *testing.T) { + type fakeBackoff struct { + backoff.Strategy + } + + initCacheEntries() + dc := newDataCache(5, nil) + for i, k := range cacheKeys { + dc.addEntry(k, cacheEntries[i]) + } + + newBackoffState := &backoffState{bs: &fakeBackoff{}} + if updatePicker := dc.resetBackoffState(newBackoffState); !updatePicker { + t.Fatal("dataCache.resetBackoffState() returned updatePicker is false, want true") + } + + // Make sure that the entry with no backoff state was not touched. + if entry := dc.getEntry(cacheKeys[0]); cmp.Equal(entry.backoffState, newBackoffState, cmp.AllowUnexported(backoffState{})) { + t.Fatal("dataCache.resetBackoffState() touched entries without a valid backoffState") + } + + // Make sure that the entry with a valid backoff state was reset. + entry := dc.getEntry(cacheKeys[1]) + if diff := cmp.Diff(entry.backoffState, newBackoffState, cmp.AllowUnexported(backoffState{})); diff != "" { + t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff) + } +} diff --git a/balancer/rls/internal/child_policy.go b/balancer/rls/internal/child_policy.go new file mode 100644 index 00000000000..2e25be6438e --- /dev/null +++ b/balancer/rls/internal/child_policy.go @@ -0,0 +1,112 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package rls + +import ( + "fmt" + "sync/atomic" + "unsafe" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +// TODO(easwars): Remove this once all RLS code is merged. +//lint:file-ignore U1000 Ignore all unused code, not all code is merged yet. + +// childPolicyWrapper is a reference counted wrapper around a child policy. +// +// The LB policy maintains a map of these wrappers keyed by the target returned +// by RLS. When a target is seen for the first time, a child policy wrapper is +// created for it and the wrapper is added to the child policy map. Each entry +// in the data cache holds references to the corresponding child policy +// wrappers. The LB policy also holds a reference to the child policy wrapper +// for the default target specified in the LB Policy Configuration +// +// When a cache entry is evicted, it releases references to the child policy +// wrappers that it contains. 
When all references have been released, the +// wrapper is removed from the child policy map and is destroyed. +// +// The child policy wrapper also caches the connectivity state and most recent +// picker from the child policy. Once the child policy wrapper reports +// TRANSIENT_FAILURE, it will continue reporting that state until it goes READY; +// transitions from TRANSIENT_FAILURE to CONNECTING are ignored. +// +// Whenever a child policy wrapper changes its connectivity state, the LB policy +// returns a new picker to the channel, since the channel may need to re-process +// the picks for queued RPCs. +// +// It is not safe for concurrent access. +type childPolicyWrapper struct { + logger *internalgrpclog.PrefixLogger + target string // RLS target corresponding to this child policy. + refCnt int // Reference count. + + // Balancer state reported by the child policy. The RLS LB policy maintains + // these child policies in a BalancerGroup. The state reported by the child + // policy is pushed to the state aggregator (which is also implemented by the + // RLS LB policy) and cached here. See handleChildPolicyStateUpdate() for + // details on how the state aggregation is performed. + // + // While this field is written to by the LB policy, it is read by the picker + // at Pick time. Making this an atomic to enable the picker to read this value + // without a mutex. + state unsafe.Pointer // *balancer.State +} + +// newChildPolicyWrapper creates a child policy wrapper for the given target, +// and is initialized with one reference and starts off in CONNECTING state. +func newChildPolicyWrapper(target string) *childPolicyWrapper { + c := &childPolicyWrapper{ + target: target, + refCnt: 1, + state: unsafe.Pointer(&balancer.State{ + ConnectivityState: connectivity.Connecting, + Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), + }), + } + c.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-child-policy-wrapper %s %p] ", c.target, c)) + c.logger.Infof("Created") + return c +} + +// acquireRef increments the reference count on the child policy wrapper. +func (c *childPolicyWrapper) acquireRef() { + c.refCnt++ +} + +// releaseRef decrements the reference count on the child policy wrapper. The +// return value indicates whether the released reference was the last one. +func (c *childPolicyWrapper) releaseRef() bool { + c.refCnt-- + return c.refCnt == 0 +} + +// lamify causes the child policy wrapper to return a picker which will always +// fail requests. This is used when the wrapper runs into errors when trying to +// build and parse the child policy configuration. +func (c *childPolicyWrapper) lamify(err error) { + c.logger.Warningf("Entering lame mode: %v", err) + atomic.StorePointer(&c.state, unsafe.Pointer(&balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: base.NewErrPicker(err), + })) +}
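
The childPolicyWrapper comment above notes that the picker reads the cached balancer state at Pick time without holding a mutex. A minimal sketch of that read path, assuming it sits in the same rls package (the helper name childStateSketch is illustrative and not part of this change):

// childStateSketch shows how a picker could read the balancer.State cached
// in a childPolicyWrapper. The state pointer is written with
// atomic.StorePointer (see lamify above), so readers must pair it with
// atomic.LoadPointer. Assumes the "sync/atomic" and
// "google.golang.org/grpc/balancer" imports already used by this package.
func childStateSketch(cpw *childPolicyWrapper) balancer.State {
	return *(*balancer.State)(atomic.LoadPointer(&cpw.state))
}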
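
Along the same lines, a rough sketch of how the LB policy might drive the dataCache API from cache.go; the key, sizes, and expiry values below are illustrative assumptions, not taken from this change:

// dataCacheUsageSketch exercises the dataCache added in cache.go. It assumes
// placement in the rls package so it can use the unexported types directly.
func dataCacheUsageSketch(logger *internalgrpclog.PrefixLogger) {
	dc := newDataCache(1000, logger) // maxSize, in the same units as cacheEntry.size
	key := cacheKey{path: "/pkg.Service/Method", keys: "k1=v1"}
	dc.addEntry(key, &cacheEntry{size: 1, expiryTime: time.Now().Add(time.Minute)})

	if entry := dc.getEntry(key); entry != nil {
		// A hit also moves the key to the most recently used position.
		_ = entry.headerData
	}

	// Shrinking the cache may evict entries; if an evicted entry had a
	// running backoff timer, resize reports that so the LB policy can send
	// a new picker to the channel.
	if backoffCancelled := dc.resize(500); backoffCancelled {
		// Send a new picker (omitted in this sketch).
	}
}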