From 4d8bca742364da259306b2adcc25e6a421ede0f1 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Jul 2021 17:15:11 +0000 Subject: [PATCH 1/4] test(bigquery/storage/managedwriter): test all stream types This PR adds testing to support all stream types, and refactors the integration test to support parallel testing and reduce some of the boilerplate code. --- .../storage/managedwriter/integration_test.go | 233 +++++++++++++----- 1 file changed, 172 insertions(+), 61 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 882a7e9df2a..ce8b4342036 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -40,6 +40,11 @@ var ( defaultTestTimeout = 15 * time.Second ) +var testSimpleSchema = bigquery.Schema{ + {Name: "name", Type: bigquery.StringFieldType, Required: true}, + {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, +} + func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { if testing.Short() { t.Skip("Integration tests skipped in short mode") @@ -66,28 +71,28 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti } // validateRowCount confirms the number of rows in a table visible to the query engine. -func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) { +func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64, description string) { // Verify data is present in the table with a count query. sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID) q := client.Query(sql) it, err := q.Read(ctx) if err != nil { - t.Errorf("failed to issue validation query: %v", err) + t.Errorf("failed to issue validation query %s: %v", description, err) return } var rowdata []bigquery.Value err = it.Next(&rowdata) if err != nil { - t.Errorf("error fetching validation results: %v", err) + t.Errorf("error fetching validation results %s: %v", description, err) return } count, ok := rowdata[0].(int64) if !ok { - t.Errorf("got unexpected data from validation query: %v", rowdata[0]) + t.Errorf("got unexpected data from validation query %s: %v", description, rowdata[0]) } if count != expectedRows { - t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows) + t.Errorf("rows mismatch from %s, expected rows: got %d want %d", description, count, expectedRows) } } @@ -122,7 +127,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor) } -func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { +func TestIntegration_ManagedWriter(t *testing.T) { mwClient, bqClient := getTestClients(context.Background(), t) defer mwClient.Close() defer bqClient.Close() @@ -136,13 +141,35 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + t.Run("group", func(t *testing.T) { + t.Run("DefaultStream", func(t *testing.T) { + t.Parallel() + testDefaultStream(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("DefaultStream_DynamicJSON", func(t *testing.T) { + t.Parallel() + testDefaultStream_DynamicJSON(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("CommittedStream", func(t *testing.T) { + t.Parallel() + testCommittedStream(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("BufferedStream", func(t *testing.T) { + t.Parallel() + testBufferedStream(ctx, t, mwClient, bqClient, dataset) + }) + t.Run("PendingStream", func(t *testing.T) { + t.Parallel() + testPendingStream(ctx, t, mwClient, bqClient, dataset) + }) + }) +} + +func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + // prep a suitable destination table. testTable := dataset.Table(tableIDs.New()) - schema := bigquery.Schema{ - {Name: "name", Type: bigquery.StringFieldType, Required: true}, - {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, - } - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation @@ -161,7 +188,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { } // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0) + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ {Name: "one", Value: 1}, @@ -187,7 +214,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { // wait for the result to indicate ready, then validate. results[0].Ready() wantRows := int64(len(testData)) - validateRowCount(ctx, t, bqClient, testTable, wantRows) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send") // Now, send the test rows grouped into in a single append. var data [][]byte @@ -205,34 +232,16 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) { // wait for the result to indicate ready, then validate again. results[0].Ready() wantRows = wantRows * 2 - validateRowCount(ctx, t, bqClient, testTable, wantRows) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after second send") } -func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { - mwClient, bqClient := getTestClients(context.Background(), t) - defer mwClient.Close() - defer bqClient.Close() - - dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) - if err != nil { - t.Fatalf("failed to init test dataset: %v", err) - } - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // prep a suitable destination table. +func testDefaultStream_DynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) - schema := bigquery.Schema{ - {Name: "name", Type: bigquery.StringFieldType, Required: true}, - {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, - } - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - md, descriptorProto := setupDynamicDescriptors(t, schema) + md, descriptorProto := setupDynamicDescriptors(t, testSimpleSchema) // setup a new stream. ms, err := mwClient.NewManagedStream(ctx, @@ -253,9 +262,8 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { } // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0) + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") - // First, append rows individually. var results []*AppendResult for k, v := range sampleData { message := dynamicpb.NewMessage(md) @@ -279,30 +287,13 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) { // wait for the result to indicate ready, then validate. results[0].Ready() wantRows := int64(len(sampleData)) - validateRowCount(ctx, t, bqClient, testTable, wantRows) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send") } -func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { - mwClient, bqClient := getTestClients(context.Background(), t) - defer mwClient.Close() - defer bqClient.Close() - - dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient) - if err != nil { - t.Fatalf("failed to init test dataset: %v", err) - } - defer cleanup() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // prep a suitable destination table. +func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) - schema := bigquery.Schema{ - {Name: "name", Type: bigquery.StringFieldType, Required: true}, - {Name: "value", Type: bigquery.IntegerFieldType, Required: true}, - } - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation @@ -329,7 +320,7 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { } // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0) + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ {Name: "one", Value: 1}, @@ -356,13 +347,133 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) { if err != nil { t.Errorf("got error from pending result %d: %v", k, err) } - validateRowCount(ctx, t, bqClient, testTable, expectedRows) + validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("before flush %d", k)) // move offset and re-validate. flushOffset, err := ms.FlushRows(ctx, offset) if err != nil { t.Errorf("failed to flush offset to %d: %v", offset, err) } expectedRows = flushOffset + 1 - validateRowCount(ctx, t, bqClient, testTable, expectedRows) + validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("after flush %d", k)) + } +} + +func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + + // prep a suitable destination table. + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation + // to send as the stream's schema. + m := &testdata.SimpleMessage{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(CommittedStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + // prevalidate we have no data in table. + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") + + testData := []*testdata.SimpleMessage{ + {Name: "one", Value: 1}, + {Name: "two", Value: 2}, + {Name: "three", Value: 3}, + {Name: "four", Value: 1}, + {Name: "five", Value: 2}, + } + + // First, send the test rows individually. + var results []*AppendResult + for k, mesg := range testData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + results, err = ms.AppendRows(ctx, data, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + // wait for the result to indicate ready, then validate. + results[0].Ready() + wantRows := int64(len(testData)) + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send") +} + +func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { + + // prep a suitable destination table. + testTable := dataset.Table(tableIDs.New()) + if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { + t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + } + // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation + // to send as the stream's schema. + m := &testdata.SimpleMessage{} + descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) + + // setup a new stream. + ms, err := mwClient.NewManagedStream(ctx, + WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), + WithType(PendingStream), + WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } + + // prevalidate we have no data in table. + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") + + testData := []*testdata.SimpleMessage{ + {Name: "one", Value: 1}, + {Name: "two", Value: 2}, + {Name: "three", Value: 3}, + {Name: "four", Value: 1}, + {Name: "five", Value: 2}, + } + + // First, send the test rows individually. + var results []*AppendResult + for k, mesg := range testData { + b, err := proto.Marshal(mesg) + if err != nil { + t.Errorf("failed to marshal message %d: %v", k, err) + } + data := [][]byte{b} + results, err = ms.AppendRows(ctx, data, NoStreamOffset) + if err != nil { + t.Errorf("single-row append %d failed: %v", k, err) + } + } + results[0].Ready() + wantRows := int64(len(testData)) + + trackedOffset, err := ms.Finalize(ctx) + if err != nil { + t.Errorf("Finalize: %v", err) + } + + if trackedOffset != wantRows { + t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows) + } + + resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()}) + if err != nil { + t.Errorf("client.BatchCommit: %v", err) + } + if len(resp.StreamErrors) > 0 { + t.Errorf("stream errors present: %v", resp.StreamErrors) } + validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send") } From c2327064437fbb104871bdd89971523b8d13de69 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Jul 2021 17:23:50 +0000 Subject: [PATCH 2/4] comment cleanup --- .../storage/managedwriter/integration_test.go | 34 ++++--------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index ce8b4342036..71daf34d82c 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -186,8 +186,6 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl if err != nil { t.Fatalf("NewManagedStream: %v", err) } - - // prevalidate we have no data in table. validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ @@ -243,7 +241,6 @@ func testDefaultStream_DynamicJSON(ctx context.Context, t *testing.T, mwClient * md, descriptorProto := setupDynamicDescriptors(t, testSimpleSchema) - // setup a new stream. ms, err := mwClient.NewManagedStream(ctx, WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(DefaultStream), @@ -252,6 +249,7 @@ func testDefaultStream_DynamicJSON(ctx context.Context, t *testing.T, mwClient * if err != nil { t.Fatalf("NewManagedStream: %v", err) } + validateRowCount(ctx, t, bqClient, testTable, 0, "before send") sampleData := [][]byte{ []byte(`{"name": "one", "value": 1}`), @@ -261,9 +259,6 @@ func testDefaultStream_DynamicJSON(ctx context.Context, t *testing.T, mwClient * []byte(`{"name": "five", "value": 5}`), } - // prevalidate we have no data in table. - validateRowCount(ctx, t, bqClient, testTable, 0, "before send") - var results []*AppendResult for k, v := range sampleData { message := dynamicpb.NewMessage(md) @@ -292,16 +287,13 @@ func testDefaultStream_DynamicJSON(ctx context.Context, t *testing.T, mwClient * func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) - if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation - // to send as the stream's schema. + m := &testdata.SimpleMessage{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) - // setup a new stream. ms, err := mwClient.NewManagedStream(ctx, WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(BufferedStream), @@ -319,7 +311,6 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType()) } - // prevalidate we have no data in table. validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ @@ -330,7 +321,6 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC {Name: "five", Value: 2}, } - // First, send the test rows individually, validate, then advance. var expectedRows int64 for k, mesg := range testData { b, err := proto.Marshal(mesg) @@ -359,14 +349,11 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC } func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { - - // prep a suitable destination table. testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation - // to send as the stream's schema. + m := &testdata.SimpleMessage{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) @@ -379,8 +366,6 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq if err != nil { t.Fatalf("NewManagedStream: %v", err) } - - // prevalidate we have no data in table. validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ @@ -391,7 +376,6 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq {Name: "five", Value: 2}, } - // First, send the test rows individually. var results []*AppendResult for k, mesg := range testData { b, err := proto.Marshal(mesg) @@ -411,18 +395,14 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq } func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { - - // prep a suitable destination table. testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) } - // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation - // to send as the stream's schema. + m := &testdata.SimpleMessage{} descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) - // setup a new stream. ms, err := mwClient.NewManagedStream(ctx, WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)), WithType(PendingStream), @@ -431,8 +411,6 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl if err != nil { t.Fatalf("NewManagedStream: %v", err) } - - // prevalidate we have no data in table. validateRowCount(ctx, t, bqClient, testTable, 0, "before send") testData := []*testdata.SimpleMessage{ @@ -443,7 +421,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl {Name: "five", Value: 2}, } - // First, send the test rows individually. + // Send data. var results []*AppendResult for k, mesg := range testData { b, err := proto.Marshal(mesg) @@ -459,6 +437,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl results[0].Ready() wantRows := int64(len(testData)) + // Mark stream complete. trackedOffset, err := ms.Finalize(ctx) if err != nil { t.Errorf("Finalize: %v", err) @@ -468,6 +447,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows) } + // Commit stream and validate. resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()}) if err != nil { t.Errorf("client.BatchCommit: %v", err) From 726b733785eaef6411e4c2ba7cd190be4145f0ab Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Jul 2021 19:26:29 +0000 Subject: [PATCH 3/4] address lint warning --- bigquery/storage/managedwriter/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 71daf34d82c..8c9a189a189 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -148,7 +148,7 @@ func TestIntegration_ManagedWriter(t *testing.T) { }) t.Run("DefaultStream_DynamicJSON", func(t *testing.T) { t.Parallel() - testDefaultStream_DynamicJSON(ctx, t, mwClient, bqClient, dataset) + testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset) }) t.Run("CommittedStream", func(t *testing.T) { t.Parallel() @@ -233,7 +233,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl validateRowCount(ctx, t, bqClient, testTable, wantRows, "after second send") } -func testDefaultStream_DynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { +func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) From 898cd9a3b18d2a1b6499e5d8355775dda64b69a9 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Wed, 28 Jul 2021 19:35:18 +0000 Subject: [PATCH 4/4] address reviewer comments --- bigquery/storage/managedwriter/integration_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 8c9a189a189..95289688ea4 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -78,21 +78,21 @@ func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client q := client.Query(sql) it, err := q.Read(ctx) if err != nil { - t.Errorf("failed to issue validation query %s: %v", description, err) + t.Errorf("failed to issue validation query %q: %v", description, err) return } var rowdata []bigquery.Value err = it.Next(&rowdata) if err != nil { - t.Errorf("error fetching validation results %s: %v", description, err) + t.Errorf("error fetching validation results %q: %v", description, err) return } count, ok := rowdata[0].(int64) if !ok { - t.Errorf("got unexpected data from validation query %s: %v", description, rowdata[0]) + t.Errorf("got unexpected data from validation query %q: %v", description, rowdata[0]) } if count != expectedRows { - t.Errorf("rows mismatch from %s, expected rows: got %d want %d", description, count, expectedRows) + t.Errorf("rows mismatch from %q, expected rows: got %d want %d", description, count, expectedRows) } } @@ -104,7 +104,7 @@ func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) ( } return dataset, func() { if err := dataset.DeleteWithContents(ctx); err != nil { - t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err) + t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err) } }, nil } @@ -170,7 +170,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl // prep a suitable destination table. testTable := dataset.Table(tableIDs.New()) if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil { - t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) + t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) } // We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation // to send as the stream's schema.