Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): define append retry predicate #6650

Merged
merged 9 commits into from Sep 16, 2022
50 changes: 44 additions & 6 deletions bigquery/storage/managedwriter/retry.go
Expand Up @@ -18,35 +18,73 @@ import (
"context"
"errors"
"io"
"strings"
"time"

"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// defaultAppendRetries is the attempt threshold consulted by RetryAppend:
// once an append's attempt count exceeds this value it is no longer retried.
var defaultAppendRetries = 3

// newDefaultRetryer builds a defaultRetryer whose bigBo backoff starts at a
// 2 second pause, grows by a factor of 5, and is capped at 5 minutes.
// The bo field is left at its zero value.
func newDefaultRetryer() *defaultRetryer {
	aggressive := gax.Backoff{
		Initial:    2 * time.Second,
		Multiplier: 5,
		Max:        5 * time.Minute,
	}
	retryer := new(defaultRetryer)
	retryer.bigBo = aggressive
	return retryer
}

// defaultRetryer holds the backoff state used by the Retry and RetryAppend
// predicates.
type defaultRetryer struct {
	// bo paces retries for ordinary retryable errors.
	bo gax.Backoff
	// bigBo is a more aggressive backoff, used for cases such as exceeding
	// the throughput quota.
	bigBo gax.Backoff
}

func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
// TODO: refine this logic in a subsequent PR, there's some service-specific
// retry predicates in addition to statuscode-based.
// This predicate evaluates errors for both enqueuing and reconnection.
// See RetryAppend for retry that bounds attempts to a fixed number.
s, ok := status.FromError(err)
if !ok {
// Treat context errors as non-retriable.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return r.bo.Pause(), false
}
// EOF can happen in the case of connection close.
if errors.Is(err, io.EOF) {
return r.bo.Pause(), true
}
// Any other non-status based errors treated as retryable.
return r.bo.Pause(), true
}
switch s.Code() {
case codes.Unavailable:
case codes.Aborted,
codes.Canceled,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
return r.bo.Pause(), true
default:
return r.bo.Pause(), false
case codes.ResourceExhausted:
if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
// Note: internal b/246031522 opened to give this a structured error
// and avoid string parsing. Should be a QuotaFailure or similar.
return r.bigBo.Pause(), true // more aggressive backoff
}
shollyman marked this conversation as resolved.
Show resolved Hide resolved
}
return 0, false
}

// RetryAppend applies the Retry predicate with an additional cap on attempts:
// once attemptCount exceeds defaultAppendRetries it reports (0, false) without
// consulting Retry.
func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) {
	if attemptCount <= defaultAppendRetries {
		// Still within the attempt budget; defer to the general predicate.
		return r.Retry(err)
	}
	// Exceeded maximum retries.
	return 0, false
}

// shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force
Expand Down