From c233a229042216cf06fffeb868e5a3186e0992f8 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 14 Jul 2021 23:39:51 +0000 Subject: [PATCH 01/14] feat: first cut at stream plumbing --- bigquery/storage/managedwriter/client.go | 17 ++- .../storage/managedwriter/managed_stream.go | 133 ++++++++++++++++++ bigquery/storage/managedwriter/retry.go | 41 ++++++ .../managedwriter/writer_option_test.go | 4 +- 4 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 bigquery/storage/managedwriter/retry.go diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 4d0bd68bdc0..fcd4ade673a 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -53,12 +53,27 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio }, nil } -// NewManagedStream establishes a new managed stream for appending data into a table. +// NewManagedStream establishes a managed stream for appending data. func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) { + return c.buildManagedStream(ctx, c.rawClient.AppendRows, opts...) 
+} + +func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, opts ...WriterOption) (*ManagedStream, error) { + + ctx, cancel := context.WithCancel(ctx) ms := &ManagedStream{ streamSettings: defaultStreamSettings(), c: c, + ctx: ctx, + cancel: cancel, + open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) { + arc, err := streamFunc(ctx) + if err != nil { // surface open failures; a nil stream is never returned with a nil error + return nil, err + } + return arc, nil + }, } // apply writer options diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 93854911300..afda3e2d139 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -16,8 +16,12 @@ package managedwriter import ( "context" + "io" + "sync" + "github.com/googleapis/gax-go/v2" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -62,10 +66,24 @@ func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type { // ManagedStream is the abstraction over a single write stream. type ManagedStream struct { streamSettings *streamSettings + schemaDescriptor *descriptorpb.DescriptorProto destinationTable string c *Client + + // aspects of the stream client + ctx context.Context // retained context for the stream + cancel context.CancelFunc + open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection + arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + mu sync.Mutex + err error // terminal error + pending chan *pendingWrite + sentFirstAppend bool } +// enables testing +type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) + // streamSettings govern behavior of the append stream RPCs. 
type streamSettings struct { @@ -140,3 +158,118 @@ func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { } return resp.GetRowCount(), nil } + +// streamConn tracks things we need to keep in sync for a single ARC. +type streamConn struct { + arc *storagepb.BigQueryWrite_AppendRowsClient + sentFirstResponse bool + pending chan *pendingWrite +} + +// getStream returns either a valid client or permanent error. +func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + if ms.err != nil { + return nil, ms.err + } + ms.err = ms.ctx.Err() + if ms.err != nil { + return nil, ms.err + } + + // Always return the retained ARC if the arg differs. + if arc != ms.arc { + return ms.arc, nil + } + + ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) + *ms.arc, ms.err = ms.openWithRetry() + // TODO: wire up receiver for processing responses. + ms.sentFirstAppend = false + return ms.arc, ms.err +} + +func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, error) { + r := defaultRetryer{} + for { + arc, err := ms.open() + bo, shouldRetry := r.Retry(err) + if err != nil && shouldRetry { + if err := gax.Sleep(ms.ctx, bo); err != nil { + return nil, err + } + continue + } + return arc, err + } +} + +func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient) error, opts ...gax.CallOption) error { + var settings gax.CallSettings + for _, opt := range opts { + opt.Resolve(&settings) + } + var r gax.Retryer = &defaultRetryer{} + if settings.Retry != nil { + r = settings.Retry() + } + + var arc *storagepb.BigQueryWrite_AppendRowsClient + var err error + + for { + arc, err = ms.getStream(arc) + if err != nil { + return err + } + err = f(*arc) + if err != nil { + bo, shouldRetry := r.Retry(err) + if shouldRetry { + if err := gax.Sleep(ms.ctx, bo); err != nil { + return err + } + continue + } + ms.mu.Lock() + 
ms.err = err + ms.mu.Unlock() + } + return err + } +} + +func (ms *ManagedStream) append(req *storagepb.AppendRowsRequest) error { + return ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient) error { + // TODO: we should only send stream ID and schema for the first message in a new stream, but + // we need to find the best place to toggle this for new streams. + req.WriteStream = ms.streamSettings.streamID + req.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ + ProtoDescriptor: ms.schemaDescriptor, + } + ms.mu.Unlock() + return arc.Send(req) + }) +} + +func (ms *ManagedStream) CloseSend() error { + err := ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient) error { + return arc.CloseSend() + }) + ms.mu.Lock() + ms.err = io.EOF + ms.mu.Unlock() + return err +} + +// AppendRows sends the append requests to the service, and returns one AppendResult per row. +func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResult, error) { + pw := newPendingWrite(data, offset) + if err := ms.append(pw.request); err != nil { + // pending write is DOA, mark it done. + pw.markDone(NoStreamOffset, err) + return nil, err + } + return pw.results, nil +} diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go new file mode 100644 index 00000000000..4faac105a02 --- /dev/null +++ b/bigquery/storage/managedwriter/retry.go @@ -0,0 +1,41 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package managedwriter + +import ( + "time" + + "github.com/googleapis/gax-go/v2" +) + +type defaultRetryer struct { + bo gax.Backoff +} + +func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { + // TODO: define this logic in a subsequent PR, there's some service-specific + // retry predicates in addition to statuscode-based. + return r.bo.Pause(), false +} + +type streamingRetryer struct { + defaultRetryer gax.Retryer +} + +func (r *streamingRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { + // TODO: define this logic in a subsequent PR, there's some service-specific + // retry predicates in addition to statuscode-based. + return r.defaultRetryer.Retry(err) +} diff --git a/bigquery/storage/managedwriter/writer_option_test.go b/bigquery/storage/managedwriter/writer_option_test.go index c1e754fed1a..929f8b2292e 100644 --- a/bigquery/storage/managedwriter/writer_option_test.go +++ b/bigquery/storage/managedwriter/writer_option_test.go @@ -15,6 +15,7 @@ package managedwriter import ( + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -110,7 +111,8 @@ func TestWriterOptions(t *testing.T) { } if diff := cmp.Diff(got, tc.want, - cmp.AllowUnexported(ManagedStream{}, streamSettings{})); diff != "" { + cmp.AllowUnexported(ManagedStream{}, streamSettings{}), + cmp.AllowUnexported(sync.Mutex{})); diff != "" { t.Errorf("diff in case (%s):\n%v", tc.desc, diff) } } From 7372a7e800b855f77bbbc21cc94dcbcedbf72a9b Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 15 Jul 2021 05:42:58 +0000 Subject: [PATCH 02/14] stash current state --- bigquery/storage/managedwriter/client.go | 8 +- .../storage/managedwriter/integration_test.go | 208 ++++++++++++++++++ .../storage/managedwriter/managed_stream.go | 126 +++++++---- .../managedwriter/testdata/messages.pb.go | 174 +++++++++++++++ .../managedwriter/testdata/messages.proto | 
29 +++ .../storage/managedwriter/writer_option.go | 9 + 6 files changed, 514 insertions(+), 40 deletions(-) create mode 100644 bigquery/storage/managedwriter/integration_test.go create mode 100644 bigquery/storage/managedwriter/testdata/messages.pb.go create mode 100644 bigquery/storage/managedwriter/testdata/messages.proto diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index fcd4ade673a..e80cd4a0e7e 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -53,7 +53,13 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio }, nil } -// NewManagedStream establishes a managed stream for appending data. +// Close releases resources held by the client. +func (c *Client) Close() error { + // TODO: we should retain a references to instantiated clients, or have a client-local context. + return fmt.Errorf("not implemented") +} + +// NewManagedStream establishes a new managed stream for appending data into a table. func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) { return c.buildManagedStream(ctx, c.rawClient.AppendRows, opts...) } diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go new file mode 100644 index 00000000000..660410243cb --- /dev/null +++ b/bigquery/storage/managedwriter/integration_test.go @@ -0,0 +1,208 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package managedwriter + +import ( + "context" + "fmt" + "testing" + "time" + + "cloud.google.com/go/bigquery" + "cloud.google.com/go/bigquery/storage/managedwriter/adapt" + "cloud.google.com/go/bigquery/storage/managedwriter/testdata" + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/internal/uid" + "google.golang.org/api/option" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" +) + +var ( + datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()}) + tableIDs = uid.NewSpace("testtable", &uid.Options{Sep: '_', Time: time.Now()}) + defaultTestTimeout = 15 * time.Second +) + +func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { + if testing.Short() { + t.Skip("Integration tests skipped in short mode") + } + projID := testutil.ProjID() + if projID == "" { + t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") + } + ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery") + if ts == nil { + t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") + } + opts = append(opts, option.WithTokenSource(ts)) + client, err := NewClient(ctx, projID, opts...) + if err != nil { + t.Fatalf("couldn't create managedwriter client: %v", err) + } + + bqClient, err := bigquery.NewClient(ctx, projID, opts...) + if err != nil { + t.Fatalf("couldn't create bigquery client: %v", err) + } + return client, bqClient +} + +// validateRowCount confirms the number of rows in a table visible to the query engine. +func validateRowCount(ctx context.Context, client *bigquery.Client, tbl *bigquery.Table) (int64, error) { + + // Verify data is present in the table with a count query. 
+ sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID) + q := client.Query(sql) + it, err := q.Read(ctx) + if err != nil { + return 0, fmt.Errorf("failed to issue validation query: %v", err) + } + var rowdata []bigquery.Value + err = it.Next(&rowdata) + if err != nil { + return 0, fmt.Errorf("iterator error: %v", err) + } + + if count, ok := rowdata[0].(int64); ok { + return count, nil + } + return 0, fmt.Errorf("got unexpected value %v", rowdata[0]) +} + +func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) { + dataset := bqc.Dataset(datasetIDs.New()) + if err := dataset.Create(ctx, nil); err != nil { + return nil, nil, err + } + return dataset, func() { + if err := dataset.DeleteWithContents(ctx); err != nil { + t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err) + } + }, nil +} + +func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) { + convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema) + if err != nil { + t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err) + } + + descriptor, err := adapt.StorageSchemaToDescriptor(convertedSchema, "root") + if err != nil { + t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err) + } + messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) + if !ok { + t.Fatalf("adapted descriptor is not a message descriptor") + } + return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) +} + +func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { + mwClient, bqClient := getTestClients(context.Background(), t) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, _ := 
context.WithTimeout(context.Background(), defaultTestTimeout) + + // prep a suitable destination table. + testTable := dataset.Table(tableIDs.New()) + schema := bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, + } + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + // We'll use a test proto, but we need a descriptorproto + m := &testdata.SimpleMessage{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(DefaultStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + // prevalidate we have no data in table. + rc, err := validateRowCount(ctx, bqClient, testTable) + if err != nil { + t.Fatalf("failed to execute validation: %v", err) + } + if rc != 0 { + t.Errorf("expected no rows at start, got %d", rc) + } + + testData := []*testdata.SimpleMessage{ + {Name: "one", Value: 1}, + {Name: "two", Value: 2}, + {Name: "three", Value: 3}, + {Name: "four", Value: 1}, + {Name: "five", Value: 2}, + } + + // First, send the rows individually. 
+ for k, mesg := range testData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + ms.AppendRows(data, NoStreamOffset) + } + + rc, err = validateRowCount(ctx, bqClient, testTable) + if err != nil { + t.Fatalf("failed to execute validation: %v", err) + } + want := int64(len(testData)) + if rc != want { + t.Errorf("validation mismatch on first round, got %d, want %d", rc, want) + } + + // Now, send the rows in a single message: + var data [][]byte + for k, mesg := range testData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data = append(data, b) + } + ms.AppendRows(data, NoStreamOffset) + + rc, err = validateRowCount(ctx, bqClient, testTable) + if err != nil { + t.Fatalf("failed to execute validation: %v", err) + } + want = int64(2 * len(testData)) + if rc != want { + t.Errorf("validation mismatch on second round, got %d, want %d", rc, want) + } +} diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index afda3e2d139..fbdefac4865 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -21,6 +21,7 @@ import ( "github.com/googleapis/gax-go/v2" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -71,14 +72,13 @@ type ManagedStream struct { c *Client // aspects of the stream client - ctx context.Context // retained context for the stream - cancel context.CancelFunc - open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection - arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection - mu sync.Mutex - err error // terminal error - pending chan *pendingWrite - 
sentFirstAppend bool + ctx context.Context // retained context for the stream + cancel context.CancelFunc + open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection + arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + mu sync.Mutex + err error // terminal error + pending chan *pendingWrite } // enables testing @@ -159,53 +159,54 @@ func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { return resp.GetRowCount(), nil } -// streamConn tracks things we need to keep in sync for a single ARC. -type streamConn struct { - arc *storagepb.BigQueryWrite_AppendRowsClient - sentFirstResponse bool - pending chan *pendingWrite -} - // getStream returns either a valid client or permanent error. -func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, error) { +func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { ms.mu.Lock() defer ms.mu.Unlock() if ms.err != nil { - return nil, ms.err + return nil, nil, ms.err } ms.err = ms.ctx.Err() if ms.err != nil { - return nil, ms.err + return nil, nil, ms.err } // Always return the retained ARC if the arg differs. if arc != ms.arc { - return ms.arc, nil + return ms.arc, ms.pending, nil } ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) - *ms.arc, ms.err = ms.openWithRetry() - // TODO: wire up receiver for processing responses. - ms.sentFirstAppend = false - return ms.arc, ms.err + *ms.arc, ms.pending, ms.err = ms.openWithRetry() + return ms.arc, ms.pending, ms.err } -func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, error) { +// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection. 
+func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { r := defaultRetryer{} for { arc, err := ms.open() bo, shouldRetry := r.Retry(err) if err != nil && shouldRetry { if err := gax.Sleep(ms.ctx, bo); err != nil { - return nil, err + return nil, nil, err } continue } - return arc, err + if err == nil { + // The channel relationship with its ARC is 1:1. If we get a new ARC, create a new chan + // and fire up the associated receive processor. + ch := make(chan *pendingWrite) + go recvProcessor(ms.ctx, arc, ch) + return arc, ch, nil + } + return arc, nil, err } } -func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient) error, opts ...gax.CallOption) error { +// call serves as a closure that forwards the call to a (possibly reopened) AppendRowsClient), and the associated +// pendingWrite channel for the ARC connection. +func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite) error, opts ...gax.CallOption) error { var settings gax.CallSettings for _, opt := range opts { opt.Resolve(&settings) @@ -216,14 +217,15 @@ func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient) e } var arc *storagepb.BigQueryWrite_AppendRowsClient + var ch chan *pendingWrite var err error for { - arc, err = ms.getStream(arc) + arc, ch, err = ms.getStream(arc) if err != nil { return err } - err = f(*arc) + err = f(*arc, ch) if err != nil { bo, shouldRetry := r.Retry(err) if shouldRetry { @@ -240,22 +242,29 @@ func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient) e } } -func (ms *ManagedStream) append(req *storagepb.AppendRowsRequest) error { - return ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient) error { +func (ms *ManagedStream) append(pw *pendingWrite) error { + return ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error { // TODO: we should only send stream ID and schema for 
the first message in a new stream, but - // we need to find the best place to toggle this for new streams. - req.WriteStream = ms.streamSettings.streamID - req.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ + // we need an elegant way to handle this. + pw.request.WriteStream = ms.streamSettings.streamID + pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ ProtoDescriptor: ms.schemaDescriptor, } - ms.mu.Unlock() - return arc.Send(req) + err := arc.Send(pw.request) + if err == nil { // only hand writes that were actually sent to the receive processor + ch <- pw + } + return err }) } func (ms *ManagedStream) CloseSend() error { - err := ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient) error { - return arc.CloseSend() + err := ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error { + err := arc.CloseSend() + if err == nil { + close(ch) + } + return err }) ms.mu.Lock() ms.err = io.EOF @@ -266,10 +275,49 @@ func (ms *ManagedStream) CloseSend() error { // AppendRows sends the append requests to the service, and returns one AppendResult per row. func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResult, error) { pw := newPendingWrite(data, offset) - if err := ms.append(pw.request); err != nil { + if err := ms.append(pw); err != nil { // pending write is DOA, mark it done. pw.markDone(NoStreamOffset, err) return nil, err } return pw.results, nil } + +// recvProcessor is used to pair responses back up with the origin writes. +func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { + for { + select { + case <-ctx.Done(): + // Context is done, so we're not going to get further updates. However, we need to finalize all remaining + // writes on the channel so users don't block indefinitely. + for { + pw, ok := <-ch + if !ok { + return + } + pw.markDone(NoStreamOffset, ctx.Err()) + } + case nextWrite, ok := <-ch: + if !ok { + // Channel closed, all elements processed. 
+ return + } + + resp, err := arc.Recv() + if err != nil { + nextWrite.markDone(NoStreamOffset, err) + continue + } + + if status := resp.GetError(); status != nil { + nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status)) + continue + } + offset := int64(NoStreamOffset) // reported when the response carries no offset + if off := resp.GetAppendResult().GetOffset(); off != nil { + offset = off.GetValue() + } + nextWrite.markDone(offset, nil) + } + } +} diff --git a/bigquery/storage/managedwriter/testdata/messages.pb.go b/bigquery/storage/managedwriter/testdata/messages.pb.go new file mode 100644 index 00000000000..8fc86cb3442 --- /dev/null +++ b/bigquery/storage/managedwriter/testdata/messages.pb.go @@ -0,0 +1,174 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.10.1 +// source: messages.proto + +package testdata + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +// SimpleMessage represents a simple message that transmits a string and int64 value. +type SimpleMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Value int64 `protobuf:"varint,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *SimpleMessage) Reset() { + *x = SimpleMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SimpleMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SimpleMessage) ProtoMessage() {} + +func (x *SimpleMessage) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SimpleMessage.ProtoReflect.Descriptor instead. 
+func (*SimpleMessage) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{0} +} + +func (x *SimpleMessage) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *SimpleMessage) GetValue() int64 { + if x != nil { + return x.Value + } + return 0 +} + +var File_messages_proto protoreflect.FileDescriptor + +var file_messages_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x22, 0x39, 0x0a, 0x0d, 0x53, 0x69, + 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x61, 0x70, + 0x69, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x32, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, + 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_messages_proto_rawDescOnce sync.Once + file_messages_proto_rawDescData = file_messages_proto_rawDesc +) + +func file_messages_proto_rawDescGZIP() []byte { + file_messages_proto_rawDescOnce.Do(func() { + file_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_messages_proto_rawDescData) + }) + return file_messages_proto_rawDescData +} + +var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_messages_proto_goTypes = []interface{}{ + (*SimpleMessage)(nil), // 0: testdata.SimpleMessage +} +var file_messages_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is 
the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_messages_proto_init() } +func file_messages_proto_init() { + if File_messages_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SimpleMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_messages_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_messages_proto_goTypes, + DependencyIndexes: file_messages_proto_depIdxs, + MessageInfos: file_messages_proto_msgTypes, + }.Build() + File_messages_proto = out.File + file_messages_proto_rawDesc = nil + file_messages_proto_goTypes = nil + file_messages_proto_depIdxs = nil +} diff --git a/bigquery/storage/managedwriter/testdata/messages.proto b/bigquery/storage/managedwriter/testdata/messages.proto new file mode 100644 index 00000000000..3112ed93f92 --- /dev/null +++ b/bigquery/storage/managedwriter/testdata/messages.proto @@ -0,0 +1,29 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package testdata; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1beta2/testdata"; + + +// SimpleMessage represents a simple message that transmits a string and int64 value. +message SimpleMessage { + string name = 1; + int64 value = 2; +} + +// TODO: use wrappers.proto to define a nullable form of the message. + + + diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go index e0032788436..0670cbe0973 100644 --- a/bigquery/storage/managedwriter/writer_option.go +++ b/bigquery/storage/managedwriter/writer_option.go @@ -14,6 +14,8 @@ package managedwriter +import "google.golang.org/protobuf/types/descriptorpb" + // WriterOption is used to configure a ManagedWriteClient. type WriterOption func(*ManagedStream) @@ -67,3 +69,10 @@ func WithTracePrefix(prefix string) WriterOption { ms.streamSettings.TracePrefix = prefix } } + +// WithSchemaDescriptor describes the format of messages you'll be sending to the service. 
+func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { + return func(ms *ManagedStream) { + ms.schemaDescriptor = dp + } +} From 041e74a0c984caef3e046c91c730ac1803e1dad4 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 15 Jul 2021 16:59:15 +0000 Subject: [PATCH 03/14] more retry things --- .../managedwriter/managed_stream_test.go | 81 +++++++++++++++++++ bigquery/storage/managedwriter/retry.go | 18 ++++- 2 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 bigquery/storage/managedwriter/managed_stream_test.go diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go new file mode 100644 index 00000000000..86433379f89 --- /dev/null +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -0,0 +1,81 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package managedwriter + +import ( + "context" + "testing" + + storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestManagedStream_OpenWithRetry(t *testing.T) { + + testCases := []struct { + desc string + errors []error + wantFail bool + }{ + { + desc: "no error", + errors: []error{nil}, + wantFail: false, + }, + { + desc: "transient failures", + errors: []error{ + status.Errorf(codes.Unavailable, "try 1"), + status.Errorf(codes.Unavailable, "try 2"), + nil}, + wantFail: false, + }, + { + desc: "terminal error", + errors: []error{status.Errorf(codes.InvalidArgument, "bad args")}, + wantFail: true, + }, + } + + for _, tc := range testCases { + ms := &ManagedStream{ + ctx: context.Background(), + open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) { + if len(tc.errors) == 0 { + panic("out of errors") + } + err := tc.errors[0] + tc.errors = tc.errors[1:] + if err == nil { + return &testAppendRowsClient{}, nil + } + return nil, err + }, + } + _, _, err := ms.openWithRetry() + if tc.wantFail && err == nil { + t.Errorf("case %s: wanted failure, got success", tc.desc) + } + if !tc.wantFail && err != nil { + t.Errorf("case %s: wanted success, got %v", tc.desc, err) + } + } +} + +type testAppendRowsClient struct { + storagepb.BigQueryWrite_AppendRowsClient + err error +} diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index 4faac105a02..f8873598bc1 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -18,6 +18,8 @@ import ( "time" "github.com/googleapis/gax-go/v2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type defaultRetryer struct { @@ -25,9 +27,19 @@ type defaultRetryer struct { } func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // TODO: define this logic in a subsequent PR, there's some 
service-specific + // TODO: refine this logic in a subsequent PR, there's some service-specific // retry predicates in addition to statuscode-based. - return r.bo.Pause(), false + s, ok := status.FromError(err) + if !ok { + // non-status based errors as retryable + return r.bo.Pause(), true + } + switch s.Code() { + case codes.Unavailable: + return r.bo.Pause(), true + default: + return r.bo.Pause(), false + } } type streamingRetryer struct { @@ -35,7 +47,7 @@ type streamingRetryer struct { } func (r *streamingRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // TODO: define this logic in a subsequent PR, there's some service-specific + // TODO: refine this logic in a subsequent PR, there's some service-specific // retry predicates in addition to statuscode-based. return r.defaultRetryer.Retry(err) } From fcc6d45e2a0e3f976907fb084b540c69fdc724e6 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 15 Jul 2021 17:01:34 +0000 Subject: [PATCH 04/14] refine test --- bigquery/storage/managedwriter/managed_stream_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 86433379f89..fc0cc3790f8 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -65,13 +65,21 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { return nil, err }, } - _, _, err := ms.openWithRetry() + arc, ch, err := ms.openWithRetry() if tc.wantFail && err == nil { t.Errorf("case %s: wanted failure, got success", tc.desc) } if !tc.wantFail && err != nil { t.Errorf("case %s: wanted success, got %v", tc.desc, err) } + if err == nil { + if arc == nil { + t.Errorf("case %s: expected append client, got nil", tc.desc) + } + if ch == nil { + t.Errorf("case %s: expected channel, got nil", tc.desc) + } + } } } From df48e6c686ae3f470cbb94d33f31a750bf9d1cd5 Mon Sep 17 
00:00:00 2001 From: Seth Hollyman Date: Thu, 15 Jul 2021 19:05:38 +0000 Subject: [PATCH 05/14] working integration --- bigquery/storage/managedwriter/client.go | 51 ++++++++++--------- .../storage/managedwriter/managed_stream.go | 5 +- .../managedwriter/managed_stream_test.go | 4 ++ 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index e80cd4a0e7e..2350726f665 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -21,8 +21,10 @@ import ( "strings" storage "cloud.google.com/go/bigquery/storage/apiv1beta2" + "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + "google.golang.org/grpc" ) // Client is a managed BigQuery Storage write client scoped to a single project. @@ -61,10 +63,10 @@ func (c *Client) Close() error { // NewManagedStream establishes a new managed stream for appending data into a table. func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) { - return c.buildManagedStream(ctx, c.rawClient.AppendRows, opts...) + return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...) 
} -func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, opts ...WriterOption) (*ManagedStream, error) { +func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) { ctx, cancel := context.WithCancel(ctx) @@ -74,8 +76,8 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient ctx: ctx, cancel: cancel, open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) { - arc, err := streamFunc(ctx) - if err == nil { + arc, err := streamFunc(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024))) + if err != nil { return nil, err } return arc, nil @@ -87,28 +89,31 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient opt(ms) } - if err := c.validateOptions(ctx, ms); err != nil { - return nil, err - } + // skipSetup exists for testing scenarios. + if !skipSetup { + if err := c.validateOptions(ctx, ms); err != nil { + return nil, err + } - if ms.streamSettings.streamID == "" { - // not instantiated with a stream, construct one. - streamName := fmt.Sprintf("%s/_default", ms.destinationTable) - if ms.streamSettings.streamType != DefaultStream { - // For everything but a default stream, we create a new stream on behalf of the user. - req := &storagepb.CreateWriteStreamRequest{ - Parent: ms.destinationTable, - WriteStream: &storagepb.WriteStream{ - Type: streamTypeToEnum(ms.streamSettings.streamType), - }} - resp, err := ms.c.rawClient.CreateWriteStream(ctx, req) - if err != nil { - return nil, fmt.Errorf("couldn't create write stream: %v", err) + if ms.streamSettings.streamID == "" { + // not instantiated with a stream, construct one. + streamName := fmt.Sprintf("%s/_default", ms.destinationTable) + if ms.streamSettings.streamType != DefaultStream { + // For everything but a default stream, we create a new stream on behalf of the user. 
+ req := &storagepb.CreateWriteStreamRequest{ + Parent: ms.destinationTable, + WriteStream: &storagepb.WriteStream{ + Type: streamTypeToEnum(ms.streamSettings.streamType), + }} + resp, err := ms.c.rawClient.CreateWriteStream(ctx, req) + if err != nil { + return nil, fmt.Errorf("couldn't create write stream: %v", err) + } + streamName = resp.GetName() } - streamName = resp.GetName() + ms.streamSettings.streamID = streamName + // TODO(followup CLs): instantiate an appendstream client, flow controller, etc. } - ms.streamSettings.streamID = streamName - // TODO(followup CLs): instantiate an appendstream client, flow controller, etc. } return ms, nil diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index fbdefac4865..40eaf506be3 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -17,6 +17,7 @@ package managedwriter import ( "context" "io" + "log" "sync" "github.com/googleapis/gax-go/v2" @@ -251,7 +252,7 @@ func (ms *ManagedStream) append(pw *pendingWrite) error { ProtoDescriptor: ms.schemaDescriptor, } err := arc.Send(pw.request) - if err != nil { + if err == nil { ch <- pw } return err @@ -305,10 +306,12 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl resp, err := arc.Recv() if err != nil { + log.Printf("recv got err: %#v", err) nextWrite.markDone(NoStreamOffset, err) } if status := resp.GetError(); status != nil { + log.Printf("recv got err status: %#v", status) nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status)) continue } diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index fc0cc3790f8..c543b7617b8 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -83,6 +83,10 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } } +func 
TestManagedStream_GetStream(t *testing.T) { + +} + type testAppendRowsClient struct { storagepb.BigQueryWrite_AppendRowsClient err error From 6dfcb54e9431c3e5f9f99725fdccccc5cdd93c5a Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Thu, 15 Jul 2021 22:31:15 +0000 Subject: [PATCH 06/14] refine integration --- .../storage/managedwriter/integration_test.go | 53 +++++++------------ .../storage/managedwriter/managed_stream.go | 1 + 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 660410243cb..0eef13d5720 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -34,7 +34,7 @@ import ( var ( datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()}) - tableIDs = uid.NewSpace("testtable", &uid.Options{Sep: '_', Time: time.Now()}) + tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()}) defaultTestTimeout = 15 * time.Second ) @@ -64,25 +64,29 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti } // validateRowCount confirms the number of rows in a table visible to the query engine. -func validateRowCount(ctx context.Context, client *bigquery.Client, tbl *bigquery.Table) (int64, error) { +func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) { // Verify data is present in the table with a count query. 
sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID) q := client.Query(sql) it, err := q.Read(ctx) if err != nil { - return 0, fmt.Errorf("failed to issue validation query: %v", err) + t.Errorf("failed to issue validation query: %v", err) + return } var rowdata []bigquery.Value err = it.Next(&rowdata) if err != nil { - return 0, fmt.Errorf("iterator error: %v", err) + t.Errorf("error fetching validation results: %v", err) + return } - - if count, ok := rowdata[0].(int64); ok { - return count, nil + count, ok := rowdata[0].(int64) + if !ok { + t.Errorf("got unexpected data from validation query: %v", rowdata[0]) + } + if count != expectedRows { + t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows) } - return 0, fmt.Errorf("got unexpected value %v", rowdata[0]) } func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) { @@ -151,13 +155,7 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { } // prevalidate we have no data in table. - rc, err := validateRowCount(ctx, bqClient, testTable) - if err != nil { - t.Fatalf("failed to execute validation: %v", err) - } - if rc != 0 { - t.Errorf("expected no rows at start, got %d", rc) - } + validateRowCount(ctx, t, bqClient, testTable, 0) testData := []*testdata.SimpleMessage{ {Name: "one", Value: 1}, @@ -167,7 +165,7 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { {Name: "five", Value: 2}, } - // First, send the rows individually. + // First, send the test rows individually. 
for k, mesg := range testData { b, err := proto.Marshal(mesg) if err != nil { @@ -176,17 +174,10 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { data := [][]byte{b} ms.AppendRows(data, NoStreamOffset) } + wantRows := int64(len(testData)) + validateRowCount(ctx, t, bqClient, testTable, wantRows) - rc, err = validateRowCount(ctx, bqClient, testTable) - if err != nil { - t.Fatalf("failed to execute validation: %v", err) - } - want := int64(len(testData)) - if rc != want { - t.Errorf("validation mismatch on first round, got %d, want %d", rc, want) - } - - // Now, send the rows in a single message: + // Now, send the rows grouped into in a single append. var data [][]byte for k, mesg := range testData { b, err := proto.Marshal(mesg) @@ -197,12 +188,6 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { ms.AppendRows(data, NoStreamOffset) } - rc, err = validateRowCount(ctx, bqClient, testTable) - if err != nil { - t.Fatalf("failed to execute validation: %v", err) - } - want = int64(2 * len(testData)) - if rc != want { - t.Errorf("validation mismatch on second round, got %d, want %d", rc, want) - } + wantRows = wantRows * 2 + validateRowCount(ctx, t, bqClient, testTable, wantRows) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 40eaf506be3..dda08252414 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -304,6 +304,7 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl return } + // block until we get a corresponding response or err from stream. 
resp, err := arc.Recv() if err != nil { log.Printf("recv got err: %#v", err) From 55ddeef3182c4abb31c5458e17a44c3174ea32ff Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 16 Jul 2021 17:06:09 +0000 Subject: [PATCH 07/14] refine and documentation pass --- bigquery/storage/managedwriter/doc.go | 3 +- .../storage/managedwriter/integration_test.go | 23 +++++++++++---- .../storage/managedwriter/managed_stream.go | 28 +++++++++++-------- bigquery/storage/managedwriter/retry.go | 10 ------- 4 files changed, 37 insertions(+), 27 deletions(-) diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go index a8e580bd90c..5fb6dfd6f54 100644 --- a/bigquery/storage/managedwriter/doc.go +++ b/bigquery/storage/managedwriter/doc.go @@ -14,7 +14,8 @@ // Package managedwriter will be a thick client around the storage API's BigQueryWriteClient. // -// It is EXPERIMENTAL and subject to change or removal without notice. +// It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha +// state, and breaking changes are frequent. // // Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is // a successor to the streaming interface. API method tabledata.insertAll is the primary backend method, and diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 0eef13d5720..0f89ebde633 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -89,6 +89,7 @@ func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client } } +// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred. 
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) { dataset := bqc.Dataset(datasetIDs.New()) if err := dataset.Create(ctx, nil); err != nil { @@ -101,6 +102,7 @@ func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) ( }, nil } +// setupDynamicDescriptors aids testing when not using a supplied proto func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) { convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema) if err != nil { @@ -140,7 +142,8 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - // We'll use a test proto, but we need a descriptorproto + // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation + // to send as the stream's schema. m := &testdata.SimpleMessage{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) @@ -166,18 +169,24 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { } // First, send the test rows individually. + var results []*AppendResult for k, mesg := range testData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - ms.AppendRows(data, NoStreamOffset) + results, err = ms.AppendRows(data, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } } + // wait for the result to indicate ready, then validate. + results[0].Ready() wantRows := int64(len(testData)) validateRowCount(ctx, t, bqClient, testTable, wantRows) - // Now, send the rows grouped into in a single append. + // Now, send the test rows grouped into in a single append. 
var data [][]byte for k, mesg := range testData { b, err := proto.Marshal(mesg) @@ -185,9 +194,13 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { t.Errorf("failed to marshal message %d: %v", k, err) } data := append(data, b) - ms.AppendRows(data, NoStreamOffset) + results, err = ms.AppendRows(data, NoStreamOffset) + if err != nil { + t.Errorf("grouped-row append failed: %v", err) + } } - + // wait for the result to indicate ready, then validate again. + results[0].Ready() wantRows = wantRows * 2 validateRowCount(ctx, t, bqClient, testTable, wantRows) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index dda08252414..572a9f83948 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -73,13 +73,15 @@ type ManagedStream struct { c *Client // aspects of the stream client - ctx context.Context // retained context for the stream - cancel context.CancelFunc - open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection - arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection - mu sync.Mutex - err error // terminal error - pending chan *pendingWrite + ctx context.Context // retained context for the stream + cancel context.CancelFunc + open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection + arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + mu sync.Mutex + err error // terminal error + + pending chan *pendingWrite + recvCancel context.CancelFunc } // enables testing @@ -183,6 +185,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient } // openWithRetry is responsible for navigating the (re)opening of the underlying stream connection. +// The expectation is this is only ever invoked when the caller has the mutex lock. 
func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { r := defaultRetryer{} for { @@ -246,7 +249,8 @@ func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient, c func (ms *ManagedStream) append(pw *pendingWrite) error { return ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error { // TODO: we should only send stream ID and schema for the first message in a new stream, but - // we need an elegant way to handle this. + // we've decoupled the individual appends from caring about state of the connection. We likely + // need to do this via a mutex-guarded state flag. pw.request.WriteStream = ms.streamSettings.streamID pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ ProtoDescriptor: ms.schemaDescriptor, @@ -269,6 +273,7 @@ func (ms *ManagedStream) CloseSend() error { }) ms.mu.Lock() ms.err = io.EOF + close(ms.pending) ms.mu.Unlock() return err } @@ -284,13 +289,14 @@ func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResul return pw.results, nil } -// recvProcessor is used to pair responses back up with the origin writes. +// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { + // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply + // ensure that pending writes get acknowledged with a terminal state. for { select { case <-ctx.Done(): - // Context is done, so we're not going to get further updates. However, we need to finalize all remaining - // writes on the channel so users don't block indefinitely. + // Context is done, so we're not going to get further updates. Mark all work failed with the context error. 
for { pw, ok := <-ch if !ok { diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go index f8873598bc1..1f272a99932 100644 --- a/bigquery/storage/managedwriter/retry.go +++ b/bigquery/storage/managedwriter/retry.go @@ -41,13 +41,3 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool return r.bo.Pause(), false } } - -type streamingRetryer struct { - defaultRetryer gax.Retryer -} - -func (r *streamingRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { - // TODO: refine this logic in a subsequent PR, there's some service-specific - // retry predicates in addition to statuscode-based. - return r.defaultRetryer.Retry(err) -} From b92a9c34d4d5dfae0382c0ff4ed99ff965498400 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 16 Jul 2021 17:21:24 +0000 Subject: [PATCH 08/14] remove unusued field --- bigquery/storage/managedwriter/managed_stream.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 572a9f83948..5269c4663ce 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -73,15 +73,13 @@ type ManagedStream struct { c *Client // aspects of the stream client - ctx context.Context // retained context for the stream - cancel context.CancelFunc - open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection - arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection - mu sync.Mutex - err error // terminal error - - pending chan *pendingWrite - recvCancel context.CancelFunc + ctx context.Context // retained context for the stream + cancel context.CancelFunc + open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection + arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + mu 
sync.Mutex + err error // terminal error + pending chan *pendingWrite } // enables testing From ab38551c7620150e00a3be704ada3a01eb968b82 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 16 Jul 2021 22:22:02 +0000 Subject: [PATCH 09/14] address reviewer comments --- bigquery/storage/managedwriter/client.go | 9 +++++++-- .../storage/managedwriter/integration_test.go | 3 ++- .../storage/managedwriter/managed_stream.go | 18 ++++++++---------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 2350726f665..d3a37fdc8da 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -57,8 +57,13 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio // Close releases resources held by the client. func (c *Client) Close() error { - // TODO: we should retain a references to instantiated clients, or have a client-local context. - return fmt.Errorf("not implemented") + // TODO: consider if we should propagate a cancellation from client to all associated managed streams. + if c.rawClient == nil { + return fmt.Errorf("already closed") + } + c.rawClient.Close() + c.rawClient = nil + return nil } // NewManagedStream establishes a new managed stream for appending data into a table. diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 0f89ebde633..4b276b2e7aa 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -131,7 +131,8 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { } defer cleanup() - ctx, _ := context.WithTimeout(context.Background(), defaultTestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() // prep a suitable destination table. 
testTable := dataset.Table(tableIDs.New()) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 5269c4663ce..59794469bf7 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -17,7 +17,6 @@ package managedwriter import ( "context" "io" - "log" "sync" "github.com/googleapis/gax-go/v2" @@ -73,13 +72,14 @@ type ManagedStream struct { c *Client // aspects of the stream client - ctx context.Context // retained context for the stream - cancel context.CancelFunc - open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection - arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + ctx context.Context // retained context for the stream + cancel context.CancelFunc + open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection + mu sync.Mutex - err error // terminal error - pending chan *pendingWrite + arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + err error // terminal error + pending chan *pendingWrite // writes awaiting status } // enables testing @@ -261,7 +261,7 @@ func (ms *ManagedStream) append(pw *pendingWrite) error { }) } -func (ms *ManagedStream) CloseSend() error { +func (ms *ManagedStream) Close() error { err := ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error { err := arc.CloseSend() if err == nil { @@ -311,12 +311,10 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl // block until we get a corresponding response or err from stream. 
resp, err := arc.Recv() if err != nil { - log.Printf("recv got err: %#v", err) nextWrite.markDone(NoStreamOffset, err) } if status := resp.GetError(); status != nil { - log.Printf("recv got err status: %#v", status) nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status)) continue } From ee496d93b4d8cec6e32a7b326a21443be22ca0ab Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 23 Jul 2021 01:01:01 +0000 Subject: [PATCH 10/14] address the "send ids only for first request" TODO, add testing --- .../storage/managedwriter/managed_stream.go | 70 +++++++------- .../managedwriter/managed_stream_test.go | 94 ++++++++++++++++++- 2 files changed, 130 insertions(+), 34 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 59794469bf7..2fa13e0e250 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -16,7 +16,9 @@ package managedwriter import ( "context" + "fmt" "io" + "log" "sync" "github.com/googleapis/gax-go/v2" @@ -76,10 +78,11 @@ type ManagedStream struct { cancel context.CancelFunc open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection - mu sync.Mutex - arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection - err error // terminal error - pending chan *pendingWrite // writes awaiting status + mu sync.Mutex + arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection + err error // terminal error + pending chan *pendingWrite // writes awaiting status + streamSetup *sync.Once // handles amending the first request in a new stream } // enables testing @@ -160,7 +163,7 @@ func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { return resp.GetRowCount(), nil } -// getStream returns either a valid client or permanent error. +// getStream returns either a valid ARC client stream or permanent error. 
func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { ms.mu.Lock() defer ms.mu.Unlock() @@ -200,15 +203,15 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie // and fire up the associated receive processor. ch := make(chan *pendingWrite) go recvProcessor(ms.ctx, arc, ch) + // also, replace sync.Once for starting a new stream, as we need to do this for every new connection + ms.streamSetup = new(sync.Once) return arc, ch, nil } return arc, nil, err } } -// call serves as a closure that forwards the call to a (possibly reopened) AppendRowsClient), and the associated -// pendingWrite channel for the ARC connection. -func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite) error, opts ...gax.CallOption) error { +func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error { var settings gax.CallSettings for _, opt := range opts { opt.Resolve(&settings) @@ -227,7 +230,25 @@ func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient, c if err != nil { return err } - err = f(*arc, ch) + var req *storagepb.AppendRowsRequest + ms.streamSetup.Do(func() { + log.Println("firing streamSetup") + reqCopy := *pw.request + reqCopy.WriteStream = ms.streamSettings.streamID + reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ + ProtoDescriptor: ms.schemaDescriptor, + } + reqCopy.TraceId = ms.streamSettings.TracePrefix + req = &reqCopy + }) + + var err error + if req == nil { + err = (*arc).Send(pw.request) + } else { + // we had to amend the initial request + err = (*arc).Send(req) + } if err != nil { bo, shouldRetry := r.Retry(err) if shouldRetry { @@ -240,38 +261,23 @@ func (ms *ManagedStream) call(f func(storagepb.BigQueryWrite_AppendRowsClient, c ms.err = err ms.mu.Unlock() } - return err - } -} - -func (ms *ManagedStream) append(pw *pendingWrite) error { - 
return ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error { - // TODO: we should only send stream ID and schema for the first message in a new stream, but - // we've decoupled the individual appends from caring about state of the connection. We likely - // need to do this via a mutex-guarded state flag. - pw.request.WriteStream = ms.streamSettings.streamID - pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ - ProtoDescriptor: ms.schemaDescriptor, - } - err := arc.Send(pw.request) if err == nil { ch <- pw } return err - }) + } } func (ms *ManagedStream) Close() error { - err := ms.call(func(arc storagepb.BigQueryWrite_AppendRowsClient, ch chan *pendingWrite) error { - err := arc.CloseSend() - if err == nil { - close(ch) - } - return err - }) ms.mu.Lock() + if ms.arc == nil { + return fmt.Errorf("no stream exists") + } + err := (*ms.arc).CloseSend() + if err == nil { + close(ms.pending) + } ms.err = io.EOF - close(ms.pending) ms.mu.Unlock() return err } diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index c543b7617b8..ba9f12eb5aa 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -21,6 +21,8 @@ import ( storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" ) func TestManagedStream_OpenWithRetry(t *testing.T) { @@ -83,11 +85,99 @@ func TestManagedStream_OpenWithRetry(t *testing.T) { } } -func TestManagedStream_GetStream(t *testing.T) { +func TestManagedStream_FirstAppendBehavior(t *testing.T) { + var testARC *testAppendRowsClient + testARC = &testAppendRowsClient{ + recvF: func() (*storagepb.AppendRowsResponse, error) { + return &storagepb.AppendRowsResponse{ + Response: 
&storagepb.AppendRowsResponse_AppendResult_{}, + }, nil + }, + sendF: func(req *storagepb.AppendRowsRequest) error { + testARC.requests = append(testARC.requests, req) + return nil + }, + } + schema := &descriptorpb.DescriptorProto{ + Name: proto.String("testDescriptor"), + } + + ms := &ManagedStream{ + ctx: context.Background(), + open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) { + testARC.openCount = testARC.openCount + 1 + return testARC, nil + }, + streamSettings: defaultStreamSettings(), + } + ms.streamSettings.streamID = "FOO" + ms.streamSettings.TracePrefix = "TRACE" + ms.schemaDescriptor = schema + + fakeData := [][]byte{ + []byte("foo"), + []byte("bar"), + } + + wantReqs := 3 + + for i := 0; i < wantReqs; i++ { + _, err := ms.AppendRows(fakeData, NoStreamOffset) + if err != nil { + t.Errorf("AppendRows; %v", err) + } + } + + if testARC.openCount != 1 { + t.Errorf("expected a single open, got %d", testARC.openCount) + } + + if len(testARC.requests) != wantReqs { + t.Errorf("expected %d requests, got %d", wantReqs, len(testARC.requests)) + } + + for k, v := range testARC.requests { + if v == nil { + t.Errorf("request %d was nil", k) + } + if k == 0 { + if v.GetTraceId() == "" { + t.Errorf("expected TraceId on first request, was empty") + } + if v.GetWriteStream() == "" { + t.Errorf("expected WriteStream on first request, was empty") + } + if v.GetProtoRows().GetWriterSchema().GetProtoDescriptor() == nil { + t.Errorf("expected WriterSchema on first request, was empty") + } + + } else { + if v.GetTraceId() != "" { + t.Errorf("expected no TraceID on request %d, got %s", k, v.GetTraceId()) + } + if v.GetWriteStream() != "" { + t.Errorf("expected no WriteStream on request %d, got %s", k, v.GetWriteStream()) + } + if v.GetProtoRows().GetWriterSchema().GetProtoDescriptor() != nil { + t.Errorf("expected test WriterSchema on request %d, got %s", k, v.GetProtoRows().GetWriterSchema().GetProtoDescriptor().String()) + } + } + } } type testAppendRowsClient 
struct { storagepb.BigQueryWrite_AppendRowsClient - err error + openCount int + requests []*storagepb.AppendRowsRequest + sendF func(*storagepb.AppendRowsRequest) error + recvF func() (*storagepb.AppendRowsResponse, error) +} + +func (tarc *testAppendRowsClient) Send(req *storagepb.AppendRowsRequest) error { + return tarc.sendF(req) +} + +func (tarc *testAppendRowsClient) Recv() (*storagepb.AppendRowsResponse, error) { + return tarc.recvF() } From 4f39f0bda594ec0f6442f8a542f366e020089246 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 23 Jul 2021 20:26:18 +0000 Subject: [PATCH 11/14] more docs --- bigquery/storage/managedwriter/client.go | 2 ++ .../storage/managedwriter/managed_stream.go | 24 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index d3a37fdc8da..d8f60ed933b 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -67,6 +67,8 @@ func (c *Client) Close() error { } // NewManagedStream establishes a new managed stream for appending data into a table. +// +// Context here is retained for use by the underlying streaming connections the managed stream may create. func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) { return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 2fa13e0e250..9b19fad74a5 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -164,6 +164,8 @@ func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) { } // getStream returns either a valid ARC client stream or permanent error. +// +// Calling getStream locks the mutex. 
func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { ms.mu.Lock() defer ms.mu.Unlock() @@ -186,7 +188,8 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient } // openWithRetry is responsible for navigating the (re)opening of the underlying stream connection. -// The expectation is this is only ever invoked when the caller has the mutex lock. +// +// Only getStream() should call this, and thus the calling code has the mutex lock. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { r := defaultRetryer{} for { @@ -203,7 +206,8 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie // and fire up the associated receive processor. ch := make(chan *pendingWrite) go recvProcessor(ms.ctx, arc, ch) - // also, replace sync.Once for starting a new stream, as we need to do this for every new connection + // Also, replace the sync.Once for setting up a new stream, as we need to do "special" work + // for every new connection. ms.streamSetup = new(sync.Once) return arc, ch, nil } @@ -269,14 +273,21 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error } func (ms *ManagedStream) Close() error { - ms.mu.Lock() + + var arc *storagepb.BigQueryWrite_AppendRowsClient + + arc, ch, err := ms.getStream(arc) + if err != nil { + return err + } if ms.arc == nil { return fmt.Errorf("no stream exists") } - err := (*ms.arc).CloseSend() + err = (*arc).CloseSend() if err == nil { - close(ms.pending) + close(ch) } + ms.mu.Lock() ms.err = io.EOF ms.mu.Unlock() return err @@ -294,6 +305,9 @@ func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResul } // recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine. 
+// +// The receive processor only deals with a single instance of a connection/channel, and thus should never interact +// with the mutex lock. func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { // TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply // ensure that pending writes get acknowledged with a terminal state. From fd0e687ad11d7a46a078be5e8a12cbbee3798659 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 26 Jul 2021 18:06:56 +0000 Subject: [PATCH 12/14] remove logging, recompile test proto w/docs --- bigquery/storage/managedwriter/managed_stream.go | 2 -- .../storage/managedwriter/testdata/messages.pb.go | 14 ++++++++------ .../storage/managedwriter/testdata/messages.proto | 7 +++---- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 9b19fad74a5..7cb6308982a 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "io" - "log" "sync" "github.com/googleapis/gax-go/v2" @@ -236,7 +235,6 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error } var req *storagepb.AppendRowsRequest ms.streamSetup.Do(func() { - log.Println("firing streamSetup") reqCopy := *pw.request reqCopy.WriteStream = ms.streamSettings.streamID reqCopy.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ diff --git a/bigquery/storage/managedwriter/testdata/messages.pb.go b/bigquery/storage/managedwriter/testdata/messages.pb.go index 8fc86cb3442..702b9c84e85 100644 --- a/bigquery/storage/managedwriter/testdata/messages.pb.go +++ b/bigquery/storage/managedwriter/testdata/messages.pb.go @@ -45,8 +45,10 @@ type SimpleMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string 
`protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Value int64 `protobuf:"varint,2,opt,name=value,proto3" json:"value,omitempty"` + // name is a simple scalar string. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // value is a simple int64 value. + Value int64 `protobuf:"varint,2,opt,name=value,proto3" json:"value,omitempty"` } func (x *SimpleMessage) Reset() { @@ -103,11 +105,11 @@ var file_messages_proto_rawDesc = []byte{ 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x3d, 0x5a, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x67, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x61, 0x70, - 0x69, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x32, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6d, 0x61, + 0x6e, 0x61, 0x67, 0x65, 0x64, 0x77, 0x72, 0x69, 0x74, 0x65, 0x72, 0x2f, 0x74, 0x65, 0x73, 0x74, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/bigquery/storage/managedwriter/testdata/messages.proto b/bigquery/storage/managedwriter/testdata/messages.proto index 3112ed93f92..9b77e539ef5 100644 --- a/bigquery/storage/managedwriter/testdata/messages.proto +++ b/bigquery/storage/managedwriter/testdata/messages.proto @@ -14,16 +14,15 @@ syntax = "proto3"; package testdata; -option go_package = 
"cloud.google.com/go/bigquery/storage/apiv1beta2/testdata"; +option go_package = "cloud.google.com/go/bigquery/storage/managedwriter/testdata"; // SimpleMessage represents a simple message that transmits a string and int64 value. message SimpleMessage { + // name is a simple scalar string. string name = 1; + // value is a simple int64 value. int64 value = 2; } -// TODO: use wrappers.proto to define a nullable form of the message. - - From eef2c77a380ea033d6656525498fef46df58f681 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 26 Jul 2021 18:36:39 +0000 Subject: [PATCH 13/14] reformat test proto --- bigquery/storage/managedwriter/testdata/messages.pb.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigquery/storage/managedwriter/testdata/messages.pb.go b/bigquery/storage/managedwriter/testdata/messages.pb.go index 702b9c84e85..162b0d9c537 100644 --- a/bigquery/storage/managedwriter/testdata/messages.pb.go +++ b/bigquery/storage/managedwriter/testdata/messages.pb.go @@ -21,11 +21,12 @@ package testdata import ( + reflect "reflect" + sync "sync" + proto "github.com/golang/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" ) const ( From e7b85d194eb27c5e4572c340b82b1f475718294d Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 26 Jul 2021 19:03:00 +0000 Subject: [PATCH 14/14] address docstring issues --- bigquery/storage/managedwriter/managed_stream.go | 1 + bigquery/storage/managedwriter/writer_option.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 7cb6308982a..296e8b491fd 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -270,6 +270,7 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error 
} } +// Close closes a managed stream. func (ms *ManagedStream) Close() error { var arc *storagepb.BigQueryWrite_AppendRowsClient diff --git a/bigquery/storage/managedwriter/writer_option.go b/bigquery/storage/managedwriter/writer_option.go index 0670cbe0973..180418c5ff1 100644 --- a/bigquery/storage/managedwriter/writer_option.go +++ b/bigquery/storage/managedwriter/writer_option.go @@ -70,7 +70,7 @@ func WithTracePrefix(prefix string) WriterOption { } } -// WithDescriptor describes the format of messages you'll be sending to the service. +// WithSchemaDescriptor describes the format of messages you'll be sending to the service. func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption { return func(ms *ManagedStream) { ms.schemaDescriptor = dp