From c6cf6590a41368885b7399c993c47dc965862558 Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 28 Jul 2021 09:24:49 -0700 Subject: [PATCH] fix(bigquery/storage/managedwriter): fix double-close error, add tests (#4502) Writing tests picked up the error, so hooray. BufferedStream integration test exposed that while the API surface is in preview without special requirements, advanced features such as the FlushRows rpc used by BufferedStream do. This has been whitelisted for test projects, but we'll want to add this to doc.go when I start that PR. Towards https://github.com/googleapis/google-cloud-go/issues/4366 --- .../storage/managedwriter/integration_test.go | 163 +++++++++++++++++- .../storage/managedwriter/managed_stream.go | 3 +- 2 files changed, 164 insertions(+), 2 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 843d4f90db3..882a7e9df2a 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -26,10 +26,12 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" "google.golang.org/api/option" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protodesc" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" ) var ( @@ -120,7 +122,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) } -func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { +func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { mwClient, bqClient := getTestClients(context.Background(), t) defer mwClient.Close() defer bqClient.Close() @@ -205,3 +207,162 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
wantRows = wantRows * 2 validateRowCount(ctx, t, bqClient, testTable, wantRows) } + +func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { + mwClient, bqClient := getTestClients(context.Background(), t) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // prep a suitable destination table. + testTable := dataset.Table(tableIDs.New()) + schema := bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, + } + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + md, descriptorProto := setupDynamicDescriptors(t, schema) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(DefaultStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + sampleData := [][]byte{ + []byte(`{"name": "one", "value": 1}`), + []byte(`{"name": "two", "value": 2}`), + []byte(`{"name": "three", "value": 3}`), + []byte(`{"name": "four", "value": 4}`), + []byte(`{"name": "five", "value": 5}`), + } + + // prevalidate we have no data in table. + validateRowCount(ctx, t, bqClient, testTable, 0) + + // First, append rows individually. 
+ var results []*AppendResult + for k, v := range sampleData { + message := dynamicpb.NewMessage(md) + + // First, json->proto message + err = protojson.Unmarshal(v, message) + if err != nil { + t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err) + } + // Then, proto message -> bytes. + b, err := proto.Marshal(message) + if err != nil { + t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) + } + results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + + // wait for the result to indicate ready, then validate. + results[0].Ready() + wantRows := int64(len(sampleData)) + validateRowCount(ctx, t, bqClient, testTable, wantRows) +} + +func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { + mwClient, bqClient := getTestClients(context.Background(), t) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // prep a suitable destination table. + testTable := dataset.Table(tableIDs.New()) + schema := bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, + } + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation + // to send as the stream's schema. + m := &testdata.SimpleMessage{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. 
+ ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(BufferedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID) + if err != nil { + t.Errorf("couldn't get stream info: %v", err) + } + if info.GetType().String() != string(ms.StreamType()) { + t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType()) + } + + // prevalidate we have no data in table. + validateRowCount(ctx, t, bqClient, testTable, 0) + + testData := []*testdata.SimpleMessage{ + {Name: "one", Value: 1}, + {Name: "two", Value: 2}, + {Name: "three", Value: 3}, + {Name: "four", Value: 1}, + {Name: "five", Value: 2}, + } + + // First, send the test rows individually, validate, then advance. + var expectedRows int64 + for k, mesg := range testData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + results, err := ms.AppendRows(ctx, data, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + // wait for ack + offset, err := results[0].GetResult(ctx) + if err != nil { + t.Errorf("got error from pending result %d: %v", k, err) + } + validateRowCount(ctx, t, bqClient, testTable, expectedRows) + // move offset and re-validate. 
+ flushOffset, err := ms.FlushRows(ctx, offset) + if err != nil { + t.Errorf("failed to flush offset to %d: %v", offset, err) + } + expectedRows = flushOffset + 1 + validateRowCount(ctx, t, bqClient, testTable, expectedRows) + } +} diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index e1a526a6051..be88be9a870 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -349,8 +349,9 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl off := success.GetOffset() if off != nil { nextWrite.markDone(off.GetValue(), nil, fc) + } else { + nextWrite.markDone(NoStreamOffset, nil, fc) } - nextWrite.markDone(NoStreamOffset, nil, fc) } } }