Skip to content

Commit

Permalink
feat: extend managedstream to support call options
Browse files Browse the repository at this point in the history
This plumbs the ability to pass gax.CallOption opts to the
underlying client underpinning the ManagedStream.  It also
adds a WithAppendRowsCallOption option to the constructor,
as well as adding direct option passing for operations like
Finalize() and FlushRows().

Towards: googleapis#4366
  • Loading branch information
shollyman committed Nov 3, 2021
1 parent 1399283 commit eba5ddb
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 14 deletions.
8 changes: 5 additions & 3 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -98,11 +98,13 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
c: c,
ctx: ctx,
cancel: cancel,
open: func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
arc, err := streamFunc(
// Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually.
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)),
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)))
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -79,9 +79,10 @@ type ManagedStream struct {
fc *flowController

// aspects of the stream client
ctx context.Context // retained context for the stream
cancel context.CancelFunc
open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
ctx context.Context // retained context for the stream
cancel context.CancelFunc
callOptions []gax.CallOption // options passed when opening an append client
open func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection

mu sync.Mutex
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
Expand Down Expand Up @@ -141,14 +142,14 @@ func (ms *ManagedStream) StreamType() StreamType {

// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
// this method for other stream types yields an error.
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) {
req := &storagepb.FlushRowsRequest{
WriteStream: ms.streamSettings.streamID,
Offset: &wrapperspb.Int64Value{
Value: offset,
},
}
resp, err := ms.c.rawClient.FlushRows(ctx, req)
resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
recordStat(ms.ctx, FlushRequests, 1)
if err != nil {
return 0, err
Expand All @@ -161,12 +162,12 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er
//
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
// data in a PendingStream.
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) {
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
req := &storagepb.FinalizeWriteStreamRequest{
Name: ms.streamSettings.streamID,
}
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -208,7 +209,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
if ms.streamSettings != nil {
streamID = ms.streamSettings.streamID
}
arc, err := ms.open(streamID)
arc, err := ms.open(streamID, ms.callOptions...)
bo, shouldRetry := r.Retry(err)
if err != nil && shouldRetry {
recordStat(ms.ctx, AppendClientOpenRetryCount, 1)
Expand Down
5 changes: 3 additions & 2 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/googleapis/gax-go/v2"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {
for _, tc := range testCases {
ms := &ManagedStream{
ctx: context.Background(),
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
if len(tc.errors) == 0 {
panic("out of errors")
}
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {

ms := &ManagedStream{
ctx: ctx,
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
testARC.openCount = testARC.openCount + 1
return testARC, nil
},
Expand Down
13 changes: 12 additions & 1 deletion bigquery/storage/managedwriter/writer_option.go
Expand Up @@ -14,7 +14,10 @@

package managedwriter

import "google.golang.org/protobuf/types/descriptorpb"
import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/protobuf/types/descriptorpb"
)

// WriterOption are variadic options used to configure a ManagedStream instance.
type WriterOption func(*ManagedStream)
Expand Down Expand Up @@ -85,3 +88,11 @@ func WithDataOrigin(dataOrigin string) WriterOption {
ms.streamSettings.dataOrigin = dataOrigin
}
}

// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when
// it opens the underlying append stream.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
return func(ms *ManagedStream) {
ms.callOptions = append(ms.callOptions, o)
}
}
15 changes: 15 additions & 0 deletions bigquery/storage/managedwriter/writer_option_test.go
Expand Up @@ -19,6 +19,8 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
)

func TestWriterOptions(t *testing.T) {
Expand Down Expand Up @@ -94,6 +96,19 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "WithCallOption",
options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
}
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
Expand Down

0 comments on commit eba5ddb

Please sign in to comment.