diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go
index 77996e2f22a9..536378006abf 100644
--- a/pkg/kv/kvserver/client_relocate_range_test.go
+++ b/pkg/kv/kvserver/client_relocate_range_test.go
@@ -210,29 +210,82 @@ func TestAdminRelocateRange(t *testing.T) {
 // Regression test for https://github.com/cockroachdb/cockroach/issues/64325
 // which makes sure an in-flight read operation during replica removal won't
 // return empty results.
-func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
+func TestReplicaRemovalDuringGet(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)

+    ctx := context.Background()
+    tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
+    defer tc.Stopper().Stop(ctx)
+
+    // Perform write.
+    pArgs := putArgs(key, []byte("foo"))
+    _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
+    require.Nil(t, pErr)
+
+    // Perform delayed read during replica removal.
+    resp, pErr := evalDuringReplicaRemoval(ctx, getArgs(key))
+    require.Nil(t, pErr)
+    require.NotNil(t, resp)
+    require.NotNil(t, resp.(*roachpb.GetResponse).Value)
+    val, err := resp.(*roachpb.GetResponse).Value.GetBytes()
+    require.NoError(t, err)
+    require.Equal(t, []byte("foo"), val)
+}
+
+// Regression test for https://github.com/cockroachdb/cockroach/issues/46329
+// which makes sure an in-flight conditional put operation during replica
+// removal won't spuriously error due to an unexpectedly missing value.
+func TestReplicaRemovalDuringCPut(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+    tc, key, evalDuringReplicaRemoval := setupReplicaRemovalTest(t, ctx)
+    defer tc.Stopper().Stop(ctx)
+
+    // Perform write.
+    pArgs := putArgs(key, []byte("foo"))
+    _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
+    require.Nil(t, pErr)
+
+    // Perform delayed conditional put during replica removal. This will cause
+    // an ambiguous result error, as outstanding proposals in the leaseholder
+    // replica's proposal queue will be aborted when the replica is removed.
+    // If the replica was removed from under us, it would instead return a
+    // ConditionFailedError since it finds nil in place of "foo".
+    req := cPutArgs(key, []byte("bar"), []byte("foo"))
+    _, pErr = evalDuringReplicaRemoval(ctx, req)
+    require.NotNil(t, pErr)
+    require.IsType(t, &roachpb.AmbiguousResultError{}, pErr.GetDetail())
+}
+
+// setupReplicaRemovalTest sets up a test cluster that can be used to test
+// request evaluation during replica removal. It returns a running test
+// cluster, the first key of a blank scratch range on the replica to be
+// removed, and a function that can execute a delayed request just as the
+// replica is being removed.
+func setupReplicaRemovalTest(
+    t *testing.T, ctx context.Context,
+) (
+    *testcluster.TestCluster,
+    roachpb.Key,
+    func(context.Context, roachpb.Request) (roachpb.Response, *roachpb.Error),
+) {
+    t.Helper()
+
     type magicKey struct{}

-    // delayReadC is used to synchronize the in-flight read request with the main
-    // test goroutine. It is read from twice:
-    //
-    // 1. The first read allows the test to block until the request eval filter
-    //    is called, i.e. when the read request is ready.
-    // 2. The second read allows the test to close the channel to unblock
-    //    the eval filter, causing the read request to be evaluated.
-    delayReadC := make(chan struct{})
+    requestReadyC := make(chan struct{}) // signals main thread that request is teed up
+    requestEvalC := make(chan struct{})  // signals cluster to evaluate the request
     evalFilter := func(args storagebase.FilterArgs) *roachpb.Error {
         if args.Ctx.Value(magicKey{}) != nil {
-            <-delayReadC
-            <-delayReadC
+            requestReadyC <- struct{}{}
+            <-requestEvalC
         }
         return nil
     }

-    ctx := context.Background()
     manual := hlc.NewHybridManualClock()
     args := base.TestClusterArgs{
         ReplicationMode: base.ReplicationManual,
@@ -250,64 +303,59 @@ func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
         },
     }
     tc := testcluster.StartTestCluster(t, 2, args)
-    defer tc.Stopper().Stop(ctx)

     // Create range and upreplicate.
     key := tc.ScratchRange(t)
     tc.AddReplicasOrFatal(t, key, tc.Target(1))

-    // Perform write.
-    pArgs := putArgs(key, []byte("foo"))
-    _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
-    require.Nil(t, pErr)
+    // Return a function that can be used to evaluate a delayed request
+    // during replica removal.
+    evalDuringReplicaRemoval := func(ctx context.Context, req roachpb.Request) (roachpb.Response, *roachpb.Error) {
+        // Submit request and wait for it to block.
+        type result struct {
+            resp roachpb.Response
+            err  *roachpb.Error
+        }
+        resultC := make(chan result)
+        err := tc.Stopper().RunAsyncTask(ctx, "request", func(ctx context.Context) {
+            reqCtx := context.WithValue(ctx, magicKey{}, struct{}{})
+            resp, pErr := kv.SendWrapped(reqCtx, tc.Servers[0].DistSender(), req)
+            resultC <- result{resp, pErr}
+        })
+        require.NoError(t, err)
+        <-requestReadyC

-    // Perform read on write and wait for read to block.
-    type reply struct {
-        resp roachpb.Response
-        err  *roachpb.Error
-    }
-    readResC := make(chan reply)
-    err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
-        readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
-        gArgs := getArgs(key)
-        resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
-        readResC <- reply{resp, pErr}
-    })
-    require.NoError(t, err)
-    delayReadC <- struct{}{}
+        // Transfer leaseholder to other store.
+        rangeDesc, err := tc.LookupRange(key)
+        require.NoError(t, err)
+        store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
+        require.NoError(t, err)
+        repl, err := store.GetReplica(rangeDesc.RangeID)
+        require.NoError(t, err)
+        err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
+        require.NoError(t, err)

-    // Transfer leaseholder to other store.
-    rangeDesc, err := tc.LookupRange(key)
-    require.NoError(t, err)
-    store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(tc.Server(0).GetFirstStoreID())
-    require.NoError(t, err)
-    repl, err := store.GetReplica(rangeDesc.RangeID)
-    require.NoError(t, err)
-    err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
-    require.NoError(t, err)
+        // Remove first store from raft group.
+        tc.RemoveReplicasOrFatal(t, key, tc.Target(0))

-    // Remove first store from raft group.
-    tc.RemoveReplicasOrFatal(t, key, tc.Target(0))
-
-    // This is a bit iffy. We want to make sure that, in the buggy case, we
-    // will typically fail (i.e. the read returns empty because the replica was
-    // removed). However, in the non-buggy case the in-flight read request will
-    // be holding readOnlyCmdMu until evaluated, blocking the replica removal,
-    // so waiting for replica removal would deadlock. We therefore take the
-    // easy way out by starting an async replica GC and sleeping for a bit.
-    err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
-        assert.NoError(t, store.ManualReplicaGC(repl))
-    })
-    require.NoError(t, err)
-    time.Sleep(500 * time.Millisecond)
-
-    // Allow read to resume. Should return "foo".
-    close(delayReadC)
-    r := <-readResC
-    require.Nil(t, r.err)
-    require.NotNil(t, r.resp)
-    require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
-    val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
-    require.NoError(t, err)
-    require.Equal(t, []byte("foo"), val)
+        // Wait for replica removal. This is a bit iffy. We want to make sure
+        // that, in the buggy case, we will typically fail (i.e. the request
+        // returns incorrect results because the replica was removed). However,
+        // in the non-buggy case the in-flight request will be holding
+        // readOnlyCmdMu until evaluated, blocking the replica removal, so
+        // waiting for replica removal would deadlock. We therefore take the
+        // easy way out by starting an async replica GC and sleeping for a bit.
+        err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
+            assert.NoError(t, store.ManualReplicaGC(repl))
+        })
+        require.NoError(t, err)
+        time.Sleep(500 * time.Millisecond)
+
+        // Allow request to resume, and return the result.
+        close(requestEvalC)
+        r := <-resultC
+        return r.resp, r.err
+    }
+
+    return tc, key, evalDuringReplicaRemoval
 }
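For readers following the test refactor above, the following standalone sketch shows the same two-channel handshake used by setupReplicaRemovalTest in miniature: the request goroutine announces on one channel that it is teed up, then blocks on a second channel until the coordinator releases it. This is illustrative only; the channel names mirror the patch, but the surrounding program is an assumption and not part of the change.

package main

import "fmt"

func main() {
	requestReadyC := make(chan struct{}) // signals coordinator that the request is teed up
	requestEvalC := make(chan struct{})  // closed by the coordinator to let evaluation proceed
	resultC := make(chan string)

	// Stand-in for the request goroutine whose evaluation is intercepted by a filter.
	go func() {
		requestReadyC <- struct{}{} // block here until the coordinator has seen us
		<-requestEvalC              // block here until the coordinator allows evaluation
		resultC <- "evaluated"
	}()

	<-requestReadyC // request is now parked inside the "filter"
	// ... the real test transfers the lease and removes the replica here ...
	close(requestEvalC) // let the request finish evaluating
	fmt.Println(<-resultC)
}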
diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go
index 22ec45356ca7..1bceca0a1a0d 100644
--- a/pkg/kv/kvserver/client_test.go
+++ b/pkg/kv/kvserver/client_test.go
@@ -1481,6 +1481,24 @@ func putArgs(key roachpb.Key, value []byte) *roachpb.PutRequest {
     }
 }

+// cPutArgs returns a ConditionalPutRequest to the default replica
+// for the specified key and value, with the given expected value.
+func cPutArgs(key roachpb.Key, value, expValue []byte) *roachpb.ConditionalPutRequest {
+    var ev *roachpb.Value
+    if expValue != nil {
+        value := roachpb.MakeValueFromBytes(expValue)
+        ev = &value
+    }
+
+    return &roachpb.ConditionalPutRequest{
+        RequestHeader: roachpb.RequestHeader{
+            Key: key,
+        },
+        Value:    roachpb.MakeValueFromBytes(value),
+        ExpValue: ev,
+    }
+}
+
 // incrementArgs returns an IncrementRequest addressed to the default replica
 // for the specified key.
 func incrementArgs(key roachpb.Key, inc int64) *roachpb.IncrementRequest {
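A brief note on the expected-value handling in the helper above: a nil expValue yields a nil ExpValue (the condition "the key must have no existing value"), whereas a non-nil expValue is copied into a value whose address is taken. The standalone sketch below mirrors that shape with plain types so it runs on its own; the names Value, makeValue and makeExpValue are invented for illustration and are not CockroachDB APIs.

package main

import "fmt"

// Value is a stand-in for roachpb.Value, reduced to its raw bytes.
type Value struct{ RawBytes []byte }

func makeValue(b []byte) Value { return Value{RawBytes: b} }

// makeExpValue mirrors the expected-value handling in cPutArgs: nil input
// means "expect no existing value", anything else becomes a *Value.
func makeExpValue(expValue []byte) *Value {
	if expValue == nil {
		return nil
	}
	v := makeValue(expValue)
	return &v
}

func main() {
	fmt.Println(makeExpValue(nil))           // <nil>: condition is "key absent"
	fmt.Println(makeExpValue([]byte("foo"))) // pointer: condition is "value equals foo"
}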
diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go
index 7cdf7a048b45..1b30b01e65e6 100644
--- a/pkg/kv/kvserver/replica_corruption.go
+++ b/pkg/kv/kvserver/replica_corruption.go
@@ -21,12 +21,13 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/log"
 )

-// maybeSetCorrupt is a stand-in for proper handling of failing replicas. Such a
-// failure is indicated by a call to maybeSetCorrupt with a ReplicaCorruptionError.
-// Currently any error is passed through, but prospectively it should stop the
-// range from participating in progress, trigger a rebalance operation and
-// decide on an error-by-error basis whether the corruption is limited to the
-// range, store, node or cluster with corresponding actions taken.
+// setCorruptRaftMuLocked is a stand-in for proper handling of failing replicas.
+// Such a failure is indicated by a call to setCorruptRaftMuLocked with a
+// ReplicaCorruptionError. Currently any error is passed through, but
+// prospectively it should stop the range from participating in progress,
+// trigger a rebalance operation and decide on an error-by-error basis whether
+// the corruption is limited to the range, store, node or cluster with
+// corresponding actions taken.
 //
 // Despite the fatal log call below this message we still return for the
 // sake of testing.
@@ -37,15 +38,6 @@ import (
 // best bet is to not have any of those.
 // @bdarnell remarks: Corruption errors should be rare so we may want the store
 // to just recompute its stats in the background when one occurs.
-func (r *Replica) maybeSetCorrupt(ctx context.Context, pErr *roachpb.Error) *roachpb.Error {
-    if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
-        r.raftMu.Lock()
-        defer r.raftMu.Unlock()
-        return r.setCorruptRaftMuLocked(ctx, cErr)
-    }
-    return pErr
-}
-
 func (r *Replica) setCorruptRaftMuLocked(
     ctx context.Context, cErr *roachpb.ReplicaCorruptionError,
 ) *roachpb.Error {
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index b4f24469f653..ec9b907d1030 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -721,8 +721,13 @@ func (r *Replica) evaluateProposal(
     }

     // Evaluate the commands. If this returns without an error, the batch should
-    // be committed. Note that we don't hold any locks at this point. This is
-    // important since evaluating a proposal is expensive.
+    // be committed. Note that we don't hold any locks at this point, except a
+    // shared RLock on readOnlyCmdMu. This is important since evaluating a
+    // proposal is expensive.
+    //
+    // Note that, during evaluation, ba's read and write timestamps might get
+    // bumped (see evaluateWriteBatchWithServersideRefreshes).
+    //
     // TODO(tschottdorf): absorb all returned values in `res` below this point
     // in the call stack as well.
     batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, latchSpans)
@@ -735,7 +740,9 @@ func (r *Replica) evaluateProposal(
     }

     if pErr != nil {
-        pErr = r.maybeSetCorrupt(ctx, pErr)
+        if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
+            return &res, false /* needConsensus */, pErr
+        }

         txn := pErr.GetTxn()
         if txn != nil && ba.Txn == nil {
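To make the control-flow change above easier to follow: evaluateProposal no longer converts a ReplicaCorruptionError on the spot (maybeSetCorrupt is deleted); it returns the error untouched, evalAndPropose passes it straight through (see the replica_raft.go hunk below), and executeWriteBatch becomes the layer that takes raftMu and calls setCorruptRaftMuLocked. The sketch below models that propagation with simplified stand-in types; none of the function shapes are the real CockroachDB signatures.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// corruptionError is a stand-in for roachpb.ReplicaCorruptionError.
type corruptionError struct{ msg string }

func (e *corruptionError) Error() string { return "replica corruption: " + e.msg }

type replica struct{ raftMu sync.Mutex }

// evaluate is a stand-in for proposal evaluation: it no longer handles
// corruption itself and simply returns whatever error evaluation produced.
func (r *replica) evaluate() error {
	return &corruptionError{msg: "undecodable abort span entry"}
}

// evalAndPropose passes a corruption error straight through to its caller.
func (r *replica) evalAndPropose() error {
	if err := r.evaluate(); err != nil {
		return err
	}
	return nil
}

// executeWriteBatch is now the layer that takes raftMu and marks the
// replica corrupt, mirroring the replica_write.go hunk further below.
func (r *replica) executeWriteBatch() error {
	err := r.evalAndPropose()
	var cErr *corruptionError
	if errors.As(err, &cErr) {
		r.raftMu.Lock()
		defer r.raftMu.Unlock()
		return fmt.Errorf("replica marked corrupt: %w", cErr)
	}
	return err
}

func main() {
	r := &replica{}
	fmt.Println(r.executeWriteBatch())
}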
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index b18c98553b88..860aa0cead23 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -57,6 +57,10 @@ func makeIDKey() storagebase.CmdIDKey {
 // caller should relinquish all ownership of it. If it does return an error, the
 // caller retains full ownership over the guard.
 //
+// Nothing here or below can take out a raftMu lock, since executeWriteBatch()
+// is already holding readOnlyCmdMu when calling this. Locking raftMu after it
+// would violate the locking order specified for Store.mu.
+//
 // Return values:
 // - a channel which receives a response or error upon application
 // - a closure used to attempt to abandon the command. When called, it unbinds
@@ -77,6 +81,8 @@ func (r *Replica) evalAndPropose(
         // proagate the error. Don't assume ownership of the concurrency guard.
         if isConcurrencyRetryError(pErr) {
             return nil, nil, 0, pErr
+        } else if _, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
+            return nil, nil, 0, pErr
         }

         // Attach the endCmds to the proposal and assume responsibility for
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index d177de7f15f0..4e845b72129f 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -3168,6 +3168,9 @@ func TestReplicaNoTimestampIncrementWithinTxn(t *testing.T) {
 // TestReplicaAbortSpanReadError verifies that an error is returned
 // to the client in the event that a AbortSpan entry is found but is
 // not decodable.
+//
+// This doubles as a test that replica corruption errors are propagated
+// and handled correctly.
 func TestReplicaAbortSpanReadError(t *testing.T) {
     defer leaktest.AfterTest(t)()

diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index d64e38ae5f59..cd2bd1d195c1 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -67,11 +67,12 @@ func (r *Replica) executeWriteBatch(
 ) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) {
     startTime := timeutil.Now()

-    // TODO(nvanbenschoten): unlike on the read-path (executeReadOnlyBatch), we
-    // don't synchronize with r.readOnlyCmdMu here. Is that ok? What if the
-    // replica is destroyed concurrently with a write? We won't be able to
-    // successfully propose as the lease will presumably have changed, but what
-    // if we hit an error during evaluation (e.g. a ConditionFailedError)?
+    // Even though we're not a read-only operation by definition, we have to
+    // take out a read lock on readOnlyCmdMu while performing any reads during
+    // pre-Raft evaluation (e.g. conditional puts), otherwise we can race with
+    // replica removal and get evaluated on an empty replica. We must release
+    // this lock before Raft execution, to avoid deadlocks.
+    r.readOnlyCmdMu.RLock()

     // Verify that the batch can be executed.
     // NB: we only need to check that the request is in the Range's key bounds
@@ -79,6 +80,7 @@ func (r *Replica) executeWriteBatch(
     // will synchronize all requests (notably EndTxn with SplitTrigger) that may
     // cause this condition to change.
     if err := r.checkExecutionCanProceed(ba, g, &st); err != nil {
+        r.readOnlyCmdMu.RUnlock()
         return nil, g, roachpb.NewError(err)
     }

@@ -110,6 +112,7 @@ func (r *Replica) executeWriteBatch(

     // Checking the context just before proposing can help avoid ambiguous errors.
     if err := ctx.Err(); err != nil {
+        r.readOnlyCmdMu.RUnlock()
         log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
         return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
     }
@@ -121,6 +124,7 @@ func (r *Replica) executeWriteBatch(
     // other transactions to be released while sequencing in the concurrency
     // manager.
     if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence {
+        r.readOnlyCmdMu.RUnlock()
         curLeaseCpy := curLease // avoid letting curLease escape
         err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc())
         log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
@@ -132,6 +136,13 @@ func (r *Replica) executeWriteBatch(
     // evalAndPropose.
     ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease)
     if pErr != nil {
+        r.readOnlyCmdMu.RUnlock()
+        if cErr, ok := pErr.GetDetail().(*roachpb.ReplicaCorruptionError); ok {
+            r.raftMu.Lock()
+            defer r.raftMu.Unlock()
+            // This exits with a fatal error, but returns in tests.
+            return nil, g, r.setCorruptRaftMuLocked(ctx, cErr)
+        }
         if maxLeaseIndex != 0 {
             log.Fatalf(
                 ctx, "unexpected max lease index %d assigned to failed proposal: %s, error %s",
@@ -150,6 +161,10 @@ func (r *Replica) executeWriteBatch(
         untrack(ctx, ctpb.Epoch(st.Lease.Epoch), r.RangeID, ctpb.LAI(maxLeaseIndex))
     }

+    // We are done with pre-Raft evaluation at this point, and have to release the
+    // read-only command lock to avoid deadlocks during Raft evaluation.
+    r.readOnlyCmdMu.RUnlock()
+
     // If the command was accepted by raft, wait for the range to apply it.
     ctxDone := ctx.Done()
     shouldQuiesce := r.store.stopper.ShouldQuiesce()
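As a closing illustration of the locking discipline introduced in replica_write.go: the read lock on readOnlyCmdMu is held across pre-Raft evaluation and must be released on every early-return path and again before waiting for Raft application, since replica removal acquires the write lock. The standalone sketch below shows the shape of that discipline with a plain sync.RWMutex; the types and helper names are invented and this is not the actual executeWriteBatch implementation.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type replica struct {
	readOnlyCmdMu sync.RWMutex
	removed       bool
}

// destroy models replica removal: it takes the write lock, so it cannot
// interleave with a request that is still evaluating under the read lock.
func (r *replica) destroy() {
	r.readOnlyCmdMu.Lock()
	defer r.readOnlyCmdMu.Unlock()
	r.removed = true
}

// executeWrite models the new executeWriteBatch structure: RLock before any
// pre-Raft reads, RUnlock on every early return, and RUnlock again before
// the (potentially long) wait for the command to apply.
func (r *replica) executeWrite(valid bool) error {
	r.readOnlyCmdMu.RLock()

	if !valid {
		r.readOnlyCmdMu.RUnlock() // early-return path must release the lock
		return errors.New("cannot execute batch")
	}

	// ... pre-Raft evaluation happens here, safely ordered before removal ...
	_ = r.removed

	// Release before "Raft execution" so replica removal is not blocked while
	// we wait for the command to apply.
	r.readOnlyCmdMu.RUnlock()
	return nil
}

func main() {
	r := &replica{}
	fmt.Println(r.executeWrite(false)) // cannot execute batch
	fmt.Println(r.executeWrite(true))  // <nil>
	r.destroy()
}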