From 88564cc9221ea068c0451df5f666ea76fe083122 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 26 Jul 2021 22:56:12 +0000 Subject: [PATCH 1/2] fix(bigquery/storage/managedwriter): fix double-close error, add tests Writing tests picked up the error, so hooray. --- .../storage/managedwriter/integration_test.go | 161 ++++++++++++++++++ .../storage/managedwriter/managed_stream.go | 5 +- 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 843d4f90db3..72810f70229 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -17,6 +17,7 @@ package managedwriter import ( "context" "fmt" + "log" "testing" "time" @@ -26,10 +27,12 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" "google.golang.org/api/option" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protodesc" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" ) var ( @@ -205,3 +208,161 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { wantRows = wantRows * 2 validateRowCount(ctx, t, bqClient, testTable, wantRows) } + +func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { + mwClient, bqClient := getTestClients(context.Background(), t) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // prep a suitable destination table. 
+ testTable := dataset.Table(tableIDs.New()) + schema := bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, + } + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + + md, descriptorProto := setupDynamicDescriptors(t, schema) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(DefaultStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + sampleData := [][]byte{ + []byte(`{"name": "one", "value": 1}`), + []byte(`{"name": "two", "value": 2}`), + []byte(`{"name": "three", "value": 3}`), + []byte(`{"name": "four", "value": 4}`), + []byte(`{"name": "five", "value": 5}`), + } + + // prevalidate we have no data in table. + validateRowCount(ctx, t, bqClient, testTable, 0) + + // First, append rows individually. + var results []*AppendResult + for k, v := range sampleData { + message := dynamicpb.NewMessage(md) + + // First, json->proto message + err = protojson.Unmarshal(v, message) + if err != nil { + t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err) + } + // Then, proto message -> bytes. + b, err := proto.Marshal(message) + if err != nil { + t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) + } + results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + + // wait for the result to indicate ready, then validate. 
+ results[0].Ready() + wantRows := int64(len(sampleData)) + validateRowCount(ctx, t, bqClient, testTable, wantRows) +} + +func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { + mwClient, bqClient := getTestClients(context.Background(), t) + defer mwClient.Close() + defer bqClient.Close() + + dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) + if err != nil { + t.Fatalf("failed to init test dataset: %v", err) + } + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // prep a suitable destination table. + testTable := dataset.Table(tableIDs.New()) + schema := bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, + } + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + // We'll use a precompiled test proto, but we need its corresponding descriptorproto representation + // to send as the stream's schema. + m := &testdata.SimpleMessage{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(BufferedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID) + if err != nil { + t.Errorf("couldn't get stream info: %v", err) + } + log.Printf("info id: %s type: %s", info.GetName(), info.GetType().String()) + + // prevalidate we have no data in table.
+ validateRowCount(ctx, t, bqClient, testTable, 0) + + testData := []*testdata.SimpleMessage{ + {Name: "one", Value: 1}, + {Name: "two", Value: 2}, + {Name: "three", Value: 3}, + {Name: "four", Value: 1}, + {Name: "five", Value: 2}, + } + + // First, send the test rows individually, validate, then advance. + var expectedRows int64 + for k, mesg := range testData { + log.Printf("starting %d", k) + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + results, err := ms.AppendRows(ctx, data, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + // wait for ack + offset, err := results[0].GetResult(ctx) + if err != nil { + t.Errorf("got error from pending result %d: %v", k, err) + } + validateRowCount(ctx, t, bqClient, testTable, expectedRows) + // move offset and re-validate. + flushOffset, err := ms.FlushRows(ctx, offset) + if err != nil { + t.Errorf("failed to flush offset to %d: %v", offset, err) + } + expectedRows = flushOffset + validateRowCount(ctx, t, bqClient, testTable, expectedRows) + } +} diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index e1a526a6051..d2df45613b6 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "log" "sync" "github.com/googleapis/gax-go/v2" @@ -139,6 +140,7 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er Value: offset, }, } + log.Printf("flush req: %s", req.String()) resp, err := ms.c.rawClient.FlushRows(ctx, req) if err != nil { return 0, err @@ -349,8 +351,9 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl off := success.GetOffset() if off != nil { nextWrite.markDone(off.GetValue(), nil, fc) + } else { + nextWrite.markDone(NoStreamOffset, nil, fc) } - 
nextWrite.markDone(NoStreamOffset, nil, fc) } } } From 58e4bd4d507c99b846f8602edff01559463190a5 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 27 Jul 2021 23:23:25 +0000 Subject: [PATCH 2/2] update tests, strip logging from debug session --- bigquery/storage/managedwriter/integration_test.go | 10 +++++----- bigquery/storage/managedwriter/managed_stream.go | 2 -- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 72810f70229..882a7e9df2a 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -17,7 +17,6 @@ package managedwriter import ( "context" "fmt" - "log" "testing" "time" @@ -123,7 +122,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) } -func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) { +func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { mwClient, bqClient := getTestClients(context.Background(), t) defer mwClient.Close() defer bqClient.Close() @@ -325,7 +324,9 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { if err != nil { t.Errorf("couldn't get stream info: %v", err) } - log.Printf("info id: %s type: %s", info.GetName(), info.GetType().String()) + if info.GetType().String() != string(ms.StreamType()) { + t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType()) + } // prevalidate we have no data in table. validateRowCount(ctx, t, bqClient, testTable, 0) @@ -341,7 +342,6 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { // First, send the test rows individually, validate, then advance. 
var expectedRows int64 for k, mesg := range testData { - log.Printf("starting %d", k) b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) @@ -362,7 +362,7 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { if err != nil { t.Errorf("failed to flush offset to %d: %v", offset, err) } - expectedRows = flushOffset + expectedRows = flushOffset + 1 validateRowCount(ctx, t, bqClient, testTable, expectedRows) } } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index d2df45613b6..be88be9a870 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "io" - "log" "sync" "github.com/googleapis/gax-go/v2" @@ -140,7 +139,6 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er Value: offset, }, } - log.Printf("flush req: %s", req.String()) resp, err := ms.c.rawClient.FlushRows(ctx, req) if err != nil { return 0, err