From 82099ff4a78d75266baa2ecf87e90cde4be262d2 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 19 Sep 2022 19:14:07 +0000 Subject: [PATCH 01/12] feat(bigquery/storage/managedwriter): support append retries --- .../storage/managedwriter/managed_stream.go | 21 +++++++++---------- bigquery/storage/managedwriter/options.go | 8 +++++++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 4988280527f..f4ab9310dd9 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -76,6 +76,7 @@ type ManagedStream struct { destinationTable string c *Client fc *flowController + retry *defaultRetryer // aspects of the stream client ctx context.Context // retained context for the stream @@ -250,7 +251,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie } } ch := make(chan *pendingWrite, depth) - go recvProcessor(ms.ctx, arc, ms.fc, ch) + go recvProcessor(ms, arc, ch) // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work // for every new connection. ms.streamSetup = new(sync.Once) @@ -468,19 +469,19 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... // // The receive processor only deals with a single instance of a connection/channel, and thus should never interact // with the mutex lock. -func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) { +func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply // ensure that pending writes get acknowledged with a terminal state. for { select { - case <-ctx.Done(): + case <-ms.ctx.Done(): // Context is done, so we're not going to get further updates. Mark all work failed with the context error. for { pw, ok := <-ch if !ok { return } - pw.markDone(nil, ctx.Err(), fc) + pw.markDone(nil, ms.ctx.Err(), ms.fc) } case nextWrite, ok := <-ch: if !ok { @@ -491,19 +492,17 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl // block until we get a corresponding response or err from stream. resp, err := arc.Recv() if err != nil { - nextWrite.markDone(nil, err, fc) + nextWrite.markDone(nil, err, ms.fc) continue } - recordStat(ctx, AppendResponses, 1) + recordStat(ms.ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { - tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())) - if err != nil { - tagCtx = ctx + if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { + recordStat(tagCtx, AppendResponseErrors, 1) } - recordStat(tagCtx, AppendResponseErrors, 1) } - nextWrite.markDone(resp, nil, fc) + nextWrite.markDone(resp, nil, ms.fc) } } } diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 50164a84638..9239bd8fdf6 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,6 +98,14 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } +// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when +// it opens the underlying append stream. 
+func WithExperimentalRetry(o gax.CallOption) WriterOption { + return func(ms *ManagedStream) { + ms.retry = newDefaultRetryer() + } +} + // AppendOption are options that can be passed when appending data with a managed stream instance. type AppendOption func(*pendingWrite) From 01676958b2d8780fc52a0811ce9380eb3cf8be0f Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 20 Sep 2022 02:22:04 +0000 Subject: [PATCH 02/12] add instrumentation metrics/views --- bigquery/storage/managedwriter/instrumentation.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 464b99a69ed..3eed1ad4397 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -75,6 +75,11 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless) + // AppendRetryCount is a measure of the number of appends that were automatically retried by the library + // after receiving a non-successful response. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless) + // FlushRequests is a measure of the number of FlushRows requests sent. // It is EXPERIMENTAL and subject to change or removal without notice. FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless) @@ -114,6 +119,10 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponseErrorsView *view.View + // AppendRetryView is a cumulative sum of AppendRetryCount. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRetryView *view.View + // FlushRequestsView is a cumulative sum of FlushRequests. // It is EXPERIMENTAL and subject to change or removal without notice. 
FlushRequestsView *view.View @@ -130,7 +139,7 @@ func init() { AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) - + AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin) FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin) DefaultOpenCensusViews = []*view.View{ @@ -144,6 +153,7 @@ func init() { AppendResponsesView, AppendResponseErrorsView, + AppendRetryView, FlushRequestsView, } From f04a2ca8050fc441d31a61d4f6028160c6b69106 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 20 Sep 2022 16:32:41 +0000 Subject: [PATCH 03/12] stash broken stuff --- .../storage/managedwriter/managed_stream.go | 62 +++++++++++++++- .../managedwriter/managed_stream_test.go | 72 +++++++++++++++++++ bigquery/storage/managedwriter/retry.go | 4 +- bigquery/storage/managedwriter/retry_test.go | 43 +++++++++++ 4 files changed, 177 insertions(+), 4 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index f4ab9310dd9..7b705dc8c79 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -18,7 +18,9 @@ import ( "context" "fmt" "io" + "log" "sync" + "time" "cloud.google.com/go/bigquery/internal" "github.com/googleapis/gax-go/v2" @@ -488,21 +490,77 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie // Channel closed, all elements processed. return } - // block until we get a corresponding response or err from stream. resp, err := arc.Recv() if err != nil { - nextWrite.markDone(nil, err, ms.fc) + // Evaluate and possibly retry based on the receive error. + ms.processRetry(nextWrite, err) + // We're done with the write regardless of outcome. continue } + // We received a response (it may contain an error). recordStat(ms.ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { + // Mark that we received an error in instrumentation. if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { recordStat(tagCtx, AppendResponseErrors, 1) } + respErr := grpcstatus.ErrorProto(status) + if ms.shouldRetryAppend(respErr, nextWrite.attemptCount) { + // The response had an error attached and it should be retried. + log.Printf("checking based on resp status") + ms.processRetry(nextWrite, respErr) + // We're done with the write regardless of outcome. + continue + } } + // We either had no error in the response, or should not have retried. + // mark the write done. nextWrite.markDone(resp, nil, ms.fc) } } } + +// processRetry is responsible for evaluating and re-enqueing an append. +// If the append is not retried, it is marked complete. +func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { + if ms.retry == nil { + log.Printf("no retries") + // No retries. Mark the write done and return. + pw.markDone(nil, initialErr, ms.fc) + return + } + err := initialErr + for { + pause, shouldRetry := ms.retry.RetryAppend(err, pw.attemptCount) + if !shouldRetry { + // Should not attempt to re-append. + log.Printf("didn't pass shouldRetry") + pw.markDone(nil, err, ms.fc) + return + } + // we use the pause the slow the receiver loop as a whole. 
+ time.Sleep(pause) + pw.attemptCount = pw.attemptCount + 1 + log.Printf("appending") + err := ms.appendWithRetry(pw) + if err != nil { + log.Printf("append errored: %v", err) + // Got a failure, send it through the loop again. + continue + } + // Break out of the loop, we were successful and the write has been + // re-inserted. + break + } + +} + +func (ms *ManagedStream) shouldRetryAppend(err error, attemptCount int) bool { + if ms.retry == nil { + return false + } + _, shouldRetry := ms.retry.RetryAppend(err, attemptCount) + return shouldRetry +} diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index be4564b0a33..038c225d2d4 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -18,6 +18,8 @@ import ( "context" "errors" "fmt" + "io" + "log" "runtime" "testing" "time" @@ -25,6 +27,7 @@ import ( "github.com/googleapis/gax-go/v2" "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" + statuspb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -116,6 +119,7 @@ func (tarc *testAppendRowsClient) CloseSend() error { // openTestArc handles wiring in a test AppendRowsClient into a managedstream by providing the open function. func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { sF := func(req *storagepb.AppendRowsRequest) error { + log.Printf("test SF appending %v", req) testARC.requests = append(testARC.requests, req) return nil } @@ -483,3 +487,71 @@ func TestOpenCallOptionPropagation(t *testing.T) { } ms.openWithRetry() } + +func TestManagedStream_Receiver(t *testing.T) { + + testCases := []struct { + desc string + recvResp *storagepb.AppendRowsResponse + recvErr error + + wantAppend bool + }{ + { + desc: "no errors", + recvResp: &storagepb.AppendRowsResponse{}, + wantAppend: false, + }, + { + desc: "recv EOF", + recvResp: &storagepb.AppendRowsResponse{}, + recvErr: io.EOF, + wantAppend: true, + }, + { + desc: "resp unavailable", + recvResp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.Unavailable), + Message: "foo", + }, + }, + }, + wantAppend: true, + }, + } + + for _, tc := range testCases { + t.Logf("TESTCASE %s", tc.desc) + ctx, cancel := context.WithCancel(context.Background()) + + testArc := &testAppendRowsClient{} + + ms := &ManagedStream{ + ctx: ctx, + open: openTestArc(testArc, nil, + func() (*storagepb.AppendRowsResponse, error) { + if len(testArc.requests) == 0 { + t.Logf("emitting testcase response") + return tc.recvResp, tc.recvErr + } + t.Logf("emitting default response") + return &storagepb.AppendRowsResponse{}, nil + }, + ), + streamSettings: defaultStreamSettings(), + fc: newFlowController(0, 0), + retry: newDefaultRetryer(), + } + _, ch, _ := ms.openWithRetry() + ch <- newPendingWrite(ctx, [][]byte{[]byte("foo")}) + time.Sleep(time.Second) + + if gotAppend := len(testArc.requests) > 0; gotAppend != tc.wantAppend { + t.Errorf("%s: got %t, want %t", tc.desc, gotAppend, tc.wantAppend) + } + ms.Close() + cancel() + } +} diff --git a/bigquery/storage/managedwriter/retry.go 
b/bigquery/storage/managedwriter/retry.go index 5560443a64e..ae40220df7f 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -58,8 +58,8 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool if errors.Is(err, io.EOF) { return r.bo.Pause(), true } - // Any other non-status based errors treated as retryable. - return r.bo.Pause(), true + // Any other non-status based errors are not retried. + return 0, false } switch s.Code() { case codes.Aborted, diff --git a/bigquery/storage/managedwriter/retry_test.go b/bigquery/storage/managedwriter/retry_test.go index ca4272339c1..7ca4a188b7d 100644 --- a/bigquery/storage/managedwriter/retry_test.go +++ b/bigquery/storage/managedwriter/retry_test.go @@ -62,3 +62,46 @@ func TestManagedStream_ShouldReconnect(t *testing.T) { } } } + +func TestManagedStream_AppendErrorRetries(t *testing.T) { + + testCases := []struct { + err error + attemptCount int + want bool + }{ + { + err: fmt.Errorf("random error"), + want: false, + }, + { + err: io.EOF, + want: true, + }, + { + err: io.EOF, + attemptCount: 4, + want: false, + }, + { + err: status.Error(codes.Unavailable, "nope"), + want: true, + }, + { + err: status.Error(codes.ResourceExhausted, "out of gas"), + want: false, + }, + { + err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"), + want: true, + }, + } + + retry := newDefaultRetryer() + + for _, tc := range testCases { + if _, got := retry.RetryAppend(tc.err, tc.attemptCount); got != tc.want { + t.Errorf("got %t, want %t for error: %+v", got, tc.want, tc.err) + } + } +} From 66d124ca5a0c710861ce423e57202ba2e9117619 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 17:12:50 +0000 Subject: [PATCH 04/12] cleanup logging madness, refactors --- .../storage/managedwriter/managed_stream.go | 8 +- .../managedwriter/managed_stream_test.go | 131 +++++++++++++----- bigquery/storage/managedwriter/retry.go | 15 ++ 3 files changed, 114 insertions(+), 40 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 7b705dc8c79..622d8b08d18 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "io" - "log" "sync" "time" @@ -509,7 +508,6 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie respErr := grpcstatus.ErrorProto(status) if ms.shouldRetryAppend(respErr, nextWrite.attemptCount) { // The response had an error attached and it should be retried. - log.Printf("checking based on resp status") ms.processRetry(nextWrite, respErr) // We're done with the write regardless of outcome. continue @@ -526,7 +524,6 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie // If the append is not retried, it is marked complete. func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { if ms.retry == nil { - log.Printf("no retries") // No retries. Mark the write done and return. pw.markDone(nil, initialErr, ms.fc) return @@ -536,17 +533,14 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { pause, shouldRetry := ms.retry.RetryAppend(err, pw.attemptCount) if !shouldRetry { // Should not attempt to re-append. - log.Printf("didn't pass shouldRetry") pw.markDone(nil, err, ms.fc) return } // we use the pause the slow the receiver loop as a whole. 
time.Sleep(pause) pw.attemptCount = pw.attemptCount + 1 - log.Printf("appending") - err := ms.appendWithRetry(pw) + err = ms.appendWithRetry(pw) if err != nil { - log.Printf("append errored: %v", err) // Got a failure, send it through the loop again. continue } diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 038c225d2d4..ca51f6eb67c 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "io" - "log" "runtime" "testing" "time" @@ -95,10 +94,16 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } } +type testRecvResponse struct { + resp *storagepb.AppendRowsResponse + err error +} + type testAppendRowsClient struct { storagepb.BigQueryWrite_AppendRowsClient openCount int requests []*storagepb.AppendRowsRequest + responses []*testRecvResponse sendF func(*storagepb.AppendRowsRequest) error recvF func() (*storagepb.AppendRowsResponse, error) closeF func() error @@ -119,7 +124,6 @@ func (tarc *testAppendRowsClient) CloseSend() error { // openTestArc handles wiring in a test AppendRowsClient into a managedstream by providing the open function. func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { sF := func(req *storagepb.AppendRowsRequest) error { - log.Printf("test SF appending %v", req) testARC.requests = append(testARC.requests, req) return nil } @@ -491,65 +495,126 @@ func TestOpenCallOptionPropagation(t *testing.T) { func TestManagedStream_Receiver(t *testing.T) { testCases := []struct { - desc string - recvResp *storagepb.AppendRowsResponse - recvErr error - - wantAppend bool + desc string + recvResp []*testRecvResponse + wantRequestCount int }{ { - desc: "no errors", - recvResp: &storagepb.AppendRowsResponse{}, - wantAppend: false, + desc: "no errors", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantRequestCount: 0, + }, + { + desc: "recv err w/io.EOF", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: io.EOF, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantRequestCount: 1, }, { - desc: "recv EOF", - recvResp: &storagepb.AppendRowsResponse{}, - recvErr: io.EOF, - wantAppend: true, + desc: "resp embeds Unavailable", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.Unavailable), + Message: "foo", + }, + }, + }, + err: nil, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantRequestCount: 1, + }, + { + desc: "resp embeds generic ResourceExhausted", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: &storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.ResourceExhausted), + Message: "foo", + }, + }, + }, + err: nil, + }, + }, + wantRequestCount: 0, }, { - desc: "resp unavailable", - recvResp: &storagepb.AppendRowsResponse{ - Response: &storagepb.AppendRowsResponse_Error{ - Error: &statuspb.Status{ - Code: int32(codes.Unavailable), - Message: "foo", + desc: "resp embeds throughput ResourceExhausted", + recvResp: []*testRecvResponse{ + { + resp: &storagepb.AppendRowsResponse{ + Response: 
&storagepb.AppendRowsResponse_Error{ + Error: &statuspb.Status{ + Code: int32(codes.ResourceExhausted), + Message: "Exceeds 'AppendRows throughput' quota for stream blah", + }, + }, }, + err: nil, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, }, }, - wantAppend: true, + wantRequestCount: 1, }, } for _, tc := range testCases { - t.Logf("TESTCASE %s", tc.desc) ctx, cancel := context.WithCancel(context.Background()) - testArc := &testAppendRowsClient{} + testArc := &testAppendRowsClient{ + responses: tc.recvResp, + } ms := &ManagedStream{ ctx: ctx, open: openTestArc(testArc, nil, func() (*storagepb.AppendRowsResponse, error) { - if len(testArc.requests) == 0 { - t.Logf("emitting testcase response") - return tc.recvResp, tc.recvErr + if len(testArc.responses) == 0 { + panic("out of responses") } - t.Logf("emitting default response") - return &storagepb.AppendRowsResponse{}, nil + curResp := testArc.responses[0] + testArc.responses = testArc.responses[1:] + return curResp.resp, curResp.err }, ), streamSettings: defaultStreamSettings(), fc: newFlowController(0, 0), - retry: newDefaultRetryer(), + retry: newTestRetryer(), } _, ch, _ := ms.openWithRetry() ch <- newPendingWrite(ctx, [][]byte{[]byte("foo")}) - time.Sleep(time.Second) - - if gotAppend := len(testArc.requests) > 0; gotAppend != tc.wantAppend { - t.Errorf("%s: got %t, want %t", tc.desc, gotAppend, tc.wantAppend) + // We sleep to allow the retries to be processed. I'm not enamored of this, but there's not + // currently a way to check on the status of the goroutine. + time.Sleep(500 * time.Millisecond) + // We should be done. + if gotRequestCount := len(testArc.requests); gotRequestCount != tc.wantRequestCount { + t.Errorf("%s: got %d appends, want %d appends", tc.desc, gotRequestCount, tc.wantRequestCount) } ms.Close() cancel() diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index ae40220df7f..3f6af49a473 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -40,6 +40,21 @@ func newDefaultRetryer() *defaultRetryer { } } +// a retryer that doesn't back off realistically, useful for testing without a +// bunch of extra wait time. +func newTestRetryer() *defaultRetryer { + return &defaultRetryer{ + bo: gax.Backoff{ + Initial: time.Millisecond, + Max: time.Millisecond, + }, + bigBo: gax.Backoff{ + Initial: time.Millisecond, + Max: time.Millisecond, + }, + } +} + type defaultRetryer struct { bo gax.Backoff bigBo gax.Backoff // For more aggressive backoff, such as throughput quota From d7656b44e097ac886c5aeab25889166f3ed55d9e Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 18:37:14 +0000 Subject: [PATCH 05/12] increment AppendRetryCount on re-enqueued append --- bigquery/storage/managedwriter/managed_stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 622d8b08d18..acac3a469e6 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -546,6 +546,7 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { } // Break out of the loop, we were successful and the write has been // re-inserted. 
+ recordStat(ms.ctx, AppendRetryCount, 1) break } From 55befe1a70f649232a735ed2a7342843a9488e13 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 18:54:06 +0000 Subject: [PATCH 06/12] docs pass, clean up mark semantics between receive and response error --- .../storage/managedwriter/managed_stream.go | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index acac3a469e6..7491358537f 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -468,15 +468,16 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... // recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. // -// The receive processor only deals with a single instance of a connection/channel, and thus should never interact -// with the mutex lock. +// The receive processor is only responsible for a single bidi channel/channel. As new connections are established, +// each gets it's own instance of a processor. +// +// The ManagedStream reference is used for performing re-enqueing of failed writes. func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { - // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply - // ensure that pending writes get acknowledged with a terminal state. for { select { case <-ms.ctx.Done(): - // Context is done, so we're not going to get further updates. Mark all work failed with the context error. + // Context is done, so we're not going to get further updates. Mark all work left in the channel + // with the context error. We don't attempt to re-enqueue in this case. for { pw, ok := <-ch if !ok { @@ -492,29 +493,31 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie // block until we get a corresponding response or err from stream. resp, err := arc.Recv() if err != nil { - // Evaluate and possibly retry based on the receive error. - ms.processRetry(nextWrite, err) - // We're done with the write regardless of outcome. + // Evaluate the error from the receive and possibly retry. + ms.processRetry(nextWrite, nil, err) + // We're done with the write regardless of outcome, continue onto the + // next element. continue } - // We received a response (it may contain an error). + // Record that we did in fact get a response from the backend. recordStat(ms.ctx, AppendResponses, 1) if status := resp.GetError(); status != nil { - // Mark that we received an error in instrumentation. + // The response from the backend embedded a status error. We record that the error + // occurred, and tag it based on the response code of the status. if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { recordStat(tagCtx, AppendResponseErrors, 1) } respErr := grpcstatus.ErrorProto(status) if ms.shouldRetryAppend(respErr, nextWrite.attemptCount) { - // The response had an error attached and it should be retried. - ms.processRetry(nextWrite, respErr) - // We're done with the write regardless of outcome. + // We use the status error to evaluate and possible re-enqueue the write. + ms.processRetry(nextWrite, resp, respErr) + // We're done with the write regardless of outcome, continue on to the next + // element. 
continue } } - // We either had no error in the response, or should not have retried. - // mark the write done. + // We had no error in the receive or in the response. Mark the write done. nextWrite.markDone(resp, nil, ms.fc) } } @@ -522,10 +525,10 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie // processRetry is responsible for evaluating and re-enqueing an append. // If the append is not retried, it is marked complete. -func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { +func (ms *ManagedStream) processRetry(pw *pendingWrite, initialResp *storagepb.AppendRowsResponse, initialErr error) { if ms.retry == nil { // No retries. Mark the write done and return. - pw.markDone(nil, initialErr, ms.fc) + pw.markDone(initialResp, initialErr, ms.fc) return } err := initialErr @@ -533,15 +536,14 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { pause, shouldRetry := ms.retry.RetryAppend(err, pw.attemptCount) if !shouldRetry { // Should not attempt to re-append. - pw.markDone(nil, err, ms.fc) + pw.markDone(initialResp, err, ms.fc) return } - // we use the pause the slow the receiver loop as a whole. time.Sleep(pause) pw.attemptCount = pw.attemptCount + 1 err = ms.appendWithRetry(pw) if err != nil { - // Got a failure, send it through the loop again. + // Re-enqueue failed, send it through the loop again. continue } // Break out of the loop, we were successful and the write has been @@ -549,7 +551,6 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, initialErr error) { recordStat(ms.ctx, AppendRetryCount, 1) break } - } func (ms *ManagedStream) shouldRetryAppend(err error, attemptCount int) bool { From d945e4fd35321fc7c5418b6c22c406ec009809cb Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 19:16:19 +0000 Subject: [PATCH 07/12] improve the retry test --- .../managedwriter/managed_stream_test.go | 86 ++++++++++++++++--- 1 file changed, 74 insertions(+), 12 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index ca51f6eb67c..06f698e7614 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -492,15 +492,19 @@ func TestOpenCallOptionPropagation(t *testing.T) { ms.openWithRetry() } +// This test evaluates how the receiver deals with a pending write. 
func TestManagedStream_Receiver(t *testing.T) { + var customErr = fmt.Errorf("foo") + testCases := []struct { - desc string + description string recvResp []*testRecvResponse + wantFinalErr error wantRequestCount int }{ { - desc: "no errors", + description: "no errors", recvResp: []*testRecvResponse{ { resp: &storagepb.AppendRowsResponse{}, @@ -510,7 +514,7 @@ func TestManagedStream_Receiver(t *testing.T) { wantRequestCount: 0, }, { - desc: "recv err w/io.EOF", + description: "recv err w/io.EOF", recvResp: []*testRecvResponse{ { resp: nil, @@ -524,7 +528,38 @@ func TestManagedStream_Receiver(t *testing.T) { wantRequestCount: 1, }, { - desc: "resp embeds Unavailable", + description: "recv err retried and then failed", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: io.EOF, + }, + { + resp: nil, + err: customErr, + }, + }, + wantRequestCount: 1, + wantFinalErr: customErr, + }, + { + description: "recv err w/ custom error", + recvResp: []*testRecvResponse{ + { + resp: nil, + err: customErr, + }, + { + resp: &storagepb.AppendRowsResponse{}, + err: nil, + }, + }, + wantRequestCount: 0, + wantFinalErr: customErr, + }, + + { + description: "resp embeds Unavailable", recvResp: []*testRecvResponse{ { resp: &storagepb.AppendRowsResponse{ @@ -545,7 +580,7 @@ func TestManagedStream_Receiver(t *testing.T) { wantRequestCount: 1, }, { - desc: "resp embeds generic ResourceExhausted", + description: "resp embeds generic ResourceExhausted", recvResp: []*testRecvResponse{ { resp: &storagepb.AppendRowsResponse{ @@ -562,7 +597,7 @@ func TestManagedStream_Receiver(t *testing.T) { wantRequestCount: 0, }, { - desc: "resp embeds throughput ResourceExhausted", + description: "resp embeds throughput ResourceExhausted", recvResp: []*testRecvResponse{ { resp: &storagepb.AppendRowsResponse{ @@ -582,6 +617,25 @@ func TestManagedStream_Receiver(t *testing.T) { }, wantRequestCount: 1, }, + { + description: "retriable failures until max attempts", + recvResp: []*testRecvResponse{ + { + err: io.EOF, + }, + { + err: io.EOF, + }, + { + err: io.EOF, + }, + { + err: io.EOF, + }, + }, + wantRequestCount: 3, + wantFinalErr: io.EOF, + }, } for _, tc := range testCases { @@ -607,14 +661,22 @@ func TestManagedStream_Receiver(t *testing.T) { fc: newFlowController(0, 0), retry: newTestRetryer(), } + // use openWithRetry to get the reference to the channel and add our test pending write. _, ch, _ := ms.openWithRetry() - ch <- newPendingWrite(ctx, [][]byte{[]byte("foo")}) - // We sleep to allow the retries to be processed. I'm not enamored of this, but there's not - // currently a way to check on the status of the goroutine. - time.Sleep(500 * time.Millisecond) - // We should be done. + pw := newPendingWrite(ctx, [][]byte{[]byte("foo")}) + ch <- pw + + // Wait until the write is marked done. + <-pw.result.Ready() + + // Check retry count is as expected. if gotRequestCount := len(testArc.requests); gotRequestCount != tc.wantRequestCount { - t.Errorf("%s: got %d appends, want %d appends", tc.desc, gotRequestCount, tc.wantRequestCount) + t.Errorf("%s: got %d retries, want %d retries", tc.description, gotRequestCount, tc.wantRequestCount) + } + + // Check that the write got the expected final result. 
+ if gotFinalErr := pw.result.err; !errors.Is(gotFinalErr, tc.wantFinalErr) { + t.Errorf("%s: got final error %v, wanted final error %v", tc.description, gotFinalErr, tc.wantFinalErr) } ms.Close() cancel() From 3bd7b6f99c55252103b1e6e778297377ec9ce6aa Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 19:31:52 +0000 Subject: [PATCH 08/12] propagate write attempts to the user surface --- bigquery/storage/managedwriter/appendresult.go | 17 +++++++++++++++++ bigquery/storage/managedwriter/client.go | 2 ++ bigquery/storage/managedwriter/options.go | 9 +++++---- bigquery/storage/managedwriter/retry.go | 2 ++ 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index bbbda9982da..30bcba65ff7 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -41,6 +41,9 @@ type AppendResult struct { // retains the original response. response *storagepb.AppendRowsResponse + + // retains the number of times this individual write was enqueued. + totalAttempts int } func newAppendResult(data [][]byte) *AppendResult { @@ -146,6 +149,18 @@ func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSche } } +// TotalAttempts returns the number of times this write was attempted. +// +// This call blocks until the result is ready, or context is no longer valid. +func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) { + select { + case <-ctx.Done(): + return 0, fmt.Errorf("context done") + case <-ar.Ready(): + return ar.totalAttempts, nil + } +} + // pendingWrite tracks state for a set of rows that are part of a single // append request. type pendingWrite struct { @@ -198,6 +213,8 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error, pw.result.response = resp } pw.result.err = err + // Record the final attempts in the result for the user. + pw.result.totalAttempts = pw.attemptCount close(pw.result.ready) // Clear the reference to the request. diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index bc94e3182df..c1a4b04897f 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -117,6 +117,8 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), }, open: createOpenF(ctx, streamFunc), + // We add the new retryer by default, and add a new option to disable it. + retry: newDefaultRetryer(), } // apply writer options diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 9239bd8fdf6..341c31e2636 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -98,11 +98,12 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption { } } -// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when -// it opens the underlying append stream. -func WithExperimentalRetry(o gax.CallOption) WriterOption { +// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes. 
+func DisableWriteRetries(disable bool) WriterOption { return func(ms *ManagedStream) { - ms.retry = newDefaultRetryer() + if disable { + ms.retry = nil + } } } diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 3f6af49a473..b2c2bb22d83 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -55,6 +55,8 @@ func newTestRetryer() *defaultRetryer { } } +// TODO: define a more correct backoff heuristic for stream-based retries. The default gax +// is insufficient for this case. type defaultRetryer struct { bo gax.Backoff bigBo gax.Backoff // For more aggressive backoff, such as throughput quota From f162fe992b9289fabec7333994e915a50c44e548 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 20:23:35 +0000 Subject: [PATCH 09/12] make total attempts less ambiguous --- .../storage/managedwriter/appendresult.go | 5 +-- .../storage/managedwriter/managed_stream.go | 4 +- .../managedwriter/managed_stream_test.go | 39 +++++++++++-------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 30bcba65ff7..c1a5ccdd149 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -195,9 +195,8 @@ func newPendingWrite(ctx context.Context, appends [][]byte) *pendingWrite { }, }, }, - result: newAppendResult(appends), - attemptCount: 1, - reqCtx: ctx, + result: newAppendResult(appends), + reqCtx: ctx, } // We compute the size now for flow controller purposes, though // the actual request size may be slightly larger (e.g. the first diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 7491358537f..ed904cb25f1 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -317,6 +317,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { req = reqCopy }) + // Increment the attempt count. + pw.attemptCount = pw.attemptCount + 1 if req != nil { // First append in a new connection needs properties like schema and stream name set. err = (*arc).Send(req) @@ -347,7 +349,6 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { // lived bidirectional network stream, with it's own managed context (ms.ctx). requestCtx is checked // for expiry to enable faster failures, it is not propagated more deeply. func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error { - // Resolve retry settings. var settings gax.CallSettings for _, opt := range opts { @@ -540,7 +541,6 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, initialResp *storagepb.A return } time.Sleep(pause) - pw.attemptCount = pw.attemptCount + 1 err = ms.appendWithRetry(pw) if err != nil { // Re-enqueue failed, send it through the loop again. 
diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 06f698e7614..d69b8ffcda8 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -498,10 +498,10 @@ func TestManagedStream_Receiver(t *testing.T) { var customErr = fmt.Errorf("foo") testCases := []struct { - description string - recvResp []*testRecvResponse - wantFinalErr error - wantRequestCount int + description string + recvResp []*testRecvResponse + wantFinalErr error + wantTotalAttempts int }{ { description: "no errors", @@ -511,7 +511,7 @@ func TestManagedStream_Receiver(t *testing.T) { err: nil, }, }, - wantRequestCount: 0, + wantTotalAttempts: 1, }, { description: "recv err w/io.EOF", @@ -525,7 +525,7 @@ func TestManagedStream_Receiver(t *testing.T) { err: nil, }, }, - wantRequestCount: 1, + wantTotalAttempts: 2, }, { description: "recv err retried and then failed", @@ -539,8 +539,8 @@ func TestManagedStream_Receiver(t *testing.T) { err: customErr, }, }, - wantRequestCount: 1, - wantFinalErr: customErr, + wantTotalAttempts: 2, + wantFinalErr: customErr, }, { description: "recv err w/ custom error", @@ -554,8 +554,8 @@ func TestManagedStream_Receiver(t *testing.T) { err: nil, }, }, - wantRequestCount: 0, - wantFinalErr: customErr, + wantTotalAttempts: 1, + wantFinalErr: customErr, }, { @@ -577,7 +577,7 @@ func TestManagedStream_Receiver(t *testing.T) { err: nil, }, }, - wantRequestCount: 1, + wantTotalAttempts: 2, }, { description: "resp embeds generic ResourceExhausted", @@ -594,7 +594,7 @@ func TestManagedStream_Receiver(t *testing.T) { err: nil, }, }, - wantRequestCount: 0, + wantTotalAttempts: 1, }, { description: "resp embeds throughput ResourceExhausted", @@ -615,7 +615,7 @@ func TestManagedStream_Receiver(t *testing.T) { err: nil, }, }, - wantRequestCount: 1, + wantTotalAttempts: 2, }, { description: "retriable failures until max attempts", @@ -633,8 +633,8 @@ func TestManagedStream_Receiver(t *testing.T) { err: io.EOF, }, }, - wantRequestCount: 3, - wantFinalErr: io.EOF, + wantTotalAttempts: 4, + wantFinalErr: io.EOF, }, } @@ -664,14 +664,19 @@ func TestManagedStream_Receiver(t *testing.T) { // use openWithRetry to get the reference to the channel and add our test pending write. _, ch, _ := ms.openWithRetry() pw := newPendingWrite(ctx, [][]byte{[]byte("foo")}) + pw.attemptCount = 1 // we're injecting directly, but attribute this as a single attempt. ch <- pw // Wait until the write is marked done. <-pw.result.Ready() // Check retry count is as expected. - if gotRequestCount := len(testArc.requests); gotRequestCount != tc.wantRequestCount { - t.Errorf("%s: got %d retries, want %d retries", tc.description, gotRequestCount, tc.wantRequestCount) + gotTotalAttempts, err := pw.result.TotalAttempts(ctx) + if err != nil { + t.Errorf("failed to get total appends: %v", err) + } + if gotTotalAttempts != tc.wantTotalAttempts { + t.Errorf("%s: got %d total attempts, want %d attempts", tc.description, gotTotalAttempts, tc.wantTotalAttempts) } // Check that the write got the expected final result. 
From 28a4562920bc05eb7882e32ab12d13a38d8adc56 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 21 Sep 2022 22:30:39 +0000 Subject: [PATCH 10/12] iterate on retryers --- .../storage/managedwriter/managed_stream.go | 11 ++-- .../managedwriter/managed_stream_test.go | 1 + bigquery/storage/managedwriter/retry.go | 57 +++++++++++-------- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index ed904cb25f1..29f3e8c4e24 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -225,7 +225,10 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient // // Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - r := defaultRetryer{} + r := ms.retry + if r == nil { + r = newDefaultRetryer() + } for { recordStat(ms.ctx, AppendClientOpenCount, 1) streamID := "" @@ -354,10 +357,6 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio for _, opt := range opts { opt.Resolve(&settings) } - var r gax.Retryer = &defaultRetryer{} - if settings.Retry != nil { - r = settings.Retry() - } for { appendErr := ms.lockingAppend(pw) @@ -368,7 +367,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) recordStat(ctx, AppendRequestErrors, 1) } - bo, shouldRetry := r.Retry(appendErr) + bo, shouldRetry := ms.retry.Retry(appendErr) if shouldRetry { if err := gax.Sleep(ms.ctx, bo); err != nil { return err diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index d69b8ffcda8..e34e133d0ac 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -75,6 +75,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } return nil, err }, + retry: newTestRetryer(), } arc, ch, err := ms.openWithRetry() if tc.wantFail && err == nil { diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index b2c2bb22d83..2159e709520 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -18,10 +18,10 @@ import ( "context" "errors" "io" + "math/rand" "strings" "time" - "github.com/googleapis/gax-go/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -32,34 +32,41 @@ var ( func newDefaultRetryer() *defaultRetryer { return &defaultRetryer{ - bigBo: gax.Backoff{ - Initial: 2 * time.Second, - Multiplier: 5, - Max: 5 * time.Minute, - }, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + minBackoff: 50 * time.Millisecond, + jitter: time.Second, + maxRetries: defaultAppendRetries, } } // a retryer that doesn't back off realistically, useful for testing without a // bunch of extra wait time. func newTestRetryer() *defaultRetryer { - return &defaultRetryer{ - bo: gax.Backoff{ - Initial: time.Millisecond, - Max: time.Millisecond, - }, - bigBo: gax.Backoff{ - Initial: time.Millisecond, - Max: time.Millisecond, - }, - } + r := newDefaultRetryer() + r.jitter = time.Nanosecond + return r } -// TODO: define a more correct backoff heuristic for stream-based retries. The default gax -// is insufficient for this case. 
+// defaultRetryer is a stateless retry, unlike a gax retryer which is designed +// for a retrying a single unary operation. This retryer is used for retrying +// appends, which are messages enqueued into a bidi stream. type defaultRetryer struct { - bo gax.Backoff - bigBo gax.Backoff // For more aggressive backoff, such as throughput quota + r *rand.Rand + minBackoff time.Duration + jitter time.Duration + maxRetries int +} + +func (dr *defaultRetryer) Pause(severe bool) time.Duration { + jitter := dr.jitter.Nanoseconds() + if jitter > 0 { + jitter = dr.r.Int63n(jitter) + } + pause := dr.minBackoff.Nanoseconds() + jitter + if severe { + pause = 10 * pause + } + return time.Duration(pause) } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { @@ -69,11 +76,11 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool if !ok { // Treat context errors as non-retriable. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return r.bo.Pause(), false + return r.Pause(false), false } // EOF can happen in the case of connection close. if errors.Is(err, io.EOF) { - return r.bo.Pause(), true + return r.Pause(false), true } // Any other non-status based errors are not retried. return 0, false @@ -84,12 +91,12 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool codes.DeadlineExceeded, codes.Internal, codes.Unavailable: - return r.bo.Pause(), true + return r.Pause(false), true case codes.ResourceExhausted: if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { // Note: internal b/246031522 opened to give this a structured error // and avoid string parsing. Should be a QuotaFailure or similar. - return r.bigBo.Pause(), true // more aggressive backoff + return r.Pause(true), true // more aggressive backoff } } return 0, false @@ -98,7 +105,7 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool // RetryAppend is a variation of the retry predicate that also bounds retries to a finite number of attempts. func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { - if attemptCount > defaultAppendRetries { + if attemptCount > r.maxRetries { return 0, false // exceeded maximum retries. } return r.Retry(err) From 0d56dc6ec33aa0e3df02ee6a6c120a1b605a7c25 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 22 Sep 2022 16:59:32 +0000 Subject: [PATCH 11/12] refactor retries: unary and stateless --- bigquery/storage/managedwriter/client.go | 2 +- .../storage/managedwriter/managed_stream.go | 37 +++--- .../managedwriter/managed_stream_test.go | 6 +- bigquery/storage/managedwriter/retry.go | 123 ++++++++++-------- bigquery/storage/managedwriter/retry_test.go | 56 ++++---- 5 files changed, 116 insertions(+), 108 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index c1a4b04897f..4408f1a4094 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -118,7 +118,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient }, open: createOpenF(ctx, streamFunc), // We add the new retryer by default, and add a new option to disable it. 
- retry: newDefaultRetryer(), + retry: newStatelessRetryer(), } // apply writer options diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 29f3e8c4e24..c1efb18b576 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -77,7 +77,7 @@ type ManagedStream struct { destinationTable string c *Client fc *flowController - retry *defaultRetryer + retry *statelessRetryer // aspects of the stream client ctx context.Context // retained context for the stream @@ -225,10 +225,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient // // Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - r := ms.retry - if r == nil { - r = newDefaultRetryer() - } + r := &unaryRetryer{} for { recordStat(ms.ctx, AppendClientOpenCount, 1) streamID := "" @@ -331,6 +328,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { } if err != nil { if shouldReconnect(err) { + // certain error responses are indicative that this connection is no longer healthy. + // if we encounter them, we force a reconnect so the next append has a healthy connection. ms.reconnect = true } return err @@ -367,7 +366,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) recordStat(ctx, AppendRequestErrors, 1) } - bo, shouldRetry := ms.retry.Retry(appendErr) + bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount) if shouldRetry { if err := gax.Sleep(ms.ctx, bo); err != nil { return err @@ -509,7 +508,7 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie recordStat(tagCtx, AppendResponseErrors, 1) } respErr := grpcstatus.ErrorProto(status) - if ms.shouldRetryAppend(respErr, nextWrite.attemptCount) { + if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry { // We use the status error to evaluate and possible re-enqueue the write. ms.processRetry(nextWrite, resp, respErr) // We're done with the write regardless of outcome, continue on to the next @@ -525,18 +524,13 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie // processRetry is responsible for evaluating and re-enqueing an append. // If the append is not retried, it is marked complete. -func (ms *ManagedStream) processRetry(pw *pendingWrite, initialResp *storagepb.AppendRowsResponse, initialErr error) { - if ms.retry == nil { - // No retries. Mark the write done and return. - pw.markDone(initialResp, initialErr, ms.fc) - return - } +func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) { err := initialErr for { - pause, shouldRetry := ms.retry.RetryAppend(err, pw.attemptCount) + pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount) if !shouldRetry { // Should not attempt to re-append. - pw.markDone(initialResp, err, ms.fc) + pw.markDone(appendResp, err, ms.fc) return } time.Sleep(pause) @@ -552,10 +546,13 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, initialResp *storagepb.A } } -func (ms *ManagedStream) shouldRetryAppend(err error, attemptCount int) bool { - if ms.retry == nil { - return false +// returns the stateless retryer. 
If one's not set (re-enqueue retries disabled), +// it returns a retryer that only permits single attempts. +func (ms *ManagedStream) statelessRetryer() *statelessRetryer { + if ms.retry != nil { + return ms.retry + } + return &statelessRetryer{ + maxAttempts: 1, } - _, shouldRetry := ms.retry.RetryAppend(err, attemptCount) - return shouldRetry } diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index e34e133d0ac..838f1f4b269 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -75,7 +75,6 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } return nil, err }, - retry: newTestRetryer(), } arc, ch, err := ms.openWithRetry() if tc.wantFail && err == nil { @@ -325,6 +324,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) { // Append is intentionally slow. return nil }, nil), + retry: &statelessRetryer{}, } ms.schemaDescriptor = &descriptorpb.DescriptorProto{ Name: proto.String("testDescriptor"), @@ -412,6 +412,7 @@ func TestManagedStream_AppendDeadlocks(t *testing.T) { streamSettings: &streamSettings{ streamID: "foo", }, + retry: &statelessRetryer{}, } // first append @@ -447,6 +448,7 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) { time.Sleep(40 * time.Millisecond) return nil }, nil), + retry: &statelessRetryer{}, } ms.schemaDescriptor = &descriptorpb.DescriptorProto{ Name: proto.String("testDescriptor"), @@ -660,7 +662,7 @@ func TestManagedStream_Receiver(t *testing.T) { ), streamSettings: defaultStreamSettings(), fc: newFlowController(0, 0), - retry: newTestRetryer(), + retry: newStatelessRetryer(), } // use openWithRetry to get the reference to the channel and add our test pending write. _, ch, _ := ms.openWithRetry() diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 2159e709520..adbe12ab9f2 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -15,75 +15,38 @@ package managedwriter import ( - "context" "errors" "io" "math/rand" "strings" "time" + "github.com/googleapis/gax-go/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) var ( - defaultAppendRetries = 3 + defaultRetryAttempts = 4 ) -func newDefaultRetryer() *defaultRetryer { - return &defaultRetryer{ - r: rand.New(rand.NewSource(time.Now().UnixNano())), - minBackoff: 50 * time.Millisecond, - jitter: time.Second, - maxRetries: defaultAppendRetries, +// This retry predicate is used for higher level retries, enqueing appends onto to a bidi +// channel and evaluating whether an append should be retried (re-enqueued). +func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) { + if err == nil { + return } -} - -// a retryer that doesn't back off realistically, useful for testing without a -// bunch of extra wait time. -func newTestRetryer() *defaultRetryer { - r := newDefaultRetryer() - r.jitter = time.Nanosecond - return r -} - -// defaultRetryer is a stateless retry, unlike a gax retryer which is designed -// for a retrying a single unary operation. This retryer is used for retrying -// appends, which are messages enqueued into a bidi stream. 
-type defaultRetryer struct { - r *rand.Rand - minBackoff time.Duration - jitter time.Duration - maxRetries int -} - -func (dr *defaultRetryer) Pause(severe bool) time.Duration { - jitter := dr.jitter.Nanoseconds() - if jitter > 0 { - jitter = dr.r.Int63n(jitter) - } - pause := dr.minBackoff.Nanoseconds() + jitter - if severe { - pause = 10 * pause - } - return time.Duration(pause) -} -func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // This predicate evaluates errors for both enqueuing and reconnection. - // See RetryAppend for retry that bounds attempts to a fixed number. s, ok := status.FromError(err) + // non-status based error conditions. if !ok { - // Treat context errors as non-retriable. - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return r.Pause(false), false - } // EOF can happen in the case of connection close. if errors.Is(err, io.EOF) { - return r.Pause(false), true + shouldRetry = true + return } - // Any other non-status based errors are not retried. - return 0, false + // All other non-status errors are treated as non-retryable (including context errors). + return } switch s.Code() { case codes.Aborted, @@ -91,24 +54,70 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool codes.DeadlineExceeded, codes.Internal, codes.Unavailable: - return r.Pause(false), true + shouldRetry = true + return case codes.ResourceExhausted: if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { // Note: internal b/246031522 opened to give this a structured error // and avoid string parsing. Should be a QuotaFailure or similar. - return r.Pause(true), true // more aggressive backoff + shouldRetry = true + return } } - return 0, false + return } -// RetryAppend is a variation of the retry predicate that also bounds retries to a finite number of attempts. -func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { +// unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection. +type unaryRetryer struct { + bo gax.Backoff +} - if attemptCount > r.maxRetries { - return 0, false // exceeded maximum retries. +func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) { + shouldRetry, _ := retryPredicate(err) + return ur.bo.Pause(), shouldRetry +} + +// statelessRetryer is used for backing off within a continuous process, like processing the responses +// from the receive side of the bidi stream. An individual item in that process has a notion of an attempt +// count, and we use maximum retries as a way of evicting bad items. 
+type statelessRetryer struct { + r *rand.Rand + minBackoff time.Duration + jitter time.Duration + aggressiveFactor int + maxAttempts int +} + +func newStatelessRetryer() *statelessRetryer { + return &statelessRetryer{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), + minBackoff: 50 * time.Millisecond, + jitter: time.Second, + maxAttempts: defaultRetryAttempts, + } +} + +func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration { + jitter := sr.jitter.Nanoseconds() + if jitter > 0 { + jitter = sr.r.Int63n(jitter) + } + pause := sr.minBackoff.Nanoseconds() + jitter + if aggressiveBackoff { + pause = pause * int64(sr.aggressiveFactor) } - return r.Retry(err) + return time.Duration(pause) +} + +func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) { + if attemptCount >= sr.maxAttempts { + return 0, false + } + shouldRetry, aggressive := retryPredicate(err) + if shouldRetry { + return sr.pause(aggressive), true + } + return 0, false } // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force diff --git a/bigquery/storage/managedwriter/retry_test.go b/bigquery/storage/managedwriter/retry_test.go index 7ca4a188b7d..67d5f1e05ff 100644 --- a/bigquery/storage/managedwriter/retry_test.go +++ b/bigquery/storage/managedwriter/retry_test.go @@ -24,11 +24,12 @@ import ( "google.golang.org/grpc/status" ) -func TestManagedStream_ShouldReconnect(t *testing.T) { +func TestManagedStream_AppendErrorRetries(t *testing.T) { testCases := []struct { - err error - want bool + err error + attemptCount int + want bool }{ { err: fmt.Errorf("random error"), @@ -39,36 +40,38 @@ func TestManagedStream_ShouldReconnect(t *testing.T) { want: true, }, { - err: status.Error(codes.Unavailable, "nope"), - want: false, + err: io.EOF, + attemptCount: 4, + want: false, }, { - err: status.Error(codes.Unavailable, "the connection is draining"), + err: status.Error(codes.Unavailable, "nope"), want: true, }, { - err: func() error { - // wrap the underlying error in a gax apierror - ai, _ := apierror.FromError(status.Error(codes.Unavailable, "the connection is draining")) - return ai - }(), + err: status.Error(codes.ResourceExhausted, "out of gas"), + want: false, + }, + { + err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"), want: true, }, } + retry := newStatelessRetryer() + for _, tc := range testCases { - if got := shouldReconnect(tc.err); got != tc.want { + if _, got := retry.Retry(tc.err, tc.attemptCount); got != tc.want { t.Errorf("got %t, want %t for error: %+v", got, tc.want, tc.err) } } } -func TestManagedStream_AppendErrorRetries(t *testing.T) { +func TestManagedStream_ShouldReconnect(t *testing.T) { testCases := []struct { - err error - attemptCount int - want bool + err error + want bool }{ { err: fmt.Errorf("random error"), @@ -78,29 +81,26 @@ func TestManagedStream_AppendErrorRetries(t *testing.T) { err: io.EOF, want: true, }, - { - err: io.EOF, - attemptCount: 4, - want: false, - }, { err: status.Error(codes.Unavailable, "nope"), - want: true, + want: false, }, { - err: status.Error(codes.ResourceExhausted, "out of gas"), - want: false, + err: status.Error(codes.Unavailable, "the connection is draining"), + want: true, }, { - err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"), + err: func() error { + // wrap the underlying error in a gax apierror + ai, _ := apierror.FromError(status.Error(codes.Unavailable, "the connection is draining")) 
+ return ai + }(), want: true, }, } - retry := newDefaultRetryer() - for _, tc := range testCases { - if _, got := retry.RetryAppend(tc.err, tc.attemptCount); got != tc.want { + if got := shouldReconnect(tc.err); got != tc.want { t.Errorf("got %t, want %t for error: %+v", got, tc.want, tc.err) } } From 15703e6a7550a5b0f09bf0a02a148ff720cd86d3 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 22 Sep 2022 21:40:15 +0000 Subject: [PATCH 12/12] remove unnecessary cruft --- bigquery/storage/managedwriter/managed_stream_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 838f1f4b269..fa7fdadae8f 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -324,7 +324,6 @@ func TestManagedStream_ContextExpiry(t *testing.T) { // Append is intentionally slow. return nil }, nil), - retry: &statelessRetryer{}, } ms.schemaDescriptor = &descriptorpb.DescriptorProto{ Name: proto.String("testDescriptor"), @@ -412,7 +411,6 @@ func TestManagedStream_AppendDeadlocks(t *testing.T) { streamSettings: &streamSettings{ streamID: "foo", }, - retry: &statelessRetryer{}, } // first append @@ -448,7 +446,6 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) { time.Sleep(40 * time.Millisecond) return nil }, nil), - retry: &statelessRetryer{}, } ms.schemaDescriptor = &descriptorpb.DescriptorProto{ Name: proto.String("testDescriptor"),
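
Taken together, the series lands automatic re-enqueuing of retriable append failures (enabled by default from PATCH 08 onward), a DisableWriteRetries writer option to opt out, an AppendResult.TotalAttempts accessor, and an AppendRetryCount metric. As a rough illustration only, not part of the patches themselves, the sketch below shows how a caller might observe the new surface; the project ID, table path, descriptor, and serialized rows are placeholders, and the surrounding calls (NewClient, NewManagedStream, WithDestinationTable, WithType, WithSchemaDescriptor, AppendRows, GetResult) are the pre-existing managedwriter API rather than anything introduced here.

package example

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/descriptorpb"
)

// appendWithAttemptCount writes one batch of pre-serialized rows and reports
// how many times the append was enqueued (1 means no retry was needed).
// projectID and table ("projects/{p}/datasets/{d}/tables/{t}") are placeholders.
func appendWithAttemptCount(ctx context.Context, projectID, table string, desc *descriptorpb.DescriptorProto, rows [][]byte) error {
	client, err := managedwriter.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer client.Close()

	// With this series, retriable append failures are re-enqueued by default;
	// pass managedwriter.DisableWriteRetries(true) here to opt out.
	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable(table),
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithSchemaDescriptor(desc),
	)
	if err != nil {
		return err
	}
	defer ms.Close()

	result, err := ms.AppendRows(ctx, rows)
	if err != nil {
		return err
	}
	// GetResult blocks until the append is acknowledged or fails terminally.
	if _, err := result.GetResult(ctx); err != nil {
		return err
	}
	// TotalAttempts (added in this series) also blocks until the result is
	// ready, then reports how many times the write was enqueued.
	attempts, err := result.TotalAttempts(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("append acknowledged after %d attempt(s)\n", attempts)
	return nil
}

Retries performed this way are also visible in aggregate through the AppendRetryCount measure and AppendRetryView added in PATCH 02, alongside the existing managedwriter OpenCensus views.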