diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 7ce771a4d70..2d21b3eec40 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -98,11 +98,13 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient c: c, ctx: ctx, cancel: cancel, - open: func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) { + callOptions: []gax.CallOption{ + gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), + }, + open: func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { arc, err := streamFunc( // Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually. - metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)), - gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024))) + metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID))) if err != nil { return nil, err } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index b7ce9a881f6..a003ef7079e 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -79,9 +79,10 @@ type ManagedStream struct { fc *flowController // aspects of the stream client - ctx context.Context // retained context for the stream - cancel context.CancelFunc - open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection + ctx context.Context // retained context for the stream + cancel context.CancelFunc + callOptions []gax.CallOption // options passed when opening an append client + open func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection mu sync.Mutex arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection @@ -141,14 +142,14 @@ func (ms *ManagedStream) StreamType() StreamType { // FlushRows advances the offset at which rows in a BufferedStream are visible. Calling // this method for other stream types yields an error. -func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) { +func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) { req := &storagepb.FlushRowsRequest{ WriteStream: ms.streamSettings.streamID, Offset: &wrapperspb.Int64Value{ Value: offset, }, } - resp, err := ms.c.rawClient.FlushRows(ctx, req) + resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...) recordStat(ms.ctx, FlushRequests, 1) if err != nil { return 0, err @@ -161,12 +162,12 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er // // Finalizing does not advance the current offset of a BufferedStream, nor does it commit // data in a PendingStream. -func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { +func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) { // TODO: consider blocking for in-flight appends once we have an appendStream plumbed in. req := &storagepb.FinalizeWriteStreamRequest{ Name: ms.streamSettings.streamID, } - resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req) + resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...) if err != nil { return 0, err } @@ -208,7 +209,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie if ms.streamSettings != nil { streamID = ms.streamSettings.streamID } - arc, err := ms.open(streamID) + arc, err := ms.open(streamID, ms.callOptions...) bo, shouldRetry := r.Retry(err) if err != nil && shouldRetry { recordStat(ms.ctx, AppendClientOpenRetryCount, 1) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 581a2ecb0f5..94d087c44ba 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/googleapis/gax-go/v2" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -55,7 +56,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { for _, tc := range testCases { ms := &ManagedStream{ ctx: context.Background(), - open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) { + open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { if len(tc.errors) == 0 { panic("out of errors") } @@ -107,7 +108,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) { ms := &ManagedStream{ ctx: ctx, - open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) { + open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { testARC.openCount = testARC.openCount + 1 return testARC, nil }, diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go index 9c06ddd7286..97aa4aa82a0 100644 --- a/bigquery/storage/managedwriter/writer_option.go +++ b/bigquery/storage/managedwriter/writer_option.go @@ -14,7 +14,10 @@ package managedwriter -import "google.golang.org/protobuf/types/descriptorpb" +import ( + "github.com/googleapis/gax-go/v2" + "google.golang.org/protobuf/types/descriptorpb" +) // WriterOption are variadic options used to configure a ManagedStream instance. type WriterOption func(*ManagedStream) @@ -85,3 +88,11 @@ func WithDataOrigin(dataOrigin string) WriterOption { ms.streamSettings.dataOrigin = dataOrigin } } + +// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when +// it opens the underlying append stream. +func WithAppendRowsCallOption(o gax.CallOption) WriterOption { + return func(ms *ManagedStream) { + ms.callOptions = append(ms.callOptions, o) + } +} diff --git a/bigquery/storage/managedwriter/writer_option_test.go b/bigquery/storage/managedwriter/writer_option_test.go index 72480a3e651..5d81035ff29 100644 --- a/bigquery/storage/managedwriter/writer_option_test.go +++ b/bigquery/storage/managedwriter/writer_option_test.go @@ -19,6 +19,8 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/googleapis/gax-go/v2" + "google.golang.org/grpc" ) func TestWriterOptions(t *testing.T) { @@ -94,6 +96,19 @@ func TestWriterOptions(t *testing.T) { return ms }(), }, + { + desc: "WithCallOption", + options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))}, + want: func() *ManagedStream { + ms := &ManagedStream{ + streamSettings: defaultStreamSettings(), + callOptions: []gax.CallOption{ + gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)), + }, + } + return ms + }(), + }, { desc: "multiple", options: []WriterOption{