Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): switch to opt-in retry #6765

Merged
merged 4 commits into from Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -117,8 +117,6 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: createOpenF(ctx, streamFunc),
// We add the new retryer by default, and add a new option to disable it.
retry: newStatelessRetryer(),
}

// apply writer options
Expand Down
84 changes: 79 additions & 5 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -159,8 +159,11 @@ func TestIntegration_ManagedWriter(t *testing.T) {
// Don't run this in parallel, we only want to collect stats from this subtest.
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
})
t.Run("TestLargeInsert", func(t *testing.T) {
testLargeInsert(ctx, t, mwClient, bqClient, dataset)
t.Run("TestLargeInsertNoRetry", func(t *testing.T) {
testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset)
})
t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
})
})
}
Expand Down Expand Up @@ -596,7 +599,73 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(int64(len(testSimpleData))))
}

func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

// Construct a Very Large request.
var data [][]byte
targetSize := 11 * 1024 * 1024 // 11 MB
b, err := proto.Marshal(testSimpleData[0])
if err != nil {
t.Errorf("failed to marshal message: %v", err)
}

numRows := targetSize / len(b)
data = make([][]byte, numRows)

for i := 0; i < numRows; i++ {
data[i] = b
}

result, err := ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("single append failed: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
apiErr, ok := apierror.FromError(err)
if !ok {
t.Errorf("GetResult error was not an instance of ApiError")
}
status := apiErr.GRPCStatus()
if status.Code() != codes.InvalidArgument {
t.Errorf("expected InvalidArgument status, got %v", status)
}
}
// our next append should fail (we don't have retries enabled).
if _, err = ms.AppendRows(ctx, [][]byte{b}); err == nil {
t.Fatalf("expected second append to fail, got success: %v", err)
}

// The send failure triggers reconnect, so an additional append will succeed.
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("third append expected to succeed, got error: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("failure result from third append: %v", err)
}
}

func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
Expand All @@ -609,6 +678,7 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
EnableWriteRetries(true),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
Expand Down Expand Up @@ -646,15 +716,19 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
t.Errorf("expected InvalidArgument status, got %v", status)
}
}
// send a subsequent append as verification we can proceed.

// The second append will succeed, but internally will show a retry.
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("subsequent append failed: %v", err)
t.Fatalf("second append expected to succeed, got error: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("failure result from second append: %v", err)
}
if attempts, _ := result.TotalAttempts(ctx); attempts != 2 {
t.Errorf("expected 2 attempts, got %d", attempts)
}
}

func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
Expand Down
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -527,7 +527,7 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie
func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
err := initialErr
for {
pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount)
pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
if !shouldRetry {
// Should not attempt to re-append.
pw.markDone(appendResp, err, ms.fc)
Expand Down
12 changes: 8 additions & 4 deletions bigquery/storage/managedwriter/options.go
Expand Up @@ -98,11 +98,15 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
}
}

// DisableWriteRetries disables the logic for automatically re-enqueuing failed writes.
func DisableWriteRetries(disable bool) WriterOption {
// EnableWriteRetry enables ManagedStream to automatically retry failed appends.
//
// Enabling retries is best suited for cases where users want to achieve at-least-once
// append semantics. Use of automatic retries may complicate patterns where the user
// is designing for exactly-once append semantics.
func EnableWriteRetries(enable bool) WriterOption {
return func(ms *ManagedStream) {
if disable {
ms.retry = nil
if enable {
ms.retry = newStatelessRetryer()
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion bigquery/storage/managedwriter/options_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/bigquery/internal"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -122,12 +123,24 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "EnableRetries",
options: []WriterOption{EnableWriteRetries(true)},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.retry = newStatelessRetryer()
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
WithType(PendingStream),
WithMaxInflightBytes(5),
WithTraceID("traceid"),
EnableWriteRetries(true),
},
want: func() *ManagedStream {
ms := &ManagedStream{
Expand All @@ -136,6 +149,7 @@ func TestWriterOptions(t *testing.T) {
ms.streamSettings.MaxInflightBytes = 5
ms.streamSettings.streamType = PendingStream
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version)
ms.retry = newStatelessRetryer()
return ms
}(),
},
Expand All @@ -151,7 +165,8 @@ func TestWriterOptions(t *testing.T) {

if diff := cmp.Diff(got, tc.want,
cmp.AllowUnexported(ManagedStream{}, streamSettings{}),
cmp.AllowUnexported(sync.Mutex{})); diff != "" {
cmp.AllowUnexported(sync.Mutex{}),
cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" {
t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
}
}
Expand Down