Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(bigquery/storage/managedwriter): test all stream types #4507

Merged
merged 4 commits into from Jul 28, 2021
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
233 changes: 162 additions & 71 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -40,6 +40,11 @@ var (
defaultTestTimeout = 15 * time.Second
)

// testSimpleSchema is the shared two-column table schema (a required STRING
// "name" and a required INTEGER "value") used by the managed writer
// integration tests; it mirrors the testdata.SimpleMessage proto.
var testSimpleSchema = bigquery.Schema{
	{Name: "name", Type: bigquery.StringFieldType, Required: true},
	{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}

func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
Expand All @@ -66,28 +71,28 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti
}

// validateRowCount confirms the number of rows in a table visible to the query engine.
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) {
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64, description string) {

// Verify data is present in the table with a count query.
sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID)
q := client.Query(sql)
it, err := q.Read(ctx)
if err != nil {
t.Errorf("failed to issue validation query: %v", err)
t.Errorf("failed to issue validation query %s: %v", description, err)
shollyman marked this conversation as resolved.
Show resolved Hide resolved
return
}
var rowdata []bigquery.Value
err = it.Next(&rowdata)
if err != nil {
t.Errorf("error fetching validation results: %v", err)
t.Errorf("error fetching validation results %s: %v", description, err)
return
}
count, ok := rowdata[0].(int64)
if !ok {
t.Errorf("got unexpected data from validation query: %v", rowdata[0])
t.Errorf("got unexpected data from validation query %s: %v", description, rowdata[0])
}
if count != expectedRows {
t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows)
t.Errorf("rows mismatch from %s, expected rows: got %d want %d", description, count, expectedRows)
}
}

Expand Down Expand Up @@ -122,7 +127,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect
return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
}

func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
func TestIntegration_ManagedWriter(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()
Expand All @@ -136,13 +141,35 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

t.Run("group", func(t *testing.T) {
codyoss marked this conversation as resolved.
Show resolved Hide resolved
t.Run("DefaultStream", func(t *testing.T) {
t.Parallel()
testDefaultStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("DefaultStream_DynamicJSON", func(t *testing.T) {
t.Parallel()
testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
})
t.Run("CommittedStream", func(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("BufferedStream", func(t *testing.T) {
t.Parallel()
testBufferedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("PendingStream", func(t *testing.T) {
t.Parallel()
testPendingStream(ctx, t, mwClient, bqClient, dataset)
})
})
}

func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {

// prep a suitable destination table.
testTable := dataset.Table(tableIDs.New())
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}
// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
Expand All @@ -159,9 +186,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}

// prevalidate we have no data in table.
validateRowCount(ctx, t, bqClient, testTable, 0)
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

testData := []*testdata.SimpleMessage{
{Name: "one", Value: 1},
Expand All @@ -187,7 +212,7 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(testData))
validateRowCount(ctx, t, bqClient, testTable, wantRows)
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send")

// Now, send the test rows grouped into in a single append.
var data [][]byte
Expand All @@ -205,36 +230,17 @@ func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
// wait for the result to indicate ready, then validate again.
results[0].Ready()
wantRows = wantRows * 2
validateRowCount(ctx, t, bqClient, testTable, wantRows)
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after second send")
}

func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()

dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// prep a suitable destination table.
func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

md, descriptorProto := setupDynamicDescriptors(t, schema)
md, descriptorProto := setupDynamicDescriptors(t, testSimpleSchema)

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
Expand All @@ -243,6 +249,7 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

sampleData := [][]byte{
[]byte(`{"name": "one", "value": 1}`),
Expand All @@ -252,10 +259,6 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
[]byte(`{"name": "five", "value": 5}`),
}

// prevalidate we have no data in table.
validateRowCount(ctx, t, bqClient, testTable, 0)

// First, append rows individually.
var results []*AppendResult
for k, v := range sampleData {
message := dynamicpb.NewMessage(md)
Expand All @@ -279,38 +282,18 @@ func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
// wait for the result to indicate ready, then validate.
results[0].Ready()
wantRows := int64(len(sampleData))
validateRowCount(ctx, t, bqClient, testTable, wantRows)
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
}

func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()

dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// prep a suitable destination table.
func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType, Required: true},
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
}
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}
// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
// to send as the stream's schema.

m := &testdata.SimpleMessage{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(BufferedStream),
Expand All @@ -328,8 +311,7 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
}

// prevalidate we have no data in table.
validateRowCount(ctx, t, bqClient, testTable, 0)
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

testData := []*testdata.SimpleMessage{
{Name: "one", Value: 1},
Expand All @@ -339,7 +321,6 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
{Name: "five", Value: 2},
}

// First, send the test rows individually, validate, then advance.
var expectedRows int64
for k, mesg := range testData {
b, err := proto.Marshal(mesg)
Expand All @@ -356,13 +337,123 @@ func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
if err != nil {
t.Errorf("got error from pending result %d: %v", k, err)
}
validateRowCount(ctx, t, bqClient, testTable, expectedRows)
validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("before flush %d", k))
// move offset and re-validate.
flushOffset, err := ms.FlushRows(ctx, offset)
if err != nil {
t.Errorf("failed to flush offset to %d: %v", offset, err)
}
expectedRows = flushOffset + 1
validateRowCount(ctx, t, bqClient, testTable, expectedRows)
validateRowCount(ctx, t, bqClient, testTable, expectedRows, fmt.Sprintf("after flush %d", k))
}
}

// testCommittedStream exercises a CommittedStream, where appended rows
// become visible to the query engine immediately without an explicit
// flush or commit step.
func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	// prep a suitable destination table.
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// Use the precompiled test proto's descriptorproto representation as
	// the stream's schema.
	m := &testdata.SimpleMessage{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

	testData := []*testdata.SimpleMessage{
		{Name: "one", Value: 1},
		{Name: "two", Value: 2},
		{Name: "three", Value: 3},
		{Name: "four", Value: 1},
		{Name: "five", Value: 2},
	}

	// Send the test rows as individual single-row appends.
	var results []*AppendResult
	for k, mesg := range testData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			// Fatal: proceeding would append an empty/invalid payload and
			// skew the row-count validation below.
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
		if err != nil {
			// Fatal: a failed final append leaves results nil, which would
			// panic at results[0].Ready() below.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	// wait for the result to indicate ready, then validate.
	results[0].Ready()
	wantRows := int64(len(testData))
	validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
}

// testPendingStream exercises a PendingStream: rows are appended, the
// stream is finalized (no further appends allowed), and the data only
// becomes visible after an explicit BatchCommit.
func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	// prep a suitable destination table.
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// Use the precompiled test proto's descriptorproto representation as
	// the stream's schema.
	m := &testdata.SimpleMessage{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(PendingStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateRowCount(ctx, t, bqClient, testTable, 0, "before send")

	testData := []*testdata.SimpleMessage{
		{Name: "one", Value: 1},
		{Name: "two", Value: 2},
		{Name: "three", Value: 3},
		{Name: "four", Value: 1},
		{Name: "five", Value: 2},
	}

	// Send data.
	var results []*AppendResult
	for k, mesg := range testData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			// Fatal: proceeding would append an empty/invalid payload and
			// skew the finalize/commit validation below.
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		results, err = ms.AppendRows(ctx, data, NoStreamOffset)
		if err != nil {
			// Fatal: a failed final append leaves results nil, which would
			// panic at results[0].Ready() below.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	results[0].Ready()
	wantRows := int64(len(testData))

	// Mark stream complete.  Finalize reports the total row count the
	// service tracked for the stream.
	trackedOffset, err := ms.Finalize(ctx)
	if err != nil {
		t.Errorf("Finalize: %v", err)
	}

	if trackedOffset != wantRows {
		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
	}

	// Commit stream and validate.
	resp, err := mwClient.BatchCommit(ctx, tableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
	if err != nil {
		t.Errorf("client.BatchCommit: %v", err)
	}
	if len(resp.StreamErrors) > 0 {
		t.Errorf("stream errors present: %v", resp.StreamErrors)
	}
	validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
}