From d8ec92b14c024af6342244e772cc150a83149602 Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 28 Jul 2021 13:04:08 -0700 Subject: [PATCH] test(bigquery/storage/managedwriter): test all stream types (#4507) * test(bigquery/storage/managedwriter): test all stream types This PR adds testing to support all stream types, and refactors the integration test to support parallel testing and reduce some of the boilerplate code. --- .../storage/managedwriter/integration_test.go | 237 ++++++++++++------ 1 file changed, 164 insertions(+), 73 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 882a7e9df2a..95289688ea4 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -40,6 +40,11 @@ var ( defaultTestTimeout = 15 * time.Second ) +var testSimpleSchema = bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, +} + func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { if testing.Short() { t.Skip("Integration tests skipped in short mode") @@ -66,28 +71,28 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti } // validateRowCount confirms the number of rows in a table visible to the query engine. -func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) { +func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64, description string) { // Verify data is present in the table with a count query. 
sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID) q := client.Query(sql) it, err := q.Read(ctx) if err != nil { - t.Errorf("failed to issue validation query: %v", err) + t.Errorf("failed to issue validation query %q: %v", description, err) return } var rowdata []bigquery.Value err = it.Next(&rowdata) if err != nil { - t.Errorf("error fetching validation results: %v", err) + t.Errorf("error fetching validation results %q: %v", description, err) return } count, ok := rowdata[0].(int64) if !ok { - t.Errorf("got unexpected data from validation query: %v", rowdata[0]) + t.Errorf("got unexpected data from validation query %q: %v", description, rowdata[0]) } if count != expectedRows { - t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows) + t.Errorf("rows mismatch from %q, expected rows: got %d want %d", description, count, expectedRows) } } @@ -99,7 +104,7 @@ func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) ( } return dataset, func() { if err := dataset.DeleteWithContents(ctx); err != nil { - t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err) + t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err) } }, nil } @@ -122,7 +127,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) } -func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { +func TestIntegration_ManagedWriter(t *testing.T) { mwClient, bqClient := getTestClients(context.Background(), t) defer mwClient.Close() defer bqClient.Close() @@ -136,14 +141,36 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + t.Run("group", func(t *testing.T) { + t.Run("DefaultStream", func(t *testing.T) { + t.Parallel() + testDefaultStream(ctx, t, mwClient, bqClient, dataset) + }) + 
t.Run("DefaultStream_DynamicJSON", func(t *testing.T) { + t.Parallel() + testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("CommittedStream", func(t *testing.T) { + t.Parallel() + testCommittedStream(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("BufferedStream", func(t *testing.T) { + t.Parallel() + testBufferedStream(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("PendingStream", func(t *testing.T) { + t.Parallel() + testPendingStream(ctx, t, mwClient, bqClient, dataset) + }) + }) +} + +func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + // prep a suitable destination table. testTable := dataset.Table(tableIDs.New()) - schema := bigquery.Schema{ - {Name: "name", Type: bigquery.StringFieldType, Required: true}, - {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, - } - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { - t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { + t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) } // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation // to send as the stream's schema. @@ -159,9 +186,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { if err != nil { t.Fatalf("NewManagedStream: %v", err) } - - // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0) + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ {Name: "one", Value: 1}, @@ -187,7 +212,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { // wait for the result to indicate ready, then validate. 
results[0].Ready() wantRows := int64(len(testData)) - validateRowCount(ctx, t, bqClient, testTable, wantRows) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send") // Now, send the test rows grouped into in a single append. var data [][]byte @@ -205,36 +230,17 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { // wait for the result to indicate ready, then validate again. results[0].Ready() wantRows = wantRows * 2 - validateRowCount(ctx, t, bqClient, testTable, wantRows) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after second send") } -func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { - mwClient, bqClient := getTestClients(context.Background(), t) - defer mwClient.Close() - defer bqClient.Close() - - dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) - if err != nil { - t.Fatalf("failed to init test dataset: %v", err) - } - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // prep a suitable destination table. +func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) - schema := bigquery.Schema{ - {Name: "name", Type: bigquery.StringFieldType, Required: true}, - {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, - } - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - md, descriptorProto := setupDynamicDescriptors(t, schema) + md, descriptorProto := setupDynamicDescriptors(t, testSimpleSchema) - // setup a new stream. 
ms, err := mwClient.NewManagedStream(ctx, WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(DefaultStream), @@ -243,6 +249,7 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { if err != nil { t.Fatalf("NewManagedStream: %v", err) } + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") sampleData := [][]byte{ []byte(`{"name": "one", "value": 1}`), @@ -252,10 +259,6 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { []byte(`{"name": "five", "value": 5}`), } - // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0) - - // First, append rows individually. var results []*AppendResult for k, v := range sampleData { message := dynamicpb.NewMessage(md) @@ -279,38 +282,18 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { // wait for the result to indicate ready, then validate. results[0].Ready() wantRows := int64(len(sampleData)) - validateRowCount(ctx, t, bqClient, testTable, wantRows) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send") } -func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { - mwClient, bqClient := getTestClients(context.Background(), t) - defer mwClient.Close() - defer bqClient.Close() - - dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) - if err != nil { - t.Fatalf("failed to init test dataset: %v", err) - } - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // prep a suitable destination table. 
+func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) - schema := bigquery.Schema{ - {Name: "name", Type: bigquery.StringFieldType, Required: true}, - {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, - } - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation - // to send as the stream's schema. + m := &testdata.SimpleMessage{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) - // setup a new stream. ms, err := mwClient.NewManagedStream(ctx, WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(BufferedStream), @@ -328,8 +311,7 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType()) } - // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0) + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ {Name: "one", Value: 1}, @@ -339,7 +321,6 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { {Name: "five", Value: 2}, } - // First, send the test rows individually, validate, then advance. 
var expectedRows int64 for k, mesg := range testData { b, err := proto.Marshal(mesg)
@@ -356,13 +337,141 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
 		if err != nil {
 			t.Errorf("got error from pending result %d: %v", k, err)
 		}
-		validateRowCount(ctx, t, bqClient, testTable, expectedRows)
+		validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("before flush %d", k))
 		// move offset and re-validate.
 		flushOffset, err := ms.FlushRows(ctx, offset)
 		if err != nil {
 			t.Errorf("failed to flush offset to %d: %v", offset, err)
 		}
 		expectedRows = flushOffset + 1
-		validateRowCount(ctx, t, bqClient, testTable, expectedRows)
+		validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("after flush %d", k))
+	}
+}
+
+// testCommittedStream writes the sample rows, one per AppendRows call, to a
+// stream created with WithType(CommittedStream) and checks the table's row
+// count before and after. Marshal and append failures are fatal because the
+// code after the loop indexes into the returned results.
+func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
+	testTable := dataset.Table(tableIDs.New())
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+
+	m := &testdata.SimpleMessage{}
+	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
+
+	// setup a new stream.
+	ms, err := mwClient.NewManagedStream(ctx,
+		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+		WithType(CommittedStream),
+		WithSchemaDescriptor(descriptorProto),
+	)
+	if err != nil {
+		t.Fatalf("NewManagedStream: %v", err)
+	}
+	validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
+
+	testData := []*testdata.SimpleMessage{
+		{Name: "one", Value: 1},
+		{Name: "two", Value: 2},
+		{Name: "three", Value: 3},
+		{Name: "four", Value: 1},
+		{Name: "five", Value: 2},
+	}
+
+	var results []*AppendResult
+	for k, mesg := range testData {
+		b, err := proto.Marshal(mesg)
+		if err != nil {
+			t.Fatalf("failed to marshal message %d: %v", k, err)
+		}
+		data := [][]byte{b}
+		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
+		if err != nil {
+			// Fatal: on error there may be no result to wait on after the loop.
+			t.Fatalf("single-row append %d failed: %v", k, err)
+		}
+	}
+	// results holds only the last append. GetResult blocks until it completes
+	// (or ctx ends) and reports its error; a bare Ready() call does not wait.
+	if _, err := results[0].GetResult(ctx); err != nil {
+		t.Fatalf("result error for last send: %v", err)
+	}
+	wantRows := int64(len(testData))
+	validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
+}
+
+// testPendingStream writes the sample rows, one per AppendRows call, to a
+// stream created with WithType(PendingStream), finalizes it, commits it with
+// BatchCommit, and then checks that the rows are visible. Steps whose failure
+// would leave nothing valid to continue with are fatal.
+func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
+	testTable := dataset.Table(tableIDs.New())
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+
+	m := &testdata.SimpleMessage{}
+	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
+
+	ms, err := mwClient.NewManagedStream(ctx,
+		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+		WithType(PendingStream),
+		WithSchemaDescriptor(descriptorProto),
+	)
+	if err != nil {
+		t.Fatalf("NewManagedStream: %v", err)
+	}
+	validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
+
+	testData := []*testdata.SimpleMessage{
+		{Name: "one", Value: 1},
+		{Name: "two", Value: 2},
+		{Name: "three", Value: 3},
+		{Name: "four", Value: 1},
+		{Name: "five", Value: 2},
+	}
+
+	// Send data.
+	var results []*AppendResult
+	for k, mesg := range testData {
+		b, err := proto.Marshal(mesg)
+		if err != nil {
+			t.Fatalf("failed to marshal message %d: %v", k, err)
+		}
+		data := [][]byte{b}
+		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
+		if err != nil {
+			// Fatal: on error there may be no result to wait on after the loop.
+			t.Fatalf("single-row append %d failed: %v", k, err)
+		}
+	}
+	// results holds only the last append. GetResult blocks until it completes
+	// (or ctx ends) and reports its error; a bare Ready() call does not wait.
+	if _, err := results[0].GetResult(ctx); err != nil {
+		t.Fatalf("result error for last send: %v", err)
+	}
+	wantRows := int64(len(testData))
+
+	// Mark stream complete.
+	trackedOffset, err := ms.Finalize(ctx)
+	if err != nil {
+		t.Fatalf("Finalize: %v", err)
+	}
+
+	if trackedOffset != wantRows {
+		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
+	}
+
+	// Commit stream and validate.
+	resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
+	if err != nil {
+		// Fatal: resp is not usable when BatchCommit returns an error.
+		t.Fatalf("client.BatchCommit: %v", err)
+	}
+	if len(resp.StreamErrors) > 0 {
+		t.Errorf("stream errors present: %v", resp.StreamErrors)
 	}
+	validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
 }