Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): retry spanner transactions and mutations when RST_STREAM error #6699

Merged
merged 2 commits into from Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions spanner/retry.go
Expand Up @@ -80,13 +80,13 @@ func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
}

// runWithRetryOnAbortedOrSessionNotFound executes the given function and
// retries it if it returns an Aborted or Session not found error. The retry
// is delayed if the error was Aborted. The delay between retries is the delay
// retries it if it returns an Aborted, Session not found error or certain Internal errors. The retry
// is delayed if the error was Aborted or Internal error. The delay between retries is the delay
// returned by Cloud Spanner, or if none is returned, the calculated delay with
// a minimum of 10ms and maximum of 32s. There is no delay before the retry if
// the error was Session not found.
func runWithRetryOnAbortedOrSessionNotFound(ctx context.Context, f func(context.Context) error) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a couple of tests for the changes in this PR that verify that transactions are retried if the RST_STREAM error is returned, but not for any other random internal error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olavloite Test "TestClient_ApplyAtLeastOnce_NonRetryableInternalErrors" added,
Please take a look at the code again.
This could cause an infinite => This is not true. The following code only says that:

if it is an internal error and its error message is not in the list, then return false.
If it is an internal error and it error message in the list, then let it pass to the next check r.Retryer.Retry(err).

It early returns false for non-retryable internal errors before the real retry check.

If the user has specified to not retry internal errors, then r.Retryer.Retry(err) will return false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed that, thanks! The specific error messages were already there, we were just not using them anymore (and I guess you are now going to dig up that I was the one that actually removed it ;-) ).

retryer := onCodes(DefaultRetryBackoff, codes.Aborted)
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make it retry on any Internal error, also those that are not retryable. This could cause an infinite (or at least very long-lasting) loop if someone tries to execute a transaction that happens to hit a feature that returns an internal error.

funcWithRetry := func(ctx context.Context) error {
for {
err := f(ctx)
Expand Down
77 changes: 45 additions & 32 deletions spanner/transaction.go
Expand Up @@ -1387,51 +1387,64 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
ts time.Time
sh *sessionHandle
)
defer func() {
if sh != nil {
sh.recycle()
}
}()
mPb, err := mutationsProto(ms)
if err != nil {
// Malformed mutation found, just return the error.
return ts, err
}

// Retry-loop for aborted transactions.
// TODO: Replace with generic retryer.
for {
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// No usable session for doing the commit, take one from pool.
sh, err = t.sp.take(ctx)
if err != nil {
// sessionPool.Take already retries for session
// creations/retrivals.
return ts, err
// Make a retryer for Aborted and certain Internal errors.
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also could cause an infinite loop.

// Apply the mutation and retry if the commit is aborted.
applyMutationWithRetry := func(ctx context.Context) error {
for {
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// No usable session for doing the commit, take one from pool.
sh, err = t.sp.take(ctx)
if err != nil {
// sessionPool.Take already retries for session
// creations/retrivals.
return ToSpannerError(err)
}
}
defer sh.recycle()
}
res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
Session: sh.getID(),
Transaction: &sppb.CommitRequest_SingleUseTransaction{
SingleUseTransaction: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadWrite_{
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
Session: sh.getID(),
Transaction: &sppb.CommitRequest_SingleUseTransaction{
SingleUseTransaction: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadWrite_{
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
},
},
},
},
Mutations: mPb,
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
})
if err != nil && !isAbortedErr(err) {
if isSessionNotFoundError(err) {
// Discard the bad session.
sh.destroy()
Mutations: mPb,
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
})
if err != nil && !isAbortedErr(err) {
if isSessionNotFoundError(err) {
// Discard the bad session.
sh.destroy()
}
return toSpannerErrorWithCommitInfo(err, true)
} else if err == nil {
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
}
}
return ts, toSpannerErrorWithCommitInfo(err, true)
} else if err == nil {
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
delay, shouldRetry := retryer.Retry(err)
if !shouldRetry {
return err
}
if err := gax.Sleep(ctx, delay); err != nil {
return err
}
break
}
}
return ts, ToSpannerError(err)
return ts, applyMutationWithRetry(ctx)
}

// isAbortedErr returns true if the error indicates that an gRPC call is
Expand Down