diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index bbbda9982da..c1a5ccdd149 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -41,6 +41,9 @@ type AppendResult struct { // retains the original response. response *storagepb.AppendRowsResponse + + // retains the number of times this individual write was enqueued. + totalAttempts int } func newAppendResult(data [][]byte) *AppendResult { @@ -146,6 +149,18 @@ func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSche } } +// TotalAttempts returns the number of times this write was attempted. +// +// This call blocks until the result is ready, or context is no longer valid. +func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) { + select { + case <-ctx.Done(): + return 0, fmt.Errorf("context done") + case <-ar.Ready(): + return ar.totalAttempts, nil + } +} + // pendingWrite tracks state for a set of rows that are part of a single // append request. type pendingWrite struct { @@ -180,9 +195,8 @@ func newPendingWrite(ctx context.Context, appends [][]byte) *pendingWrite { }, }, }, - result: newAppendResult(appends), - attemptCount: 1, - reqCtx: ctx, + result: newAppendResult(appends), + reqCtx: ctx, } // We compute the size now for flow controller purposes, though // the actual request size may be slightly larger (e.g. the first @@ -198,6 +212,8 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error, pw.result.response = resp } pw.result.err = err + // Record the final attempts in the result for the user. + pw.result.totalAttempts = pw.attemptCount close(pw.result.ready) // Clear the reference to the request. diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index bc94e3182df..4408f1a4094 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -117,6 +117,8 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), }, open: createOpenF(ctx, streamFunc), + // We add the new retryer by default, and add a new option to disable it. + retry: newStatelessRetryer(), } // apply writer options diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 464b99a69ed..3eed1ad4397 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -75,6 +75,11 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless) + // AppendRetryCount is a measure of the number of appends that were automatically retried by the library + // after receiving a non-successful response. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless) + // FlushRequests is a measure of the number of FlushRows requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. 
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) @@ -114,6 +119,10 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrorsView *view.View + // AppendRetryView is a cumulative sum of AppendRetryCount. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRetryView *view.View + // FlushRequestsView is a cumulative sum of FlushRequests. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequestsView *view.View @@ -130,7 +139,7 @@ func init() { AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) - + AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin) FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) DefaultOpenCensusViews = []*view.View{ @@ -144,6 +153,7 @@ func init() { AppendResponsesView, AppendResponseErrorsView, + AppendRetryView, FlushRequestsView, } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 4988280527f..c1efb18b576 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "sync" + "time" "cloud.google.com/go/bigquery/internal" "github.com/googleapis/gax-go/v2" @@ -76,6 +77,7 @@ type ManagedStream struct { destinationTable string c *Client fc *flowController + retry *statelessRetryer // aspects of the stream client ctx context.Context // retained context for the stream @@ -223,7 +225,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient // // Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - r := defaultRetryer{} + r := &unaryRetryer{} for { recordStat(ms.ctx, AppendClientOpenCount, 1) streamID := "" @@ -250,7 +252,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie } } ch := make(chan *pendingWrite, depth) - go recvProcessor(ms.ctx, arc, ms.fc, ch) + go recvProcessor(ms, arc, ch) // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work // for every new connection. ms.streamSetup = new(sync.Once) @@ -315,6 +317,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { req = reqCopy }) + // Increment the attempt count. + pw.attemptCount = pw.attemptCount + 1 if req != nil { // First append in a new connection needs properties like schema and stream name set. err = (*arc).Send(req) @@ -324,6 +328,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { } if err != nil { if shouldReconnect(err) { + // certain error responses are indicative that this connection is no longer healthy. + // if we encounter them, we force a reconnect so the next append has a healthy connection. ms.reconnect = true } return err @@ -345,16 +351,11 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { // lived bidirectional network stream, with it's own managed context (ms.ctx). requestCtx is checked // for expiry to enable faster failures, it is not propagated more deeply. func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error { - // Resolve retry settings. 
var settings gax.CallSettings for _, opt := range opts { opt.Resolve(&settings) } - var r gax.Retryer = &defaultRetryer{} - if settings.Retry != nil { - r = settings.Retry() - } for { appendErr := ms.lockingAppend(pw) @@ -365,7 +366,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) recordStat(ctx, AppendRequestErrors, 1) } - bo, shouldRetry := r.Retry(appendErr) + bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount) if shouldRetry { if err := gax.Sleep(ms.ctx, bo); err != nil { return err } @@ -466,44 +467,92 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... // recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. // -// The receive processor only deals with a single instance of a connection/channel, and thus should never interact -// with the mutex lock. -func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) { - // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply - // ensure that pending writes get acknowledged with a terminal state. +// The receive processor is only responsible for a single bidi connection/channel. As new connections are established, +// each gets its own instance of a processor. +// +// The ManagedStream reference is used for performing re-enqueuing of failed writes. +func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { for { select { - case <-ctx.Done(): - // Context is done, so we're not going to get further updates. Mark all work failed with the context error. + case <-ms.ctx.Done(): + // Context is done, so we're not going to get further updates. Mark all work left in the channel + // with the context error. We don't attempt to re-enqueue in this case. for { pw, ok := <-ch if !ok { return } - pw.markDone(nil, ctx.Err(), fc) + pw.markDone(nil, ms.ctx.Err(), ms.fc) } case nextWrite, ok := <-ch: if !ok { // Channel closed, all elements processed. return } - // block until we get a corresponding response or err from stream. resp, err := arc.Recv() if err != nil { - nextWrite.markDone(nil, err, fc) + // Evaluate the error from the receive and possibly retry. + ms.processRetry(nextWrite, nil, err) + // We're done with the write regardless of outcome, continue on to the + // next element. continue } - recordStat(ctx, AppendResponses, 1) + // Record that we did in fact get a response from the backend. + recordStat(ms.ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { - tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())) - if err != nil { - tagCtx = ctx + // The response from the backend embedded a status error. We record that the error + // occurred, and tag it based on the response code of the status. + if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { + recordStat(tagCtx, AppendResponseErrors, 1) + } + respErr := grpcstatus.ErrorProto(status) + if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry { + // We use the status error to evaluate and possibly re-enqueue the write. + ms.processRetry(nextWrite, resp, respErr) + // We're done with the write regardless of outcome, continue on to the next + // element.
+ continue } - recordStat(tagCtx, AppendResponseErrors, 1) } - nextWrite.markDone(resp, nil, fc) + // We had no error in the receive or in the response. Mark the write done. + nextWrite.markDone(resp, nil, ms.fc) } } } + +// processRetry is responsible for evaluating and re-enqueuing an append. +// If the append is not retried, it is marked complete. +func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) { + err := initialErr + for { + pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount) + if !shouldRetry { + // Should not attempt to re-append. + pw.markDone(appendResp, err, ms.fc) + return + } + time.Sleep(pause) + err = ms.appendWithRetry(pw) + if err != nil { + // Re-enqueue failed, send it through the loop again. + continue + } + // Break out of the loop; we were successful and the write has been + // re-inserted. + recordStat(ms.ctx, AppendRetryCount, 1) + break + } +} + +// returns the stateless retryer. If one's not set (re-enqueue retries disabled), +// it returns a retryer that only permits single attempts. +func (ms *ManagedStream) statelessRetryer() *statelessRetryer { + if ms.retry != nil { + return ms.retry + } + return &statelessRetryer{ + maxAttempts: 1, + } +} diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index be4564b0a33..fa7fdadae8f 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "io" "runtime" "testing" "time" @@ -25,6 +26,7 @@ import ( "github.com/googleapis/gax-go/v2" "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" + statuspb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -92,10 +94,16 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } } +type testRecvResponse struct { + resp *storagepb.AppendRowsResponse + err error +} + type testAppendRowsClient struct { storagepb.BigQueryWrite_AppendRowsClient openCount int requests []*storagepb.AppendRowsRequest + responses []*testRecvResponse sendF func(*storagepb.AppendRowsRequest) error recvF func() (*storagepb.AppendRowsResponse, error) closeF func() error @@ -483,3 +491,199 @@ func TestOpenCallOptionPropagation(t *testing.T) { } ms.openWithRetry() } + +// This test evaluates how the receiver deals with a pending write.
+func TestManagedStream_Receiver(t *testing.T) { + + var customErr = fmt.Errorf("foo") + + testCases := []struct { + description string + recvResp []*testRecvResponse + wantFinalErr error + wantTotalAttempts int + }{ + { + description: "no errors", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 1, + }, + { + description: "recv err w/io.EOF", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: io.EOF, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 2, + }, + { + description: "recv err retried and then failed", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: io.EOF, + }, + { + resp: nil, + err: customErr, + }, + }, + wantTotalAttempts: 2, + wantFinalErr: customErr, + }, + { + description: "recv err w/ custom error", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: customErr, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 1, + wantFinalErr: customErr, + }, + + { + description: "resp embeds Unavailable", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.Unavailable), + Message: "foo", + }, + }, + }, + err: nil, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 2, + }, + { + description: "resp embeds generic ResourceExhausted", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.ResourceExhausted), + Message: "foo", + }, + }, + }, + err: nil, + }, + }, + wantTotalAttempts: 1, + }, + { + description: "resp embeds throughput ResourceExhausted", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.ResourceExhausted), + Message: "Exceeds 'AppendRows throughput' quota for stream blah", + }, + }, + }, + err: nil, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantTotalAttempts: 2, + }, + { + description: "retriable failures until max attempts", + recvResp: []*testRecvResponse{ + { + err: io.EOF, + }, + { + err: io.EOF, + }, + { + err: io.EOF, + }, + { + err: io.EOF, + }, + }, + wantTotalAttempts: 4, + wantFinalErr: io.EOF, + }, + } + + for _, tc := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + + testArc := &testAppendRowsClient{ + responses: tc.recvResp, + } + + ms := &ManagedStream{ + ctx: ctx, + open: openTestArc(testArc, nil, + func() (*storagepb.AppendRowsResponse, error) { + if len(testArc.responses) == 0 { + panic("out of responses") + } + curResp := testArc.responses[0] + testArc.responses = testArc.responses[1:] + return curResp.resp, curResp.err + }, + ), + streamSettings: defaultStreamSettings(), + fc: newFlowController(0, 0), + retry: newStatelessRetryer(), + } + // use openWithRetry to get the reference to the channel and add our test pending write. + _, ch, _ := ms.openWithRetry() + pw := newPendingWrite(ctx, [][]byte{[]byte("foo")}) + pw.attemptCount = 1 // we're injecting directly, but attribute this as a single attempt. + ch <- pw + + // Wait until the write is marked done. + <-pw.result.Ready() + + // Check retry count is as expected. 
+ gotTotalAttempts, err := pw.result.TotalAttempts(ctx) + if err != nil { + t.Errorf("failed to get total attempts: %v", err) + } + if gotTotalAttempts != tc.wantTotalAttempts { + t.Errorf("%s: got %d total attempts, want %d attempts", tc.description, gotTotalAttempts, tc.wantTotalAttempts) + } + + // Check that the write got the expected final result. + if gotFinalErr := pw.result.err; !errors.Is(gotFinalErr, tc.wantFinalErr) { + t.Errorf("%s: got final error %v, wanted final error %v", tc.description, gotFinalErr, tc.wantFinalErr) + } + ms.Close() + cancel() + } +} diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 50164a84638..341c31e2636 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,6 +98,15 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } +// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes. +func DisableWriteRetries(disable bool) WriterOption { + return func(ms *ManagedStream) { + if disable { + ms.retry = nil + } + } +} + // AppendOption are options that can be passed when appending data with a managed stream instance. type AppendOption func(*pendingWrite) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 5560443a64e..adbe12ab9f2 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -15,9 +15,9 @@ package managedwriter import ( - "context" "errors" "io" + "math/rand" "strings" "time" @@ -27,39 +27,26 @@ import ( ) var ( - defaultAppendRetries = 3 + defaultRetryAttempts = 4 ) -func newDefaultRetryer() *defaultRetryer { - return &defaultRetryer{ - bigBo: gax.Backoff{ - Initial: 2 * time.Second, - Multiplier: 5, - Max: 5 * time.Minute, - }, +// This retry predicate is used for higher-level retries, enqueuing appends onto a bidi +// channel and evaluating whether an append should be retried (re-enqueued). +func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) { + if err == nil { + return } -} - -type defaultRetryer struct { - bo gax.Backoff - bigBo gax.Backoff // For more aggressive backoff, such as throughput quota -} -func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // This predicate evaluates errors for both enqueuing and reconnection. - // See RetryAppend for retry that bounds attempts to a fixed number. s, ok := status.FromError(err) + // non-status based error conditions. if !ok { - // Treat context errors as non-retriable. - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return r.bo.Pause(), false - } // EOF can happen in the case of connection close. if errors.Is(err, io.EOF) { - return r.bo.Pause(), true + shouldRetry = true + return } - // Any other non-status based errors treated as retryable. - return r.bo.Pause(), true + // All other non-status errors are treated as non-retryable (including context errors). + return } switch s.Code() { case codes.Aborted, @@ -67,24 +54,70 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool codes.DeadlineExceeded, codes.Internal, codes.Unavailable: - return r.bo.Pause(), true + shouldRetry = true + return case codes.ResourceExhausted: if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { // Note: internal b/246031522 opened to give this a structured error // and avoid string parsing. Should be a QuotaFailure or similar.
- return r.bigBo.Pause(), true // more aggressive backoff + shouldRetry = true + return } } - return 0, false + return } -// RetryAppend is a variation of the retry predicate that also bounds retries to a finite number of attempts. -func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { +// unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection. +type unaryRetryer struct { + bo gax.Backoff +} + +func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) { + shouldRetry, _ := retryPredicate(err) + return ur.bo.Pause(), shouldRetry +} + +// statelessRetryer is used for backing off within a continuous process, like processing the responses +// from the receive side of the bidi stream. An individual item in that process has a notion of an attempt +// count, and we use maximum retries as a way of evicting bad items. +type statelessRetryer struct { + r *rand.Rand + minBackoff time.Duration + jitter time.Duration + aggressiveFactor int + maxAttempts int +} - if attemptCount > defaultAppendRetries { - return 0, false // exceeded maximum retries. +func newStatelessRetryer() *statelessRetryer { + return &statelessRetryer{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), + minBackoff: 50 * time.Millisecond, + jitter: time.Second, + maxAttempts: defaultRetryAttempts, } - return r.Retry(err) +} + +func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration { + jitter := sr.jitter.Nanoseconds() + if jitter > 0 { + jitter = sr.r.Int63n(jitter) + } + pause := sr.minBackoff.Nanoseconds() + jitter + if aggressiveBackoff { + pause = pause * int64(sr.aggressiveFactor) + } + return time.Duration(pause) +} + +func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) { + if attemptCount >= sr.maxAttempts { + return 0, false + } + shouldRetry, aggressive := retryPredicate(err) + if shouldRetry { + return sr.pause(aggressive), true + } + return 0, false } // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force diff --git a/bigquery/storage/managedwriter/retry_test.go b/bigquery/storage/managedwriter/retry_test.go index ca4272339c1..67d5f1e05ff 100644 --- a/bigquery/storage/managedwriter/retry_test.go +++ b/bigquery/storage/managedwriter/retry_test.go @@ -24,6 +24,49 @@ import ( "google.golang.org/grpc/status" ) +func TestManagedStream_AppendErrorRetries(t *testing.T) { + + testCases := []struct { + err error + attemptCount int + want bool + }{ + { + err: fmt.Errorf("random error"), + want: false, + }, + { + err: io.EOF, + want: true, + }, + { + err: io.EOF, + attemptCount: 4, + want: false, + }, + { + err: status.Error(codes.Unavailable, "nope"), + want: true, + }, + { + err: status.Error(codes.ResourceExhausted, "out of gas"), + want: false, + }, + { + err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"), + want: true, + }, + } + + retry := newStatelessRetryer() + + for _, tc := range testCases { + if _, got := retry.Retry(tc.err, tc.attemptCount); got != tc.want { + t.Errorf("got %t, want %t for error: %+v", got, tc.want, tc.err) + } + } +} + func TestManagedStream_ShouldReconnect(t *testing.T) { testCases := []struct {
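Illustrative usage (not part of the diff above): a minimal sketch of how a caller of the managedwriter package might surface the retry additions from this change, namely AppendResult.TotalAttempts and the DisableWriteRetries writer option. The package name, helper function names, and the assumption that rows arrive pre-serialized are placeholders; schema descriptor and stream-type setup are elided.

// Sketch only: exercises AppendResult.TotalAttempts and DisableWriteRetries.
// Helper names and the table identifier are hypothetical; schema setup is omitted.
package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

// appendAndReport appends one batch of pre-serialized proto rows and reports
// how many attempts the library made for the write (1 when no retry occurred).
func appendAndReport(ctx context.Context, ms *managedwriter.ManagedStream, rows [][]byte) error {
	res, err := ms.AppendRows(ctx, rows)
	if err != nil {
		return fmt.Errorf("enqueue failed: %w", err)
	}
	// GetResult blocks until the append is acknowledged or fails terminally.
	if _, err := res.GetResult(ctx); err != nil {
		return fmt.Errorf("append failed: %w", err)
	}
	attempts, err := res.TotalAttempts(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("append succeeded after %d attempt(s)\n", attempts)
	return nil
}

// newFailFastStream opts out of automatic re-enqueuing, so append errors
// surface to the caller immediately instead of being retried by the library.
func newFailFastStream(ctx context.Context, c *managedwriter.Client, table string) (*managedwriter.ManagedStream, error) {
	return c.NewManagedStream(ctx,
		managedwriter.WithDestinationTable(table),
		managedwriter.DisableWriteRetries(true),
	)
}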