release-20.1: kvserver: synchronize replica removal with read-write requests #64604

Merged
176 changes: 112 additions & 64 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -210,29 +210,82 @@ func TestAdminRelocateRange(t *testing.T) {
// Regression test for https://github.com/cockroachdb/cockroach/issues/64325
// which makes sure an in-flight read operation during replica removal won't
// return empty results.
func TestReplicaRemovalDuringGet(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
defer tc.Stopper().Stop(ctx)

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform delayed read during replica removal.
resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key))
require.Nil(t, pErr)
require.NotNil(t, resp)
require.NotNil(t, resp.(*roachpb.GetResponse).Value)
val, err := resp.(*roachpb.GetResponse).Value.GetBytes()
require.NoError(t, err)
require.Equal(t, []byte("foo"), val)
}

// Regression test for https://github.com/cockroachdb/cockroach/issues/46329
// which makes sure an in-flight conditional put operation during replica
// removal won't spuriously error due to an unexpectedly missing value.
func TestReplicaRemovalDuringCPut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
defer tc.Stopper().Stop(ctx)

// Perform write.
pArgs := putArgs(key, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
require.Nil(t, pErr)

// Perform delayed conditional put during replica removal. This will cause
// an ambiguous result error, as outstanding proposals in the leaseholder
// replica's proposal queue will be aborted when the replica is removed.
// If the replica was removed from under us, it would instead return a
// ConditionFailedError since it finds nil in place of "foo".
req := cPutArgs(key, []byte("bar"), []byte("foo"))
_, pErr = evalDuringReplicaRemoval(ctx, req)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail())
}

// setupReplicaRemovalTest sets up a test cluster that can be used to test
// request evaluation during replica removal. It returns a running test
// cluster, the first key of a blank scratch range on the replica to be
// removed, and a function that can execute a delayed request just as the
// replica is being removed.
func setupReplicaRemovalTest(
t *testing.T, ctx context.Context,
) (
*testcluster.TestCluster,
roachpb.Key,
func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error),
) {
t.Helper()

type magicKey struct{}

requestReadyC := make(chan struct{}) // signals main thread that request is teed up
requestEvalC := make(chan struct{}) // signals cluster to evaluate the request
evalFilter := func(args storagebase.FilterArgs) *roachpb.Error {
if args.Ctx.Value(magicKey{}) != nil {
requestReadyC <- struct{}{}
<-requestEvalC
}
return nil
}

manual := hlc.NewHybridManualClock()
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
@@ -250,64 +303,59 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
},
}
tc := testcluster.StartTestCluster(t, 2, args)

// Create range and upreplicate.
key := tc.ScratchRange(t)
tc.AddReplicasOrFatal(t, key, tc.Target(1))

// Return a function that can be used to evaluate a delayed request
// during replica removal.
evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) {
// Submit request and wait for it to block.
type result struct {
resp roachpb.Response
err *roachpb.Error
}
resultC := make(chan result)
err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) {
reqCtx := context.WithValue(ctx, magicKey{}, struct{}{})
resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req)
resultC <- result{resp, pErr}
})
require.NoError(t, err)
<-requestReadyC

// Transfer leaseholder to other store.
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
require.NoError(t, err)
repl, err := store.GetReplica(rangeDesc.RangeID)
require.NoError(t, err)
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Remove first store from raft group.
tc.RemoveReplicasOrFatal(t, key, tc.Target(0))

// Wait for replica removal. This is a bit iffy. We want to make sure
// that, in the buggy case, we will typically fail (i.e. the request
// returns incorrect results because the replica was removed). However,
// in the non-buggy case the in-flight request will be holding
// readOnlyCmdMu until evaluated, blocking the replica removal, so
// waiting for replica removal would deadlock. We therefore take the
// easy way out by starting an async replica GC and sleeping for a bit.
err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
assert.NoError(t, store.ManualReplicaGC(repl))
})
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

// Allow request to resume, and return the result.
close(requestEvalC)
r := <-resultC
return r.resp, r.err
}

return tc, key, evalDuringReplicaRemoval
}
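Note: the requestReadyC/requestEvalC coordination above is a two-step channel handshake. A minimal, self-contained sketch of the same pattern, using only the standard library (illustrative, not part of the diff):

package main

import "fmt"

func main() {
	requestReadyC := make(chan struct{}) // worker signals that the request is teed up
	requestEvalC := make(chan struct{})  // main signals the worker to proceed
	resultC := make(chan string)

	go func() {
		// Mirrors the eval filter: announce readiness, then park until released.
		requestReadyC <- struct{}{}
		<-requestEvalC
		resultC <- "evaluated"
	}()

	<-requestReadyC
	// The worker is parked inside the "filter"; main can now safely mutate
	// shared state (in the test: transfer the lease, remove the replica).
	close(requestEvalC) // closing works as a one-shot broadcast release
	fmt.Println(<-resultC)
}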
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/client_test.go
@@ -1481,6 +1481,24 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest {
}
}

// cPutArgs returns a ConditionalPutRequest addressed to the default replica
// for the specified key and value, with the given expected value.
func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest {
var ev *roachpb.Value
if expValue != nil {
value := roachpb.MakeValueFromBytes(expValue)
ev = &value
}

return &roachpb.ConditionalPutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
Value: roachpb.MakeValueFromBytes(value),
ExpValue: ev,
}
}

// incrementArgs returns an IncrementRequest addressed to the default replica
// for the specified key.
func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest {
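Note: hypothetical usage of the new cPutArgs helper, mirroring TestReplicaRemovalDuringCPut above (tc, key, and ctx are assumed to come from such a test; they are not defined in this file):

// Expect the key to currently hold "foo" and replace it with "bar".
req := cPutArgs(key, []byte("bar"), []byte("foo"))
_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), req)
require.Nil(t, pErr)

// A nil expected value asserts that the key does not exist yet.
blindReq := cPutArgs(key, []byte("bar"), nil)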
22 changes: 7 additions & 15 deletions pkg/kv/kvserver/replica_corruption.go
@@ -21,12 +21,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas.
// Such a failure is indicated by a call to setCorruptRaftMuLocked with a
// ReplicaCorruptionError. Currently any error is passed through, but
// prospectively it should stop the range from participating in progress,
// trigger a rebalance operation and decide on an error-by-error basis whether
// the corruption is limited to the range, store, node or cluster with
// corresponding actions taken.
//
// Despite the fatal log call below this message we still return for the
// sake of testing.
@@ -37,15 +38,6 @@ import (
// best bet is to not have any of those.
// @bdarnell remarks: Corruption errors should be rare so we may want the store
// to just recompute its stats in the background when one occurs.
func (r *Replica) setCorruptRaftMuLocked(
ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
) *roachpb.Error {
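Note on the rename: by CockroachDB convention the RaftMuLocked suffix means the callee assumes r.raftMu is already held, so locking becomes each caller's responsibility. A sketch of the resulting calling pattern (it mirrors the executeWriteBatch change further down):

if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	return r.setCorruptRaftMuLocked(ctx, cErr)
}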
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/replica_proposal.go
@@ -721,8 +721,13 @@ func (r *Replica) evaluateProposal(
}

// Evaluate the commands. If this returns without an error, the batch should
// be committed. Note that we don't hold any locks at this point, except a
// shared RLock on readOnlyCmdMu. This is important since evaluating a
// proposal is expensive.
//
// Note that, during evaluation, ba's read and write timestamps might get
// bumped (see evaluateWriteBatchWithServersideRefreshes).
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, latchSpans)
@@ -735,7 +740,9 @@ }
}

if pErr != nil {
if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
return &res, false /* needConsensus */, pErr
}

txn := pErr.GetTxn()
if txn != nil && ba.Txn == nil {
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -57,6 +57,10 @@ func makeIDKey() storagebase.CmdIDKey {
// caller should relinquish all ownership of it. If it does return an error, the
// caller retains full ownership over the guard.
//
// Nothing here or below can take out a raftMu lock, since executeWriteBatch()
// is already holding readOnlyCmdMu when calling this. Locking raftMu after it
// would violate the locking order specified for Store.mu.
//
// Return values:
// - a channel which receives a response or error upon application
// - a closure used to attempt to abandon the command. When called, it unbinds
@@ -77,6 +81,8 @@ func (r *Replica) evalAndPropose(
// propagate the error. Don't assume ownership of the concurrency guard.
if isConcurrencyRetryError(pErr) {
return nil, nil, 0, pErr
} else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
return nil, nil, 0, pErr
}

// Attach the endCmds to the proposal and assume responsibility for
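Note: the lock-ordering comment above is the crux of the change. Since executeWriteBatch now holds readOnlyCmdMu across proposal, nothing beneath evalAndPropose may acquire raftMu, or two goroutines taking the locks in opposite orders could deadlock. A standalone sketch of the discipline (illustrative, not CockroachDB code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var readOnlyCmdMu sync.RWMutex // ordered first
	var raftMu sync.Mutex          // ordered second
	var wg sync.WaitGroup

	// Safe: both goroutines acquire the locks in the same order. If one of
	// them took raftMu first and then waited on readOnlyCmdMu while the other
	// did the reverse, each would hold what the other needs, forever.
	worker := func(name string) {
		defer wg.Done()
		readOnlyCmdMu.RLock()
		raftMu.Lock()
		fmt.Println(name, "holds both locks")
		raftMu.Unlock()
		readOnlyCmdMu.RUnlock()
	}

	wg.Add(2)
	go worker("write batch")
	go worker("replica removal")
	wg.Wait()
}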
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -3168,6 +3168,9 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) {
// TestReplicaAbortSpanReadError verifies that an error is returned
// to the client in the event that an AbortSpan entry is found but is
// not decodable.
//
// This doubles as a test that replica corruption errors are propagated
// and handled correctly.
func TestReplicaAbortSpanReadError(t *testing.T) {
defer leaktest.AfterTest(t)()

25 changes: 20 additions & 5 deletions pkg/kv/kvserver/replica_write.go
@@ -67,18 +67,20 @@ func (r *Replica) executeWriteBatch(
) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) {
startTime := timeutil.Now()

// Even though we're not a read-only operation by definition, we have to
// take out a read lock on readOnlyCmdMu while performing any reads during
// pre-Raft evaluation (e.g. conditional puts), otherwise we can race with
// replica removal and get evaluated on an empty replica. We must release
// this lock before Raft execution, to avoid deadlocks.
r.readOnlyCmdMu.RLock()

// Verify that the batch can be executed.
// NB: we only need to check that the request is in the Range's key bounds
// at proposal time, not at application time, because the spanlatch manager
// will synchronize all requests (notably EndTxn with SplitTrigger) that may
// cause this condition to change.
if err := r.checkExecutionCanProceed(ba, g, &st); err != nil {
r.readOnlyCmdMu.RUnlock()
return nil, g, roachpb.NewError(err)
}

Expand Down Expand Up @@ -110,6 +112,7 @@ func (r *Replica) executeWriteBatch(

// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
r.readOnlyCmdMu.RUnlock()
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
}
@@ -121,6 +124,7 @@
// other transactions to be released while sequencing in the concurrency
// manager.
if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence {
r.readOnlyCmdMu.RUnlock()
curLeaseCpy := curLease // avoid letting curLease escape
err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc())
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
@@ -132,6 +136,13 @@
// evalAndPropose.
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease)
if pErr != nil {
r.readOnlyCmdMu.RUnlock()
if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
r.raftMu.Lock()
defer r.raftMu.Unlock()
// This exits with a fatal error, but returns in tests.
return nil, g, r.setCorruptRaftMuLocked(ctx, cErr)
}
if maxLeaseIndex != 0 {
log.Fatalf(
ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s",
@@ -150,6 +161,10 @@
untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex))
}

// We are done with pre-Raft evaluation at this point, and have to release the
// read-only command lock to avoid deadlocks during Raft evaluation.
r.readOnlyCmdMu.RUnlock()

// If the command was accepted by raft, wait for the range to apply it.
ctxDone := ctx.Done()
shouldQuiesce := r.store.stopper.ShouldQuiesce()
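Note: reduced to its locking skeleton, the executeWriteBatch change looks roughly like the sketch below. All names are hypothetical stand-ins; the point is that the read lock covers all pre-Raft evaluation, is released on every early-return path, and is dropped before blocking on Raft application.

package main

import "sync"

type replicaSketch struct {
	readOnlyCmdMu sync.RWMutex
}

func (r *replicaSketch) executeWriteBatch() error {
	// Hold the read lock across pre-Raft evaluation so replica removal,
	// which takes the write lock, cannot slip in underneath us.
	r.readOnlyCmdMu.RLock()

	if err := r.checkCanProceed(); err != nil {
		r.readOnlyCmdMu.RUnlock() // every early return must release the lock
		return err
	}
	if err := r.evalAndPropose(); err != nil {
		r.readOnlyCmdMu.RUnlock()
		return err
	}

	// Pre-Raft evaluation is done; release before waiting on application to
	// avoid the deadlock described in the replica_raft.go comment.
	r.readOnlyCmdMu.RUnlock()
	return r.waitForApplication()
}

func (r *replicaSketch) checkCanProceed() error    { return nil }
func (r *replicaSketch) evalAndPropose() error     { return nil }
func (r *replicaSketch) waitForApplication() error { return nil }

func main() {
	r := &replicaSketch{}
	_ = r.executeWriteBatch()
}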