From 508c48939185892524a4891bb64e48322e2778db Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 9 Sep 2022 23:52:20 +0000 Subject: [PATCH 1/6] feat(bigquery/storage/managedwriter): define append retry predicate This PR models the retry predicate we'll use for evaluating whether appends should be retried automatically. --- bigquery/storage/managedwriter/retry.go | 70 ++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 7ace796c163..55fb31b315f 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -21,17 +21,26 @@ import ( "time" "github.com/googleapis/gax-go/v2" + "github.com/googleapis/gax-go/v2/apierror" + storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +var ( + defaultAppendRetries = 3 + knownReconnectErrors = []error{ + io.EOF, + status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport + } +) + type defaultRetryer struct { bo gax.Backoff } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // TODO: refine this logic in a subsequent PR, there's some service-specific - // retry predicates in addition to statuscode-based. + // This predicate evaluates enqueuing. s, ok := status.FromError(err) if !ok { // Treat context errors as non-retriable. @@ -49,15 +58,62 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool } } +func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { + if err == nil { + return 0, false // This shouldn't need to be here, and is only provided defensively. + } + if attemptCount > defaultAppendRetries { + return 0, false // exceeded maximum retries. + } + // This predicate evaluates the received response to determine if we should re-enqueue. + apiErr, ok := apierror.FromError(err) + if !ok { + // These are non status-based errors. + // Context errors are non-retriable. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return 0, false + } + // The same errors that trigger fast reconnect are retryable, as they deal with transient problems + // with the network stream. + if shouldReconnect(err) { + return r.bo.Pause(), true + } + // Any other non-status based errors are not retried. + return 0, false + } + // Next, evaluate service-specific error details. + se := &storagepb.StorageError{} + if e := apiErr.Details().ExtractProtoMessage(se); e == nil { + if se.GetCode() == storagepb.StorageError_OFFSET_OUT_OF_RANGE { + return r.bo.Pause(), true + } + // No other service-specific errors should be retried. + return 0, false + } + if quota := apiErr.Details().QuotaFailure; quota != nil { + // TODO: followup with yiru on this, there's some deeper checks on resource exhaustion. + return r.bo.Pause(), true + } + // Finally, evaluate based on the more generic grpc error status: + code := apiErr.GRPCStatus().Code() + switch code { + case codes.Aborted, + codes.DeadlineExceeded, + codes.Internal, + codes.Unavailable: + return r.bo.Pause(), true + case codes.ResourceExhausted: + // TODO: is there a special case here that's not quota? + } + // We treat all other failures as non-retriable. + return 0, false +} + // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force // our bidi stream to close/reopen based on the responses error. Errors here signal that no // further appends will succeed. func shouldReconnect(err error) bool { - var knownErrors = []error{ - io.EOF, - status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport - } - for _, ke := range knownErrors { + for _, ke := range knownReconnectErrors { if errors.Is(err, ke) { return true } From c6b5fb321d8c701127a1bb6a6591ed1301ad8358 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 12 Sep 2022 18:35:53 +0000 Subject: [PATCH 2/6] add throughput retry --- .../storage/managedwriter/instrumentation.go | 12 +- .../storage/managedwriter/integration_test.go | 111 ++++++++++++------ .../storage/managedwriter/managed_stream.go | 2 +- bigquery/storage/managedwriter/retry.go | 24 +++- 4 files changed, 104 insertions(+), 45 deletions(-) diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 464b99a69ed..56d7eba0b11 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -67,6 +67,10 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless) + // AppendRequestRetries is a measure of the number of append requests that were retried. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRequestRetries = stats.Int64(statsPrefix+"append_request_retries", "Number of append requests retried", stats.UnitDimensionless) + // AppendResponses is a measure of the number of append responses received. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) @@ -102,7 +106,11 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestErrorsView *view.View - // AppendRequestRowsView is a cumulative sum of AppendRows. + // AppendRequestRowsView is a cumulative sum of AppendRequestRows. + // It is EXPERIMENTAL and subject to change or removal without notice. + AppendRequestRetriesView *view.View + + // AppendRequestRetriesView is a cumulative sum of AppendRequestRetries. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestRowsView *view.View @@ -127,6 +135,7 @@ func init() { AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin) AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError) AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) + AppendRequestRetriesView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) @@ -141,6 +150,7 @@ func init() { AppendRequestBytesView, AppendRequestErrorsView, AppendRequestRowsView, + AppendRequestRetriesView, AppendResponsesView, AppendResponseErrorsView, diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 9db77641d2f..f42458e81ec 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -16,6 +16,7 @@ package managedwriter import ( "context" + "encoding/json" "fmt" "math" "sync" @@ -31,7 +32,7 @@ import ( "go.opencensus.io/stats/view" "google.golang.org/api/option" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" - "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protodesc" @@ -44,7 +45,7 @@ import ( var ( datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()}) tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()}) - defaultTestTimeout = 45 * time.Second + defaultTestTimeout = 300 * time.Second ) // our test data has cardinality 5 for names, 3 for values @@ -165,6 +166,23 @@ func TestIntegration_ManagedWriter(t *testing.T) { }) } +func TestIntegration_QuotaBehaviors(t *testing.T) { + mwClient, bqClient := getTestClients(context.Background(), t) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east4") + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + testLargeInsert(ctx, t, mwClient, bqClient, dataset) +} + func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { @@ -605,20 +623,8 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie m := &testdata.SimpleMessageProto2{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) - ms, err := mwClient.NewManagedStream(ctx, - WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), - WithType(CommittedStream), - WithSchemaDescriptor(descriptorProto), - ) - if err != nil { - t.Fatalf("NewManagedStream: %v", err) - } - validateTableConstraints(ctx, t, bqClient, testTable, "before send", - withExactRowCount(0)) - - // Construct a Very Large request. var data [][]byte - targetSize := 11 * 1024 * 1024 // 11 MB + targetSize := 7 * 1024 * 1024 // 11 MB b, err := proto.Marshal(testSimpleData[0]) if err != nil { t.Errorf("failed to marshal message: %v", err) @@ -631,30 +637,61 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie data[i] = b } - result, err := ms.AppendRows(ctx, data, WithOffset(0)) - if err != nil { - t.Errorf("single append failed: %v", err) - } - _, err = result.GetResult(ctx) - if err != nil { - apiErr, ok := apierror.FromError(err) - if !ok { - t.Errorf("GetResult error was not an instance of ApiError") - } - status := apiErr.GRPCStatus() - if status.Code() != codes.InvalidArgument { - t.Errorf("expected InvalidArgument status, got %v", status) + var wg sync.WaitGroup + var wg2 sync.WaitGroup + resultCh := make(chan *AppendResult, 2000) + + wg2.Add(1) + go func() { + defer wg2.Done() + entry := 0 + for result := range resultCh { + entry = entry + 1 + if _, resErr := result.GetResult(ctx); resErr != nil { + if status, ok := status.FromError(resErr); ok { + t.Logf("code: %s", status.Code().String()) + t.Logf("message: %s", status.Message()) + for k, det := range status.Details() { + t.Logf("detail %d (%T): %+v", k, det, det) + } + } + t.Errorf("err: %v", resErr) + if apiErr, ok := apierror.FromError(resErr); ok { + t.Errorf("apiErr: %+v", apiErr) + if quota := apiErr.Details().QuotaFailure; quota != nil { + t.Errorf("quota: %v", quota) + for k, violation := range quota.GetViolations() { + t.Logf("violation %d subject(%s) Description(%s)", k, violation.GetSubject(), violation.GetDescription()) + } + } + if b, err := json.MarshalIndent(apiErr.Details(), "", " "); err == nil { + t.Errorf("details: %s", string(b)) + } + } else { + t.Errorf("wtf: %v", err) + } + } } + }() + for z := 0; z < 20; z++ { + wg.Add(1) + go func() { + defer wg.Done() + ms, _ := mwClient.NewManagedStream(ctx, + WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithSchemaDescriptor(descriptorProto)) + for x := 0; x < 50; x++ { + result, err := ms.AppendRows(ctx, data) + if err != nil { + t.Errorf("single append failed: %v", err) + } + resultCh <- result + } + }() } - // send a subsequent append as verification we can proceed. - result, err = ms.AppendRows(ctx, [][]byte{b}) - if err != nil { - t.Fatalf("subsequent append failed: %v", err) - } - _, err = result.GetResult(ctx) - if err != nil { - t.Errorf("failure result from second append: %v", err) - } + wg.Wait() + close(resultCh) + wg2.Wait() } func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 76a5d028421..3495b98487a 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -223,7 +223,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient // // Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - r := defaultRetryer{} + r := newDefaultRetryer() for { recordStat(ms.ctx, AppendClientOpenCount, 1) streamID := "" diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 55fb31b315f..0910bc2939d 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "strings" "time" "github.com/googleapis/gax-go/v2" @@ -35,8 +36,19 @@ var ( } ) +func newDefaultRetryer() *defaultRetryer { + return &defaultRetryer{ + bigBo: gax.Backoff{ + Initial: 2 * time.Second, + Multiplier: 5, + Max: 5 * time.Minute, + }, + } +} + type defaultRetryer struct { - bo gax.Backoff + bo gax.Backoff + bigBo gax.Backoff // for more aggressive backoff, such as throughput quota } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { @@ -90,10 +102,6 @@ func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Du // No other service-specific errors should be retried. return 0, false } - if quota := apiErr.Details().QuotaFailure; quota != nil { - // TODO: followup with yiru on this, there's some deeper checks on resource exhaustion. - return r.bo.Pause(), true - } // Finally, evaluate based on the more generic grpc error status: code := apiErr.GRPCStatus().Code() switch code { @@ -103,7 +111,11 @@ func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Du codes.Unavailable: return r.bo.Pause(), true case codes.ResourceExhausted: - // TODO: is there a special case here that's not quota? + if strings.HasPrefix(apiErr.GRPCStatus().Message(), "Exceeds 'AppendRows throughput' quota") { + // Note: b/246031522 is open against backend to give this a structured error + // and avoid string parsing. + return r.bigBo.Pause(), true // more aggressive backoff + } } // We treat all other failures as non-retriable. return 0, false From 5a5deab593a309c2bccb6537c53ef101b0219f42 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 12 Sep 2022 18:39:51 +0000 Subject: [PATCH 3/6] cleanup --- .../storage/managedwriter/instrumentation.go | 12 +- .../storage/managedwriter/integration_test.go | 111 ++++++------------ .../storage/managedwriter/managed_stream.go | 2 +- bigquery/storage/managedwriter/retry.go | 4 +- 4 files changed, 41 insertions(+), 88 deletions(-) diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 56d7eba0b11..464b99a69ed 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -67,10 +67,6 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless) - // AppendRequestRetries is a measure of the number of append requests that were retried. - // It is EXPERIMENTAL and subject to change or removal without notice. - AppendRequestRetries = stats.Int64(statsPrefix+"append_request_retries", "Number of append requests retried", stats.UnitDimensionless) - // AppendResponses is a measure of the number of append responses received. // It is EXPERIMENTAL and subject to change or removal without notice. AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless) @@ -106,11 +102,7 @@ var ( // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestErrorsView *view.View - // AppendRequestRowsView is a cumulative sum of AppendRequestRows. - // It is EXPERIMENTAL and subject to change or removal without notice. - AppendRequestRetriesView *view.View - - // AppendRequestRetriesView is a cumulative sum of AppendRequestRetries. + // AppendRequestRowsView is a cumulative sum of AppendRows. // It is EXPERIMENTAL and subject to change or removal without notice. AppendRequestRowsView *view.View @@ -135,7 +127,6 @@ func init() { AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin) AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError) AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) - AppendRequestRetriesView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin) AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin) AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError) @@ -150,7 +141,6 @@ func init() { AppendRequestBytesView, AppendRequestErrorsView, AppendRequestRowsView, - AppendRequestRetriesView, AppendResponsesView, AppendResponseErrorsView, diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index f42458e81ec..9db77641d2f 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -16,7 +16,6 @@ package managedwriter import ( "context" - "encoding/json" "fmt" "math" "sync" @@ -32,7 +31,7 @@ import ( "go.opencensus.io/stats/view" "google.golang.org/api/option" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" - "google.golang.org/grpc/status" + "google.golang.org/grpc/codes" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protodesc" @@ -45,7 +44,7 @@ import ( var ( datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()}) tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()}) - defaultTestTimeout = 300 * time.Second + defaultTestTimeout = 45 * time.Second ) // our test data has cardinality 5 for names, 3 for values @@ -166,23 +165,6 @@ func TestIntegration_ManagedWriter(t *testing.T) { }) } -func TestIntegration_QuotaBehaviors(t *testing.T) { - mwClient, bqClient := getTestClients(context.Background(), t) - defer mwClient.Close() - defer bqClient.Close() - - dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east4") - if err != nil { - t.Fatalf("failed to init test dataset: %v", err) - } - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - testLargeInsert(ctx, t, mwClient, bqClient, dataset) -} - func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { @@ -623,8 +605,20 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie m := &testdata.SimpleMessageProto2{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + validateTableConstraints(ctx, t, bqClient, testTable, "before send", + withExactRowCount(0)) + + // Construct a Very Large request. var data [][]byte - targetSize := 7 * 1024 * 1024 // 11 MB + targetSize := 11 * 1024 * 1024 // 11 MB b, err := proto.Marshal(testSimpleData[0]) if err != nil { t.Errorf("failed to marshal message: %v", err) @@ -637,61 +631,30 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie data[i] = b } - var wg sync.WaitGroup - var wg2 sync.WaitGroup - resultCh := make(chan *AppendResult, 2000) - - wg2.Add(1) - go func() { - defer wg2.Done() - entry := 0 - for result := range resultCh { - entry = entry + 1 - if _, resErr := result.GetResult(ctx); resErr != nil { - if status, ok := status.FromError(resErr); ok { - t.Logf("code: %s", status.Code().String()) - t.Logf("message: %s", status.Message()) - for k, det := range status.Details() { - t.Logf("detail %d (%T): %+v", k, det, det) - } - } - t.Errorf("err: %v", resErr) - if apiErr, ok := apierror.FromError(resErr); ok { - t.Errorf("apiErr: %+v", apiErr) - if quota := apiErr.Details().QuotaFailure; quota != nil { - t.Errorf("quota: %v", quota) - for k, violation := range quota.GetViolations() { - t.Logf("violation %d subject(%s) Description(%s)", k, violation.GetSubject(), violation.GetDescription()) - } - } - if b, err := json.MarshalIndent(apiErr.Details(), "", " "); err == nil { - t.Errorf("details: %s", string(b)) - } - } else { - t.Errorf("wtf: %v", err) - } - } + result, err := ms.AppendRows(ctx, data, WithOffset(0)) + if err != nil { + t.Errorf("single append failed: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + apiErr, ok := apierror.FromError(err) + if !ok { + t.Errorf("GetResult error was not an instance of ApiError") + } + status := apiErr.GRPCStatus() + if status.Code() != codes.InvalidArgument { + t.Errorf("expected InvalidArgument status, got %v", status) } - }() - for z := 0; z < 20; z++ { - wg.Add(1) - go func() { - defer wg.Done() - ms, _ := mwClient.NewManagedStream(ctx, - WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), - WithSchemaDescriptor(descriptorProto)) - for x := 0; x < 50; x++ { - result, err := ms.AppendRows(ctx, data) - if err != nil { - t.Errorf("single append failed: %v", err) - } - resultCh <- result - } - }() } - wg.Wait() - close(resultCh) - wg2.Wait() + // send a subsequent append as verification we can proceed. + result, err = ms.AppendRows(ctx, [][]byte{b}) + if err != nil { + t.Fatalf("subsequent append failed: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("failure result from second append: %v", err) + } } func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 3495b98487a..76a5d028421 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -223,7 +223,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient // // Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - r := newDefaultRetryer() + r := defaultRetryer{} for { recordStat(ms.ctx, AppendClientOpenCount, 1) streamID := "" diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 0910bc2939d..b175f981d76 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -112,8 +112,8 @@ func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Du return r.bo.Pause(), true case codes.ResourceExhausted: if strings.HasPrefix(apiErr.GRPCStatus().Message(), "Exceeds 'AppendRows throughput' quota") { - // Note: b/246031522 is open against backend to give this a structured error - // and avoid string parsing. + // Note: internal b/246031522 opened to give this a structured error + // and avoid string parsing. Should be a QuotaFailure or similar. return r.bigBo.Pause(), true // more aggressive backoff } } From d3b6c7f8bef7b99ff2e24209137612501bf4120a Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 15 Sep 2022 19:48:01 +0000 Subject: [PATCH 4/6] amend predicate per recent discussions --- bigquery/storage/managedwriter/retry.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index b175f981d76..d79671cfa00 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -23,7 +23,6 @@ import ( "github.com/googleapis/gax-go/v2" "github.com/googleapis/gax-go/v2/apierror" - storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -85,27 +84,21 @@ func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Du if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return 0, false } - // The same errors that trigger fast reconnect are retryable, as they deal with transient problems - // with the network stream. - if shouldReconnect(err) { + // EOF can happen in the case of connection close before response received. + if errors.Is(err, io.EOF) { return r.bo.Pause(), true } // Any other non-status based errors are not retried. return 0, false } - // Next, evaluate service-specific error details. - se := &storagepb.StorageError{} - if e := apiErr.Details().ExtractProtoMessage(se); e == nil { - if se.GetCode() == storagepb.StorageError_OFFSET_OUT_OF_RANGE { - return r.bo.Pause(), true - } - // No other service-specific errors should be retried. - return 0, false - } - // Finally, evaluate based on the more generic grpc error status: + // Evaluate based on the more generic grpc error status. + // TODO: Revisit whether we want to include some user-induced + // race conditions that map into FailedPrecondition once it's clearer whether that's + // safe to retry by default. code := apiErr.GRPCStatus().Code() switch code { case codes.Aborted, + codes.Canceled, codes.DeadlineExceeded, codes.Internal, codes.Unavailable: From d502529933966c4e413bb6e6383c0fc4d69bd3a0 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 16 Sep 2022 18:15:46 +0000 Subject: [PATCH 5/6] refactor to consolidate predicate evaluation --- bigquery/storage/managedwriter/retry.go | 64 ++++++++----------------- 1 file changed, 20 insertions(+), 44 deletions(-) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index d79671cfa00..dc89c169837 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -22,17 +22,12 @@ import ( "time" "github.com/googleapis/gax-go/v2" - "github.com/googleapis/gax-go/v2/apierror" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) var ( defaultAppendRetries = 3 - knownReconnectErrors = []error{ - io.EOF, - status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport - } ) func newDefaultRetryer() *defaultRetryer { @@ -47,7 +42,7 @@ func newDefaultRetryer() *defaultRetryer { type defaultRetryer struct { bo gax.Backoff - bigBo gax.Backoff // for more aggressive backoff, such as throughput quota + bigBo gax.Backoff // For more aggressive backoff, such as throughput quota } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { @@ -58,45 +53,14 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return r.bo.Pause(), false } - // Any other non-status based errors treated as retryable. - return r.bo.Pause(), true - } - switch s.Code() { - case codes.Unavailable: - return r.bo.Pause(), true - default: - return r.bo.Pause(), false - } -} - -func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { - if err == nil { - return 0, false // This shouldn't need to be here, and is only provided defensively. - } - if attemptCount > defaultAppendRetries { - return 0, false // exceeded maximum retries. - } - // This predicate evaluates the received response to determine if we should re-enqueue. - apiErr, ok := apierror.FromError(err) - if !ok { - // These are non status-based errors. - // Context errors are non-retriable. - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return 0, false - } - // EOF can happen in the case of connection close before response received. + // EOF can happen in the case of connection close. if errors.Is(err, io.EOF) { return r.bo.Pause(), true } - // Any other non-status based errors are not retried. - return 0, false + // Any other non-status based errors treated as retryable. + return r.bo.Pause(), true } - // Evaluate based on the more generic grpc error status. - // TODO: Revisit whether we want to include some user-induced - // race conditions that map into FailedPrecondition once it's clearer whether that's - // safe to retry by default. - code := apiErr.GRPCStatus().Code() - switch code { + switch s.Code() { case codes.Aborted, codes.Canceled, codes.DeadlineExceeded, @@ -104,21 +68,33 @@ func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Du codes.Unavailable: return r.bo.Pause(), true case codes.ResourceExhausted: - if strings.HasPrefix(apiErr.GRPCStatus().Message(), "Exceeds 'AppendRows throughput' quota") { + if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { // Note: internal b/246031522 opened to give this a structured error // and avoid string parsing. Should be a QuotaFailure or similar. return r.bigBo.Pause(), true // more aggressive backoff } } - // We treat all other failures as non-retriable. return 0, false } +// RetryAppend is a variation of the retry predicate that also bounds retries to a finite number of attempts. +func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) { + + if attemptCount > defaultAppendRetries { + return 0, false // exceeded maximum retries. + } + return r.Retry(err) +} + // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force // our bidi stream to close/reopen based on the responses error. Errors here signal that no // further appends will succeed. func shouldReconnect(err error) bool { - for _, ke := range knownReconnectErrors { + var knownErrors = []error{ + io.EOF, + status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport + } + for _, ke := range knownErrors { if errors.Is(err, ke) { return true } From 7737b797631e60973cc9978cb54fe5731408d3f9 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 16 Sep 2022 18:19:00 +0000 Subject: [PATCH 6/6] update comments --- bigquery/storage/managedwriter/retry.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index dc89c169837..5560443a64e 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -46,7 +46,8 @@ type defaultRetryer struct { } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // This predicate evaluates enqueuing. + // This predicate evaluates errors for both enqueuing and reconnection. + // See RetryAppend for retry that bounds attempts to a fixed number. s, ok := status.FromError(err) if !ok { // Treat context errors as non-retriable.