Skip to content

Commit

Permalink
test(bigquery/storage/managedwriter): test all stream types (#4507)
Browse files Browse the repository at this point in the history
* test(bigquery/storage/managedwriter): test all stream types

This PR adds testing to support all stream types, and refactors the
integration test to support parallel testing and reduce some of the
boilerplate code.
  • Loading branch information
shollyman committed Jul 28, 2021
1 parent c6cf659 commit d8ec92b
Showing 1 changed file with 164 additions and 73 deletions.
237 changes: 164 additions & 73 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -40,6 +40,11 @@ var (
defaultTestTimeout = 15 * time.Second
)

// testSimpleSchema is the shared two-column table schema (a required STRING
// "name" and a required INTEGER "value") used by the managedwriter
// integration subtests; it mirrors testdata.SimpleMessage.
var testSimpleSchema = bigquery.Schema{
	{Name: "name", Type: bigquery.StringFieldType, Required: true},
	{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}

func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
Expand All @@ -66,28 +71,28 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti
}

// validateRowCount confirms the number of rows in a table visible to the query engine.
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) {
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64, description string) {

// Verify data is present in the table with a count query.
sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
t.Errorf("failed to issue validation query: %v", err)
t.Errorf("failed to issue validation query %q: %v", description, err)
return
}
var rowdata []bigquery.Value
err = it.Next(&rowdata)
if err != nil {
t.Errorf("error fetching validation results: %v", err)
t.Errorf("error fetching validation results %q: %v", description, err)
return
}
count, ok := rowdata[0].(int64)
if !ok {
t.Errorf("got unexpected data from validation query: %v", rowdata[0])
t.Errorf("got unexpected data from validation query %q: %v", description, rowdata[0])
}
if count != expectedRows {
t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows)
t.Errorf("rows mismatch from %q, expected rows: got %d want %d", description, count, expectedRows)
}
}

Expand All @@ -99,7 +104,7 @@ func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (
}
return dataset, func() {
if err := dataset.DeleteWithContents(ctx); err != nil {
t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err)
t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err)
}
}, nil
}
Expand All @@ -122,7 +127,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect
return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
}

func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
func TestIntegration_ManagedWriter(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()
Expand All @@ -136,14 +141,36 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

t.Run("group", func(t *testing.T) {
t.Run("DefaultStream", func(t *testing.T) {
t.Parallel()
testDefaultStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("DefaultStream_DynamicJSON", func(t *testing.T) {
t.Parallel()
testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
})
t.Run("CommittedStream", func(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("BufferedStream", func(t *testing.T) {
t.Parallel()
testBufferedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("PendingStream", func(t *testing.T) {
t.Parallel()
testPendingStream(ctx, t, mwClient, bqClient, dataset)
})
})
}

func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {

// prep a suitable destination table.
testTable := dataset.Table(tableIDs.New())
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
}
// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
// to send as the stream's schema.
Expand All @@ -159,9 +186,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}

// prevalidate we have no data in table.
validateRowCount(ctx, t, bqClient, testTable, 0)
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

testData := []*testdata.SimpleMessage{
{Name: "one", Value: 1},
Expand All @@ -187,7 +212,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(testData))
validateRowCount(ctx, t, bqClient, testTable, wantRows)
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send")

// Now, send the test rows grouped into in a single append.
var data [][]byte
Expand All @@ -205,36 +230,17 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
// wait for the result to indicate ready, then validate again.
results[0].Ready()
wantRows = wantRows * 2
validateRowCount(ctx, t, bqClient, testTable, wantRows)
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after second send")
}

func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()

dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// prep a suitable destination table.
func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

md, descriptorProto := setupDynamicDescriptors(t, schema)
md, descriptorProto := setupDynamicDescriptors(t, testSimpleSchema)

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
Expand All @@ -243,6 +249,7 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

sampleData := [][]byte{
[]byte(`{"name": "one", "value": 1}`),
Expand All @@ -252,10 +259,6 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
[]byte(`{"name": "five", "value": 5}`),
}

// prevalidate we have no data in table.
validateRowCount(ctx, t, bqClient, testTable, 0)

// First, append rows individually.
var results []*AppendResult
for k, v := range sampleData {
message := dynamicpb.NewMessage(md)
Expand All @@ -279,38 +282,18 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(sampleData))
validateRowCount(ctx, t, bqClient, testTable, wantRows)
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
}

func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()

dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// prep a suitable destination table.
func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}
// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
// to send as the stream's schema.

m := &testdata.SimpleMessage{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(BufferedStream),
Expand All @@ -328,8 +311,7 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
}

// prevalidate we have no data in table.
validateRowCount(ctx, t, bqClient, testTable, 0)
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

testData := []*testdata.SimpleMessage{
{Name: "one", Value: 1},
Expand All @@ -339,7 +321,6 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
{Name: "five", Value: 2},
}

// First, send the test rows individually, validate, then advance.
var expectedRows int64
for k, mesg := range testData {
b, err := proto.Marshal(mesg)
Expand All @@ -356,13 +337,123 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
if err != nil {
t.Errorf("got error from pending result %d: %v", k, err)
}
validateRowCount(ctx, t, bqClient, testTable, expectedRows)
validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("before flush %d", k))
// move offset and re-validate.
flushOffset, err := ms.FlushRows(ctx, offset)
if err != nil {
t.Errorf("failed to flush offset to %d: %v", offset, err)
}
expectedRows = flushOffset + 1
validateRowCount(ctx, t, bqClient, testTable, expectedRows)
validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("after flush %d", k))
}
}

// testCommittedStream exercises a committed-type write stream: appended rows
// become visible to the query engine as soon as the append is acknowledged,
// with no explicit flush or batch-commit step.
func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	// prep a suitable destination table.
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// Use the precompiled test proto's descriptor as the stream schema.
	m := &testdata.SimpleMessage{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

	testData := []*testdata.SimpleMessage{
		{Name: "one", Value: 1},
		{Name: "two", Value: 2},
		{Name: "three", Value: 3},
		{Name: "four", Value: 1},
		{Name: "five", Value: 2},
	}

	var results []*AppendResult
	for k, mesg := range testData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			// Don't append a nil payload for a message we failed to serialize.
			t.Errorf("failed to marshal message %d: %v", k, err)
			continue
		}
		data := [][]byte{b}
		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
	}
	if results == nil {
		// Every append failed; nothing to wait on or validate.
		t.Fatal("no appends succeeded")
	}
	// wait for the result to indicate ready, then validate.
	results[0].Ready()
	wantRows := int64(len(testData))
	validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
}

// testPendingStream covers the pending stream type: appended rows are held
// until the stream is finalized, and only become queryable once a batch
// commit is issued against the parent table.
func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// The stream schema comes from the precompiled test proto's descriptor.
	descriptorProto := protodesc.ToDescriptorProto((&testdata.SimpleMessage{}).ProtoReflect().Descriptor())

	destTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)
	writer, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(destTable),
		WithType(PendingStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

	rows := []*testdata.SimpleMessage{
		{Name: "one", Value: 1},
		{Name: "two", Value: 2},
		{Name: "three", Value: 3},
		{Name: "four", Value: 1},
		{Name: "five", Value: 2},
	}

	// Send data.
	var appendResults []*AppendResult
	for idx, row := range rows {
		serialized, err := proto.Marshal(row)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", idx, err)
		}
		appendResults, err = writer.AppendRows(ctx, [][]byte{serialized}, NoStreamOffset)
		if err != nil {
			t.Errorf("single-row append %d failed: %v", idx, err)
		}
	}
	appendResults[0].Ready()
	wantRows := int64(len(rows))

	// Mark stream complete.
	trackedOffset, err := writer.Finalize(ctx)
	if err != nil {
		t.Errorf("Finalize: %v", err)
	}

	if trackedOffset != wantRows {
		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
	}

	// Commit stream and validate.
	resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(writer.StreamName()), []string{writer.StreamName()})
	if err != nil {
		t.Errorf("client.BatchCommit: %v", err)
	}
	if len(resp.StreamErrors) > 0 {
		t.Errorf("stream errors present: %v", resp.StreamErrors)
	}
	validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
}

0 comments on commit d8ec92b

Please sign in to comment.