From 6ae9c670a11d80b34872cb05fda933303b81851d Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 27 Sep 2022 14:48:13 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): support append retries (#6695) This PR adds support for automatic retry of failed appends. Failures are evaluated both from receive errors encountered while reading the response and from error statuses embedded in the response returned by the service. Previously, failures were only re-attempted when issuing the append, not when processing responses. This PR also adds a new WriterOption (`DisableWriteRetries(disable bool)`) to control this behavior (retries are enabled by default). For users sensitive to write duplication, this PR also exposes a new TotalAttempts method on the AppendResult, which indicates the total number of times the write was attempted. This also cleans up retries more generally. The generated client already retries unary RPCs, subject to the [service config](https://github.com/googleapis/googleapis/blob/master/google/cloud/bigquery/storage/v1/bigquerystorage_grpc_service_config.json) present when the storage API was generated. Above that, we clarify/introduce two additional kinds of retries: a unary retry and a stateless retry. The unary retry is used to (re)open the underlying bidi network connection on which appends are sent, as we want to be resilient to reconnection. It is effectively stateful for the operation of reopening the connection, and thus uses a gax-based backoff with increasing intervals. The stateless retry is used when processing responses returned from the backend on the bidi stream, where incrementally increasing backoff doesn't make sense. Instead, we use a base backoff plus jitter, and apply a multiplication factor when a more severe backoff is warranted (throughput exhaustion). The intent is to provide backpressure that eventually saturates the append queue and blocks/rejects writes until the backlog recovers. --- .../storage/managedwriter/appendresult.go | 22 +- bigquery/storage/managedwriter/client.go | 2 + .../storage/managedwriter/instrumentation.go | 12 +- .../storage/managedwriter/managed_stream.go | 97 ++++++--- .../managedwriter/managed_stream_test.go | 204 ++++++++++++++++++ bigquery/storage/managedwriter/options.go | 9 + bigquery/storage/managedwriter/retry.go | 99 ++++++--- bigquery/storage/managedwriter/retry_test.go | 43 ++++ 8 files changed, 427 insertions(+), 61 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index bbbda9982da..c1a5ccdd149 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -41,6 +41,9 @@ type AppendResult struct { // retains the original response. response *storagepb.AppendRowsResponse + + // retains the number of times this individual write was enqueued. + totalAttempts int } func newAppendResult(data [][]byte) *AppendResult { @@ -146,6 +149,18 @@ func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSche } } +// TotalAttempts returns the number of times this write was attempted. +// +// This call blocks until the result is ready, or context is no longer valid.
+func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) { + select { + case <-ctx.Done(): + return 0, fmt.Errorf("context done") + case <-ar.Ready(): + return ar.totalAttempts, nil + } +} + // pendingWrite tracks state for a set of rows that are part of a single // append request. type pendingWrite struct { @@ -180,9 +195,8 @@ func newPendingWrite(ctx context.Context, appends [][]byte) *pendingWrite { }, }, }, - result: newAppendResult(appends), - attemptCount: 1, - reqCtx: ctx, + result: newAppendResult(appends), + reqCtx: ctx, } // We compute the size now for flow controller purposes, though // the actual request size may be slightly larger (e.g. the first @@ -198,6 +212,8 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error, pw.result.response = resp } pw.result.err = err + // Record the final attempts in the result for the user. + pw.result.totalAttempts = pw.attemptCount close(pw.result.ready) // Clear the reference to the request. diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index bc94e3182df..4408f1a4094 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -117,6 +117,8 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), }, open: createOpenF(ctx, streamFunc), + // We add the new retryer by default, and add a new option to disable it. + retry: newStatelessRetryer(), } // apply writer options diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 464b99a69ed..3eed1ad4397 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -75,6 +75,11 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless) + // AppendRetryCount is a measure of the number of appends that were automatically retried by the library + // after receiving a non-successful response. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless) + // FlushRequests is a measure of the number of FlushRows requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) @@ -114,6 +119,10 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrorsView *view.View + // AppendRetryView is a cumulative sum of AppendRetryCount. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRetryView *view.View + // FlushRequestsView is a cumulative sum of FlushRequests. // It is EXPERIMENTAL and subject to change or removal without notice. 
FlushRequestsView *view.View @@ -130,7 +139,7 @@ func init() { AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) - + AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin) FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) DefaultOpenCensusViews = []*view.View{ @@ -144,6 +153,7 @@ func init() { AppendResponsesView, AppendResponseErrorsView, + AppendRetryView, FlushRequestsView, } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 4988280527f..c1efb18b576 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "sync" + "time" "cloud.google.com/go/bigquery/internal" "github.com/googleapis/gax-go/v2" @@ -76,6 +77,7 @@ type ManagedStream struct { destinationTable string c *Client fc *flowController + retry *statelessRetryer // aspects of the stream client ctx context.Context // retained context for the stream @@ -223,7 +225,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient // // Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - r := defaultRetryer{} + r := &unaryRetryer{} for { recordStat(ms.ctx, AppendClientOpenCount, 1) streamID := "" @@ -250,7 +252,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie } } ch := make(chan *pendingWrite, depth) - go recvProcessor(ms.ctx, arc, ms.fc, ch) + go recvProcessor(ms, arc, ch) // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work // for every new connection. ms.streamSetup = new(sync.Once) @@ -315,6 +317,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { req = reqCopy }) + // Increment the attempt count. + pw.attemptCount = pw.attemptCount + 1 if req != nil { // First append in a new connection needs properties like schema and stream name set. err = (*arc).Send(req) @@ -324,6 +328,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { } if err != nil { if shouldReconnect(err) { + // certain error responses are indicative that this connection is no longer healthy. + // if we encounter them, we force a reconnect so the next append has a healthy connection. ms.reconnect = true } return err @@ -345,16 +351,11 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { // lived bidirectional network stream, with it's own managed context (ms.ctx). requestCtx is checked // for expiry to enable faster failures, it is not propagated more deeply. func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error { - // Resolve retry settings. 
var settings gax.CallSettings for _, opt := range opts { opt.Resolve(&settings) } - var r gax.Retryer = &defaultRetryer{} - if settings.Retry != nil { - r = settings.Retry() - } for { appendErr := ms.lockingAppend(pw) @@ -365,7 +366,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) recordStat(ctx, AppendRequestErrors, 1) } - bo, shouldRetry := r.Retry(appendErr) + bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount) if shouldRetry { if err := gax.Sleep(ms.ctx, bo); err != nil { return err @@ -466,44 +467,92 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... // recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. // -// The receive processor only deals with a single instance of a connection/channel, and thus should never interact -// with the mutex lock. -func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) { - // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply - // ensure that pending writes get acknowledged with a terminal state. +// The receive processor is only responsible for a single bidi connection/channel. As new connections are established, +// each gets its own instance of a processor. +// +// The ManagedStream reference is used for performing re-enqueuing of failed writes. +func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { for { select { - case <-ctx.Done(): - // Context is done, so we're not going to get further updates. Mark all work failed with the context error. + case <-ms.ctx.Done(): - // Context is done, so we're not going to get further updates. Mark all work left in the channel + // with the context error. We don't attempt to re-enqueue in this case. for { pw, ok := <-ch if !ok { return } - pw.markDone(nil, ctx.Err(), fc) + pw.markDone(nil, ms.ctx.Err(), ms.fc) } case nextWrite, ok := <-ch: if !ok { // Channel closed, all elements processed. return } - // block until we get a corresponding response or err from stream. resp, err := arc.Recv() if err != nil { - nextWrite.markDone(nil, err, fc) + // Evaluate the error from the receive and possibly retry. + ms.processRetry(nextWrite, nil, err) + // We're done with the write regardless of outcome, continue on to the + // next element. continue } - recordStat(ctx, AppendResponses, 1) + // Record that we did in fact get a response from the backend. + recordStat(ms.ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { - tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())) - if err != nil { - tagCtx = ctx + // The response from the backend embedded a status error. We record that the error + // occurred, and tag it based on the response code of the status. + if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { + recordStat(tagCtx, AppendResponseErrors, 1) + } + respErr := grpcstatus.ErrorProto(status) + if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry { + // We use the status error to evaluate and possibly re-enqueue the write. + ms.processRetry(nextWrite, resp, respErr) + // We're done with the write regardless of outcome, continue on to the next + // element.
+ continue } - recordStat(tagCtx, AppendResponseErrors, 1) } - nextWrite.markDone(resp, nil, fc) + // We had no error in the receive or in the response. Mark the write done. + nextWrite.markDone(resp, nil, ms.fc) } } } + +// processRetry is responsible for evaluating and re-enqueing an append. +// If the append is not retried, it is marked complete. +func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) { + err := initialErr + for { + pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount) + if !shouldRetry { + // Should not attempt to re-append. + pw.markDone(appendResp, err, ms.fc) + return + } + time.Sleep(pause) + err = ms.appendWithRetry(pw) + if err != nil { + // Re-enqueue failed, send it through the loop again. + continue + } + // Break out of the loop, we were successful and the write has been + // re-inserted. + recordStat(ms.ctx, AppendRetryCount, 1) + break + } +} + +// returns the stateless retryer. If one's not set (re-enqueue retries disabled), +// it returns a retryer that only permits single attempts. +func (ms *ManagedStream) statelessRetryer() *statelessRetryer { + if ms.retry != nil { + return ms.retry + } + return &statelessRetryer{ + maxAttempts: 1, + } +} diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index be4564b0a33..fa7fdadae8f 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "io" "runtime" "testing" "time" @@ -25,6 +26,7 @@ import ( "github.com/googleapis/gax-go/v2" "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" + statuspb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -92,10 +94,16 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } } +type testRecvResponse struct { + resp *storagepb.AppendRowsResponse + err error +} + type testAppendRowsClient struct { storagepb.BigQueryWrite_AppendRowsClient openCount int requests []*storagepb.AppendRowsRequest + responses []*testRecvResponse sendF func(*storagepb.AppendRowsRequest) error recvF func() (*storagepb.AppendRowsResponse, error) closeF func() error @@ -483,3 +491,199 @@ func TestOpenCallOptionPropagation(t *testing.T) { } ms.openWithRetry() } + +// This test evaluates how the receiver deals with a pending write. 
+func TestManagedStream_Receiver(t *testing.T) { + + var customErr = fmt.Errorf("foo") + + testCases := []struct { + description string + recvResp []*testRecvResponse + wantFinalErr error + wantTotalAttempts int + }{ + { + description: "no errors", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 1, + }, + { + description: "recv err w/io.EOF", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: io.EOF, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 2, + }, + { + description: "recv err retried and then failed", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: io.EOF, + }, + { + resp: nil, + err: customErr, + }, + }, + wantTotalAttempts: 2, + wantFinalErr: customErr, + }, + { + description: "recv err w/ custom error", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: customErr, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 1, + wantFinalErr: customErr, + }, + + { + description: "resp embeds Unavailable", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.Unavailable), + Message: "foo", + }, + }, + }, + err: nil, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 2, + }, + { + description: "resp embeds generic ResourceExhausted", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.ResourceExhausted), + Message: "foo", + }, + }, + }, + err: nil, + }, + }, + wantTotalAttempts: 1, + }, + { + description: "resp embeds throughput ResourceExhausted", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.ResourceExhausted), + Message: "Exceeds 'AppendRows throughput' quota for stream blah", + }, + }, + }, + err: nil, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 2, + }, + { + description: "retriable failures until max attempts", + recvResp: []*testRecvResponse{ + { + err: io.EOF, + }, + { + err: io.EOF, + }, + { + err: io.EOF, + }, + { + err: io.EOF, + }, + }, + wantTotalAttempts: 4, + wantFinalErr: io.EOF, + }, + } + + for _, tc := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + + testArc := &testAppendRowsClient{ + responses: tc.recvResp, + } + + ms := &ManagedStream{ + ctx: ctx, + open: openTestArc(testArc, nil, + func() (*storagepb.AppendRowsResponse, error) { + if len(testArc.responses) == 0 { + panic("out of responses") + } + curResp := testArc.responses[0] + testArc.responses = testArc.responses[1:] + return curResp.resp, curResp.err + }, + ), + streamSettings: defaultStreamSettings(), + fc: newFlowController(0, 0), + retry: newStatelessRetryer(), + } + // use openWithRetry to get the reference to the channel and add our test pending write. + _, ch, _ := ms.openWithRetry() + pw := newPendingWrite(ctx, [][]byte{[]byte("foo")}) + pw.attemptCount = 1 // we're injecting directly, but attribute this as a single attempt. + ch <- pw + + // Wait until the write is marked done. + <-pw.result.Ready() + + // Check retry count is as expected. 
+ gotTotalAttempts, err := pw.result.TotalAttempts(ctx) + if err != nil { + t.Errorf("failed to get total attempts: %v", err) + } + if gotTotalAttempts != tc.wantTotalAttempts { + t.Errorf("%s: got %d total attempts, want %d attempts", tc.description, gotTotalAttempts, tc.wantTotalAttempts) + } + + // Check that the write got the expected final result. + if gotFinalErr := pw.result.err; !errors.Is(gotFinalErr, tc.wantFinalErr) { + t.Errorf("%s: got final error %v, wanted final error %v", tc.description, gotFinalErr, tc.wantFinalErr) + } + ms.Close() + cancel() + } +} diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 50164a84638..341c31e2636 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,6 +98,15 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } +// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes. +func DisableWriteRetries(disable bool) WriterOption { + return func(ms *ManagedStream) { + if disable { + ms.retry = nil + } + } +} + // AppendOption are options that can be passed when appending data with a managed stream instance. type AppendOption func(*pendingWrite) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 5560443a64e..adbe12ab9f2 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -15,9 +15,9 @@ package managedwriter import ( - "context" "errors" "io" + "math/rand" "strings" "time" @@ -27,39 +27,26 @@ import ( ) var ( - defaultAppendRetries = 3 + defaultRetryAttempts = 4 ) -func newDefaultRetryer() *defaultRetryer { - return &defaultRetryer{ - bigBo: gax.Backoff{ - Initial: 2 * time.Second, - Multiplier: 5, - Max: 5 * time.Minute, - }, +// This retry predicate is used for higher level retries, enqueuing appends onto a bidi +// channel and evaluating whether an append should be retried (re-enqueued). +func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) { + if err == nil { + return } -} - -type defaultRetryer struct { - bo gax.Backoff - bigBo gax.Backoff // For more aggressive backoff, such as throughput quota -} -func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // This predicate evaluates errors for both enqueuing and reconnection. - // See RetryAppend for retry that bounds attempts to a fixed number. s, ok := status.FromError(err) + // non-status based error conditions. if !ok { - // Treat context errors as non-retriable. - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return r.bo.Pause(), false - } // EOF can happen in the case of connection close. if errors.Is(err, io.EOF) { - return r.bo.Pause(), true + shouldRetry = true + return } - // Any other non-status based errors treated as retryable. + // All other non-status errors are treated as non-retryable (including context errors). + return } switch s.Code() { case codes.Aborted, @@ -67,24 +54,70 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool codes.DeadlineExceeded, codes.Internal, codes.Unavailable: - return r.bo.Pause(), true + shouldRetry = true + return case codes.ResourceExhausted: if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { // Note: internal b/246031522 opened to give this a structured error // and avoid string parsing. Should be a QuotaFailure or similar.
- return r.bigBo.Pause(), true // more aggressive backoff + shouldRetry = true + return } } - return 0, false + return } -// RetryAppend is a variation of the retry predicate that also bounds retries to a finite number of attempts. -func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { +// unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection. +type unaryRetryer struct { + bo gax.Backoff +} + +func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) { + shouldRetry, _ := retryPredicate(err) + return ur.bo.Pause(), shouldRetry +} + +// statelessRetryer is used for backing off within a continuous process, like processing the responses +// from the receive side of the bidi stream. An individual item in that process has a notion of an attempt +// count, and we use maximum retries as a way of evicting bad items. +type statelessRetryer struct { + r *rand.Rand + minBackoff time.Duration + jitter time.Duration + aggressiveFactor int + maxAttempts int +} - if attemptCount > defaultAppendRetries { - return 0, false // exceeded maximum retries. +func newStatelessRetryer() *statelessRetryer { + return &statelessRetryer{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), + minBackoff: 50 * time.Millisecond, + jitter: time.Second, + maxAttempts: defaultRetryAttempts, } - return r.Retry(err) +} + +func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration { + jitter := sr.jitter.Nanoseconds() + if jitter > 0 { + jitter = sr.r.Int63n(jitter) + } + pause := sr.minBackoff.Nanoseconds() + jitter + if aggressiveBackoff { + pause = pause * int64(sr.aggressiveFactor) + } + return time.Duration(pause) +} + +func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) { + if attemptCount >= sr.maxAttempts { + return 0, false + } + shouldRetry, aggressive := retryPredicate(err) + if shouldRetry { + return sr.pause(aggressive), true + } + return 0, false } // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force diff --git a/bigquery/storage/managedwriter/retry_test.go b/bigquery/storage/managedwriter/retry_test.go index ca4272339c1..67d5f1e05ff 100644 --- a/bigquery/storage/managedwriter/retry_test.go +++ b/bigquery/storage/managedwriter/retry_test.go @@ -24,6 +24,49 @@ import ( "google.golang.org/grpc/status" ) +func TestManagedStream_AppendErrorRetries(t *testing.T) { + + testCases := []struct { + err error + attemptCount int + want bool + }{ + { + err: fmt.Errorf("random error"), + want: false, + }, + { + err: io.EOF, + want: true, + }, + { + err: io.EOF, + attemptCount: 4, + want: false, + }, + { + err: status.Error(codes.Unavailable, "nope"), + want: true, + }, + { + err: status.Error(codes.ResourceExhausted, "out of gas"), + want: false, + }, + { + err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"), + want: true, + }, + } + + retry := newStatelessRetryer() + + for _, tc := range testCases { + if _, got := retry.Retry(tc.err, tc.attemptCount); got != tc.want { + t.Errorf("got %t, want %t for error: %+v", got, tc.want, tc.err) + } + } +} + func TestManagedStream_ShouldReconnect(t *testing.T) { testCases := []struct {