Ensure memcached batched requests handle context cancellation (#5314)
* Ensure memcached batched requests handle context cancellation

Ensure that when the context used for Memcached GetMulti is cancelled,
getMultiBatched does not hang waiting for results that will never be
generated (since the batched requests will not run if the context has
been cancelled).

Fixes an issue introduced in #5301

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Lint fixes

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Code review changes: run batches unconditionally

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed Apr 29, 2022
1 parent f944ae6 commit 61e547e
Showing 3 changed files with 130 additions and 8 deletions.
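
The core of the fix is a bookkeeping invariant in getMultiBatched: the caller reads exactly one result per batch from a results channel, so every batch must send exactly one result even when the request context has already been cancelled. Below is a minimal, self-contained sketch of that pattern in Go. It is not the Thanos code; fetchBatch and the bare goroutine loop are illustrative stand-ins for getMultiSingle and doWithBatch.

package main

import (
	"context"
	"fmt"
)

// fetchBatch stands in for getMultiSingle: it checks the caller's context
// before doing any backend work, but it always returns, so the caller
// still receives a result for this batch.
func fetchBatch(ctx context.Context, keys []string) ([]string, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return keys, nil // pretend this hit the backend
	}
}

// getBatched reads exactly one result per batch from the channel. If batches
// were skipped once ctx is cancelled, this loop would block forever; because
// every batch runs and sends a result, it always terminates.
func getBatched(ctx context.Context, keys []string, batchSize int) []string {
	numBatches := (len(keys) + batchSize - 1) / batchSize
	results := make(chan []string, numBatches)

	for i := 0; i < len(keys); i += batchSize {
		end := i + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		go func(batch []string) {
			items, _ := fetchBatch(ctx, batch)
			results <- items
		}(keys[i:end])
	}

	var out []string
	for i := 0; i < numBatches; i++ {
		batch := <-results
		out = append(out, batch...)
	}
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancelled up front, as in the regression test below

	// Prints an empty result instead of hanging.
	fmt.Println(getBatched(ctx, []string{"k1", "k2", "k3", "k4", "k5"}, 2))
}
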
62 changes: 62 additions & 0 deletions pkg/cacheutil/cacheutil_test.go
@@ -4,11 +4,73 @@
package cacheutil

import (
"context"
"testing"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/goleak"

"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestDoWithBatch(t *testing.T) {
tests := map[string]struct {
items []string
batchSize int
expectedBatches int
concurrency gate.Gate
}{
"no items": {
items: []string{},
batchSize: 2,
expectedBatches: 0,
concurrency: nil,
},

"fewer than batch size": {
items: []string{"key1"},
batchSize: 2,
expectedBatches: 1,
concurrency: nil,
},

"perfect sized for batch": {
items: []string{"key1", "key2", "key3", "key4"},
batchSize: 2,
expectedBatches: 2,
concurrency: nil,
},

"odd sized for batch": {
items: []string{"key1", "key2", "key3", "key4", "key5"},
batchSize: 2,
expectedBatches: 3,
concurrency: nil,
},

"odd sized with concurrency limit": {
items: []string{"key1", "key2", "key3", "key4", "key5"},
batchSize: 2,
expectedBatches: 3,
concurrency: gate.New(prometheus.NewPedanticRegistry(), 1),
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
actualBatches := atomic.Int64{}
_ = doWithBatch(context.Background(), len(testData.items), testData.batchSize, testData.concurrency, func(startIndex, endIndex int) error {
actualBatches.Inc()
return nil
})

testutil.Equals(t, int64(testData.expectedBatches), actualBatches.Load())
})
}
}
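
doWithBatch itself is not touched by this diff; the tests above exercise it only through its call sites. The sketch below is a hedged guess at its shape, inferred from those call sites (total size, batch size, an optional gate.Gate, and a per-batch callback); the real pkg/cacheutil implementation may differ. It illustrates why a cancelled context can prevent batches from running at all, which is the behavior getMultiBatched has to account for in the next file.

package cacheutilsketch

import (
	"context"

	"golang.org/x/sync/errgroup"

	"github.com/thanos-io/thanos/pkg/gate"
)

// doWithBatchSketch is an assumed, simplified stand-in for doWithBatch.
func doWithBatchSketch(ctx context.Context, totalSize, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error {
	if totalSize == 0 {
		return nil
	}
	if batchSize <= 0 {
		// No batching requested: process everything in a single call.
		return f(0, totalSize)
	}

	g, gctx := errgroup.WithContext(ctx)
	for start := 0; start < totalSize; start += batchSize {
		end := start + batchSize
		if end > totalSize {
			end = totalSize
		}
		start, end := start, end // capture loop variables for the closure
		g.Go(func() error {
			if gctx.Err() != nil {
				// The context is already cancelled: the batch callback never
				// runs. This is the behavior that made getMultiBatched hang,
				// and why it now passes context.Background() to doWithBatch
				// and checks the caller's context inside getMultiSingle.
				return gctx.Err()
			}
			if ga != nil {
				if err := ga.Start(gctx); err != nil {
					return err
				}
				defer ga.Done()
			}
			return f(start, end)
		})
	}
	return g.Wait()
}
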
25 changes: 19 additions & 6 deletions pkg/cacheutil/memcached_client.go
@@ -442,7 +442,7 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([]map[string]*memcache.Item, error) {
// Do not batch if the input keys are less than the max batch size.
if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) {
items, err := c.getMultiSingle(keys)
items, err := c.getMultiSingle(ctx, keys)
if err != nil {
return nil, err
}
@@ -470,12 +470,16 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
defer close(results)

// Ignore the error here since it can only be returned by our provided function which
// always returns nil.
_ = doWithBatch(ctx, len(keys), c.config.MaxGetMultiBatchSize, getMultiGate, func(startIndex, endIndex int) error {
// always returns nil. NOTE also we are using a background context here for the doWithBatch
// method. This is to ensure that it runs the expected number of batches _even if_ our
// context (`ctx`) is canceled since we expect a certain number of batches to be read
// from `results` below. The wrapped `getMultiSingle` method will still check our context
// and short-circuit if it has been canceled.
_ = doWithBatch(context.Background(), len(keys), c.config.MaxGetMultiBatchSize, getMultiGate, func(startIndex, endIndex int) error {
batchKeys := keys[startIndex:endIndex]

res := &memcachedGetMultiResult{}
res.items, res.err = c.getMultiSingle(batchKeys)
res.items, res.err = c.getMultiSingle(ctx, batchKeys)

results <- res
return nil
@@ -499,10 +503,19 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
return items, lastErr
}

func (c *memcachedClient) getMultiSingle(keys []string) (items map[string]*memcache.Item, err error) {
func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (items map[string]*memcache.Item, err error) {
start := time.Now()
c.operations.WithLabelValues(opGetMulti).Inc()
items, err = c.client.GetMulti(keys)

select {
case <-ctx.Done():
// Make sure our context hasn't been canceled before fetching cache items using
// cache client backend.
return nil, ctx.Err()
default:
items, err = c.client.GetMulti(keys)
}

if err != nil {
level.Debug(c.logger).Log("msg", "failed to get multiple items from memcached", "err", err)
c.trackError(opGetMulti, err)
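
Note on the getMultiSingle change above: the backend client's GetMulti does not take a context (see the blocking mock in the test file below), so the added select is a pre-flight check. It prevents an already-cancelled request from starting a new backend call, but it cannot interrupt a call that is already in flight.
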
51 changes: 49 additions & 2 deletions pkg/cacheutil/memcached_client_test.go
@@ -140,7 +140,7 @@ func TestMemcachedClient_SetAsync(t *testing.T) {
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second))
testutil.Ok(t, backendMock.waitItems(2))

actual, err := client.getMultiSingle([]string{"key-1", "key-2"})
actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, []byte("value-2"), actual["key-2"].Value)
@@ -166,7 +166,7 @@ func TestMemcachedClient_SetAsyncWithCustomMaxItemSize(t *testing.T) {
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2-too-long-to-be-stored"), time.Second))
testutil.Ok(t, backendMock.waitItems(1))

actual, err := client.getMultiSingle([]string{"key-1", "key-2"})
actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, (*memcache.Item)(nil), actual["key-2"])
@@ -466,3 +466,50 @@ func TestMultipleClientsCanUseSameRegistry(t *testing.T) {
testutil.Ok(t, err)
defer client2.Stop()
}

func TestMemcachedClient_GetMulti_ContextCancelled(t *testing.T) {
config := defaultMemcachedClientConfig
config.Addresses = []string{"127.0.0.1:11211"}
config.MaxGetMultiBatchSize = 2
config.MaxGetMultiConcurrency = 2

// Create a new context that will be used for our "blocking" backend so that we can
// actually stop it at the end of the test and not leak goroutines.
backendCtx, backendCancel := context.WithCancel(context.Background())
defer backendCancel()

selector := &MemcachedJumpHashSelector{}
backendMock := newMemcachedClientBlockingMock(backendCtx)

client, err := newMemcachedClient(log.NewNopLogger(), backendMock, selector, config, prometheus.NewPedanticRegistry(), "test")
testutil.Ok(t, err)
defer client.Stop()

// Immediately cancel the context that will be used for the GetMulti request. This will
// ensure that the method called by the batching logic (getMultiSingle) returns immediately
// instead of calling the underlying memcached client (which blocks forever in this test).
ctx, cancel := context.WithCancel(context.Background())
cancel()

items := client.GetMulti(ctx, []string{"key1", "key2", "key3", "key4"})
testutil.Equals(t, 0, len(items))
}

type memcachedClientBlockingMock struct {
ctx context.Context
}

func newMemcachedClientBlockingMock(ctx context.Context) *memcachedClientBlockingMock {
return &memcachedClientBlockingMock{ctx: ctx}
}

func (c *memcachedClientBlockingMock) GetMulti([]string) (map[string]*memcache.Item, error) {
// Block until this backend client is explicitly stopped so that we can ensure the memcached
// client won't be blocked waiting for results that will never be returned.
<-c.ctx.Done()
return nil, nil
}

func (c *memcachedClientBlockingMock) Set(*memcache.Item) error {
return nil
}
