From 478b8dd4e0d722cbf02fa2e216929eb561694fe0 Mon Sep 17 00:00:00 2001 From: shollyman Date: Fri, 16 Sep 2022 15:06:52 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): define append retry predicate (#6650) * feat(bigquery/storage/managedwriter): define append retry predicate This PR models the retry predicate we'll use for evaluating whether appends should be retried automatically. --- bigquery/storage/managedwriter/retry.go | 50 ++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 7ace796c163..5560443a64e 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "strings" "time" "github.com/googleapis/gax-go/v2" @@ -25,28 +26,65 @@ import ( "google.golang.org/grpc/status" ) +var ( + defaultAppendRetries = 3 +) + +func newDefaultRetryer() *defaultRetryer { + return &defaultRetryer{ + bigBo: gax.Backoff{ + Initial: 2 * time.Second, + Multiplier: 5, + Max: 5 * time.Minute, + }, + } +} + type defaultRetryer struct { - bo gax.Backoff + bo gax.Backoff + bigBo gax.Backoff // For more aggressive backoff, such as throughput quota } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // TODO: refine this logic in a subsequent PR, there's some service-specific - // retry predicates in addition to statuscode-based. + // This predicate evaluates errors for both enqueuing and reconnection. + // See RetryAppend for retry that bounds attempts to a fixed number. s, ok := status.FromError(err) if !ok { // Treat context errors as non-retriable. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return r.bo.Pause(), false } + // EOF can happen in the case of connection close. + if errors.Is(err, io.EOF) { + return r.bo.Pause(), true + } // Any other non-status based errors treated as retryable. return r.bo.Pause(), true } switch s.Code() { - case codes.Unavailable: + case codes.Aborted, + codes.Canceled, + codes.DeadlineExceeded, + codes.Internal, + codes.Unavailable: return r.bo.Pause(), true - default: - return r.bo.Pause(), false + case codes.ResourceExhausted: + if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { + // Note: internal b/246031522 opened to give this a structured error + // and avoid string parsing. Should be a QuotaFailure or similar. + return r.bigBo.Pause(), true // more aggressive backoff + } + } + return 0, false +} + +// RetryAppend is a variation of the retry predicate that also bounds retries to a finite number of attempts. +func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { + + if attemptCount > defaultAppendRetries { + return 0, false // exceeded maximum retries. } + return r.Retry(err) } // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force