diff --git a/spanner/client.go b/spanner/client.go
index 210cb6c20f9..1f15cb01e2d 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -26,7 +26,6 @@ import (
 
 	"cloud.google.com/go/internal/trace"
 	vkit "cloud.google.com/go/spanner/apiv1"
-	"cloud.google.com/go/spanner/internal/backoff"
 	"google.golang.org/api/option"
 	sppb "google.golang.org/genproto/googleapis/spanner/v1"
 	"google.golang.org/grpc"
@@ -410,43 +409,6 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
 	return ts, err
 }
 
-func runWithRetryOnAborted(ctx context.Context, f func(context.Context) error) error {
-	var funcErr error
-	retryCount := 0
-	for {
-		select {
-		case <-ctx.Done():
-			// Do context check here so that even f() failed to do so (for
-			// example, gRPC implementation bug), the loop can still have a
-			// chance to exit as expected.
-			return errContextCanceled(ctx, funcErr)
-		default:
-		}
-		funcErr = f(ctx)
-		if funcErr == nil {
-			return nil
-		}
-		// Only retry on ABORTED.
-		if isAbortErr(funcErr) {
-			// Aborted, do exponential backoff and continue.
-			b, ok := extractRetryDelay(funcErr)
-			if !ok {
-				b = backoff.DefaultBackoff.Delay(retryCount)
-			}
-			trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", b)
-			select {
-			case <-ctx.Done():
-				return errContextCanceled(ctx, funcErr)
-			case <-time.After(b):
-			}
-			retryCount++
-			continue
-		}
-		// Error isn't ABORTED / no error, return immediately.
-		return funcErr
-	}
-}
-
 // applyOption controls the behavior of Client.Apply.
 type applyOption struct {
 	// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
diff --git a/spanner/retry.go b/spanner/retry.go
index cc6400f621d..fa10ee5f019 100644
--- a/spanner/retry.go
+++ b/spanner/retry.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"cloud.google.com/go/internal/trace"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/googleapis/gax-go/v2"
@@ -41,6 +42,61 @@ var DefaultRetryBackoff = gax.Backoff{
 	Multiplier: 1.3,
 }
 
+// spannerRetryer extends the generic gax Retryer, but also checks for any
+// retry info returned by Cloud Spanner and uses that if present.
+type spannerRetryer struct {
+	gax.Retryer
+}
+
+// onCodes returns a spannerRetryer that will retry on the specified error
+// codes.
+func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer {
+	return &spannerRetryer{
+		Retryer: gax.OnCodes(cc, bo),
+	}
+}
+
+// Retry returns the retry delay returned by Cloud Spanner if that is present.
+// Otherwise it returns the retry delay calculated by the generic gax Retryer.
+func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
+	delay, shouldRetry := r.Retryer.Retry(err)
+	if !shouldRetry {
+		return 0, false
+	}
+	if serverDelay, hasServerDelay := extractRetryDelay(err); hasServerDelay {
+		delay = serverDelay
+	}
+	return delay, true
+}
+
+// runWithRetryOnAborted executes the given function and retries it for as
+// long as it returns an Aborted error. The delay between retries is the delay
+// returned by Cloud Spanner if the error carries one, and otherwise the delay
+// calculated from DefaultRetryBackoff.
+//
+// If ctx is done while backing off, the returned error is built by
+// errContextCanceled from ctx and the last error returned by f.
+func runWithRetryOnAborted(ctx context.Context, f func(context.Context) error) error {
+	retryer := onCodes(DefaultRetryBackoff, codes.Aborted)
+	for {
+		err := f(ctx)
+		if err == nil {
+			return nil
+		}
+		delay, shouldRetry := retryer.Retry(err)
+		if !shouldRetry {
+			// Only Aborted errors are retried.
+			return err
+		}
+		trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay)
+		if gax.Sleep(ctx, delay) != nil {
+			// ctx was done during the backoff. Keep the last error from f in
+			// the result instead of returning only the bare context error.
+			return errContextCanceled(ctx, err)
+		}
+	}
+}
+
 // isErrorClosing reports whether the error is generated by gRPC layer talking
 // to a closed server.
 func isErrorClosing(err error) bool {
diff --git a/spanner/retry_test.go b/spanner/retry_test.go
index 979f164e54c..27b8922bd36 100644
--- a/spanner/retry_test.go
+++ b/spanner/retry_test.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	"github.com/googleapis/gax-go/v2"
 	edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
@@ -40,3 +41,23 @@ func TestRetryInfo(t *testing.T) {
 		t.Errorf(" = <%t, %v>, want ", ok, gotDelay, time.Second)
 	}
 }
+
+func TestRetryerRespectsServerDelay(t *testing.T) {
+	t.Parallel()
+	serverDelay := 50 * time.Millisecond
+	b, _ := proto.Marshal(&edpb.RetryInfo{
+		RetryDelay: ptypes.DurationProto(serverDelay),
+	})
+	trailers := map[string]string{
+		retryInfoKey: string(b),
+	}
+	retryer := onCodes(gax.Backoff{}, codes.Aborted)
+	err := toSpannerErrorWithMetadata(spannerErrorf(codes.Aborted, "transaction was aborted"), metadata.New(trailers))
+	gotDelay, shouldRetry := retryer.Retry(err)
+	if !shouldRetry {
+		t.Fatalf("expected shouldRetry to be true")
+	}
+	if gotDelay != serverDelay {
+		t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", gotDelay, serverDelay)
+	}
+}