Skip to content

Commit

Permalink
spanner: use a retryer for Read/Write transactions
Browse files Browse the repository at this point in the history
Create a customized GAX retryer that checks for any retry
information that Cloud Spanner might have returned.
If Cloud Spanner did not return any retry information, a
default delay is calculated by GAX and used.

Updates #1418

Change-Id: I56b9ab0bc4f64ad68f19edb9d18c89c3af7995d3
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/44172
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
  • Loading branch information
olavloite committed Sep 21, 2019
1 parent adba9e8 commit 5709d56
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 38 deletions.
38 changes: 0 additions & 38 deletions spanner/client.go
Expand Up @@ -26,7 +26,6 @@ import (

"cloud.google.com/go/internal/trace"
vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal/backoff"
"google.golang.org/api/option"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
Expand Down Expand Up @@ -410,43 +409,6 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
return ts, err
}

// runWithRetryOnAborted calls f repeatedly until it succeeds, fails with an
// error that is not an abort error (per isAbortErr), or ctx is done. Between
// attempts it waits for the delay extracted from the error, falling back to
// backoff.DefaultBackoff when the error carries no delay.
func runWithRetryOnAborted(ctx context.Context, f func(context.Context) error) error {
	var lastErr error
	for attempt := 0; ; attempt++ {
		// Check the context up front so that the loop still terminates even
		// if f itself fails to honor cancellation (for example because of a
		// gRPC implementation bug).
		select {
		case <-ctx.Done():
			return errContextCanceled(ctx, lastErr)
		default:
		}

		if lastErr = f(ctx); lastErr == nil {
			return nil
		}
		// Anything other than ABORTED is handed straight back to the caller.
		if !isAbortErr(lastErr) {
			return lastErr
		}

		// ABORTED: prefer the delay carried by the error, otherwise use the
		// default backoff for this attempt number.
		delay, ok := extractRetryDelay(lastErr)
		if !ok {
			delay = backoff.DefaultBackoff.Delay(attempt)
		}
		trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay)

		// Wait out the delay, but give up early if the context ends first.
		select {
		case <-ctx.Done():
			return errContextCanceled(ctx, lastErr)
		case <-time.After(delay):
		}
	}
}

// applyOption controls the behavior of Client.Apply.
type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
Expand Down
53 changes: 53 additions & 0 deletions spanner/retry.go
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/googleapis/gax-go/v2"
Expand All @@ -41,6 +42,58 @@ var DefaultRetryBackoff = gax.Backoff{
Multiplier: 1.3,
}

// spannerRetryer extends the generic gax Retryer, but also checks for any
// retry info returned by Cloud Spanner and uses that if present.
type spannerRetryer struct {
// Embedded generic retryer. spannerRetryer.Retry consults it first to decide
// whether an error is retryable and to obtain the default delay.
gax.Retryer
}

// onCodes builds a spannerRetryer around a gax retryer that retries on the
// error codes in cc, using bo as its backoff settings.
func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer {
	base := gax.OnCodes(cc, bo)
	return &spannerRetryer{Retryer: base}
}

// Retry reports whether err should be retried and how long to wait first.
// The embedded gax Retryer decides whether to retry; when it says yes, a
// delay extracted from err (the retry info returned by Cloud Spanner) takes
// precedence over the delay the gax Retryer calculated.
func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
	pause, retryable := r.Retryer.Retry(err)
	if !retryable {
		return 0, false
	}
	serverDelay, ok := extractRetryDelay(err)
	if ok {
		return serverDelay, true
	}
	return pause, true
}

// runWithRetryOnAborted runs f and re-runs it for as long as it fails with
// an Aborted error. The pause before each retry comes from a spannerRetryer
// configured with DefaultRetryBackoff: the delay returned by Cloud Spanner
// when the error carries one, otherwise the delay calculated by gax. Any
// other error from f, or an error from sleeping (gax.Sleep is given ctx),
// is returned to the caller.
func runWithRetryOnAborted(ctx context.Context, f func(context.Context) error) error {
	retryer := onCodes(DefaultRetryBackoff, codes.Aborted)
	for {
		runErr := f(ctx)
		if runErr == nil {
			return nil
		}
		pause, retryable := retryer.Retry(runErr)
		if !retryable {
			return runErr
		}
		trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", pause)
		sleepErr := gax.Sleep(ctx, pause)
		if sleepErr != nil {
			return sleepErr
		}
	}
}

// isErrorClosing reports whether the error is generated by gRPC layer talking
// to a closed server.
func isErrorClosing(err error) bool {
Expand Down
21 changes: 21 additions & 0 deletions spanner/retry_test.go
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/googleapis/gax-go/v2"
edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -40,3 +41,23 @@ func TestRetryInfo(t *testing.T) {
t.Errorf("<ok, retryDelay> = <%t, %v>, want <true, %v>", ok, gotDelay, time.Second)
}
}

// TestRetryerRespectsServerDelay checks that the retryer returned by onCodes
// reports the delay carried in an Aborted error's retry-info metadata rather
// than a delay of its own.
func TestRetryerRespectsServerDelay(t *testing.T) {
	t.Parallel()
	serverDelay := 50 * time.Millisecond
	b, err := proto.Marshal(&edpb.RetryInfo{
		RetryDelay: ptypes.DurationProto(serverDelay),
	})
	if err != nil {
		// Without a valid payload the rest of the test would be meaningless.
		t.Fatalf("failed to marshal RetryInfo: %v", err)
	}
	trailers := map[string]string{
		retryInfoKey: string(b),
	}
	// The retryer's own backoff is left at its zero value; the test only
	// checks that the delay attached to the error is the one reported.
	retryer := onCodes(gax.Backoff{}, codes.Aborted)
	abortedErr := toSpannerErrorWithMetadata(spannerErrorf(codes.Aborted, "transaction was aborted"), metadata.New(trailers))
	gotDelay, shouldRetry := retryer.Retry(abortedErr)
	if !shouldRetry {
		t.Fatalf("expected shouldRetry to be true")
	}
	if gotDelay != serverDelay {
		t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", gotDelay, serverDelay)
	}
}

0 comments on commit 5709d56

Please sign in to comment.