feat(bigquery/storage/managedwriter): define append retry predicate #6650

Merged · 9 commits · Sep 16, 2022
70 changes: 63 additions & 7 deletions bigquery/storage/managedwriter/retry.go
@@ -21,17 +21,26 @@ import (
"time"

"github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
defaultAppendRetries = 3
knownReconnectErrors = []error{
io.EOF,
status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport
}
)

type defaultRetryer struct {
bo gax.Backoff
}

func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
// TODO: refine this logic in a subsequent PR; there are some service-specific
// retry predicates in addition to the status-code-based ones.
// This predicate evaluates errors encountered while enqueuing appends.
s, ok := status.FromError(err)
if !ok {
// Treat context errors as non-retriable.
@@ -49,15 +58,62 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool
}
}
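
The Retry predicate above only classifies an error; gax-go owns the loop that calls it. Below is a minimal sketch of that interaction, wired through gax.Invoke with the exported gax.OnCodes retryer standing in for the unexported defaultRetryer (the wiring is illustrative and not taken from managedwriter):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	attempts := 0
	err := gax.Invoke(context.Background(), func(ctx context.Context, _ gax.CallSettings) error {
		attempts++
		if attempts < 3 {
			// Unavailable is retried by the predicate supplied below.
			return status.Error(codes.Unavailable, "transient")
		}
		return nil
	}, gax.WithRetry(func() gax.Retryer {
		return gax.OnCodes([]codes.Code{codes.Unavailable}, gax.Backoff{
			Initial: 50 * time.Millisecond,
			Max:     time.Second,
		})
	}))
	fmt.Println(attempts, err) // 3 <nil>
}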

func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) {
if err == nil {
return 0, false // This shouldn't need to be here, and is only provided defensively.
}
if attemptCount > defaultAppendRetries {
return 0, false // exceeded maximum retries.
}
// This predicate evaluates the received response to determine if we should re-enqueue.
apiErr, ok := apierror.FromError(err)
if !ok {
// These are non-status-based errors.
// Context errors are non-retriable.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return 0, false
}
// The same errors that trigger fast reconnect are retryable, as they deal with transient problems
// with the network stream.
if shouldReconnect(err) {
return r.bo.Pause(), true
}
// Any other non-status based errors are not retried.
return 0, false
}
// Next, evaluate service-specific error details.
se := &storagepb.StorageError{}
if e := apiErr.Details().ExtractProtoMessage(se); e == nil {
if se.GetCode() == storagepb.StorageError_OFFSET_OUT_OF_RANGE {
shollyman marked this conversation as resolved.
Show resolved Hide resolved
return r.bo.Pause(), true
}
// No other service-specific errors should be retried.
return 0, false
}
if quota := apiErr.Details().QuotaFailure; quota != nil {
// TODO: follow up with yiru on this; there are some deeper checks on resource exhaustion.
return r.bo.Pause(), true
}
// Finally, evaluate based on the more generic gRPC error status:
code := apiErr.GRPCStatus().Code()
switch code {
case codes.Aborted,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
return r.bo.Pause(), true
case codes.ResourceExhausted:
// TODO: is there a special case here that's not quota?
}
// We treat all other failures as non-retriable.
return 0, false
}
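
Unlike Retry, RetryAppend takes an explicit attemptCount, so the caller drives the loop and tracks attempts itself. A minimal sketch of that calling pattern, assuming a hypothetical send function and a toy retryer that only retries Unavailable (neither is part of this PR):

package main

import (
	"fmt"
	"time"

	"github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// toyRetryer mirrors the shape of RetryAppend: a bounded attempt count plus
// error classification, reduced here to Unavailable-only for illustration.
type toyRetryer struct{ bo gax.Backoff }

func (r *toyRetryer) RetryAppend(err error, attemptCount int) (time.Duration, bool) {
	if err == nil || attemptCount > 3 {
		return 0, false
	}
	if status.Code(err) == codes.Unavailable {
		return r.bo.Pause(), true
	}
	return 0, false
}

// appendWithRetry re-enqueues a failed append until the predicate declines.
func appendWithRetry(r *toyRetryer, send func() error) error {
	for attempt := 1; ; attempt++ {
		err := send()
		if err == nil {
			return nil
		}
		if pause, retry := r.RetryAppend(err, attempt); retry {
			time.Sleep(pause)
			continue
		}
		return err
	}
}

func main() {
	calls := 0
	err := appendWithRetry(&toyRetryer{}, func() error {
		calls++
		if calls < 3 {
			return status.Error(codes.Unavailable, "transient")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}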

// shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force
// our bidi stream to close/reopen based on the response's error. Errors here signal that no
// further appends will succeed on the current stream.
func shouldReconnect(err error) bool {
for _, ke := range knownReconnectErrors {
if errors.Is(err, ke) {
return true
}
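
A note on why the errors.Is comparison against knownReconnectErrors works: grpc-go status errors implement the Is method by comparing the underlying status, so a received error with the same code and message matches the sentinel even when wrapped, while io.EOF matches by identity. A small standalone demonstration:

package main

import (
	"errors"
	"fmt"
	"io"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	drain := status.Error(codes.Unavailable, "the connection is draining")

	// A freshly constructed status error with the same code and message
	// matches the sentinel, even through wrapping.
	got := fmt.Errorf("append failed: %w",
		status.Error(codes.Unavailable, "the connection is draining"))
	fmt.Println(errors.Is(got, drain)) // true

	// A different message does not match.
	other := status.Error(codes.Unavailable, "server shutting down")
	fmt.Println(errors.Is(other, drain)) // false

	// io.EOF is a plain sentinel and matches by identity.
	fmt.Println(errors.Is(fmt.Errorf("recv: %w", io.EOF), io.EOF)) // true
}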