diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go
index 843d4f90db3..882a7e9df2a 100644
--- a/bigquery/storage/managedwriter/integration_test.go
+++ b/bigquery/storage/managedwriter/integration_test.go
@@ -26,10 +26,12 @@ import (
 	"cloud.google.com/go/internal/testutil"
 	"cloud.google.com/go/internal/uid"
 	"google.golang.org/api/option"
+	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/reflect/protodesc"
 	"google.golang.org/protobuf/reflect/protoreflect"
 	"google.golang.org/protobuf/types/descriptorpb"
+	"google.golang.org/protobuf/types/dynamicpb"
 )
 
 var (
@@ -120,7 +122,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect
 	return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
 }
 
-func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
+func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
 	mwClient, bqClient := getTestClients(context.Background(), t)
 	defer mwClient.Close()
 	defer bqClient.Close()
@@ -205,3 +207,162 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
 	wantRows = wantRows * 2
 	validateRowCount(ctx, t, bqClient, testTable, wantRows)
 }
+
+func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
+	mwClient, bqClient := getTestClients(context.Background(), t)
+	defer mwClient.Close()
+	defer bqClient.Close()
+
+	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
+	if err != nil {
+		t.Fatalf("failed to init test dataset: %v", err)
+	}
+	defer cleanup()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// prep a suitable destination table.
+	testTable := dataset.Table(tableIDs.New())
+	schema := bigquery.Schema{
+		{Name: "name", Type: bigquery.StringFieldType, Required: true},
+		{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
+	}
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+
+	md, descriptorProto := setupDynamicDescriptors(t, schema)
+
+	// setup a new stream.
+	ms, err := mwClient.NewManagedStream(ctx,
+		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+		WithType(DefaultStream),
+		WithSchemaDescriptor(descriptorProto),
+	)
+	if err != nil {
+		t.Fatalf("NewManagedStream: %v", err)
+	}
+
+	sampleData := [][]byte{
+		[]byte(`{"name": "one", "value": 1}`),
+		[]byte(`{"name": "two", "value": 2}`),
+		[]byte(`{"name": "three", "value": 3}`),
+		[]byte(`{"name": "four", "value": 4}`),
+		[]byte(`{"name": "five", "value": 5}`),
+	}
+
+	// prevalidate we have no data in table.
+	validateRowCount(ctx, t, bqClient, testTable, 0)
+
+	// First, append rows individually.
+	var results []*AppendResult
+	for k, v := range sampleData {
+		message := dynamicpb.NewMessage(md)
+
+		// First, json->proto message
+		err = protojson.Unmarshal(v, message)
+		if err != nil {
+			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
+		}
+		// Then, proto message -> bytes.
+		b, err := proto.Marshal(message)
+		if err != nil {
+			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
+		}
+		results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
+		if err != nil {
+			t.Errorf("single-row append %d failed: %v", k, err)
+		}
+	}
+
+	// wait for the result to indicate ready, then validate.
+	<-results[0].Ready()
+	wantRows := int64(len(sampleData))
+	validateRowCount(ctx, t, bqClient, testTable, wantRows)
+}
+
+func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
+	mwClient, bqClient := getTestClients(context.Background(), t)
+	defer mwClient.Close()
+	defer bqClient.Close()
+
+	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
+	if err != nil {
+		t.Fatalf("failed to init test dataset: %v", err)
+	}
+	defer cleanup()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// prep a suitable destination table.
+	testTable := dataset.Table(tableIDs.New())
+	schema := bigquery.Schema{
+		{Name: "name", Type: bigquery.StringFieldType, Required: true},
+		{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
+	}
+	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
+		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
+	}
+	// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
+	// to send as the stream's schema.
+	m := &testdata.SimpleMessage{}
+	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
+
+	// setup a new stream.
+	ms, err := mwClient.NewManagedStream(ctx,
+		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
+		WithType(BufferedStream),
+		WithSchemaDescriptor(descriptorProto),
+	)
+	if err != nil {
+		t.Fatalf("NewManagedStream: %v", err)
+	}
+
+	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID)
+	if err != nil {
+		t.Errorf("couldn't get stream info: %v", err)
+	}
+	if info.GetType().String() != string(ms.StreamType()) {
+		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
+	}
+
+	// prevalidate we have no data in table.
+	validateRowCount(ctx, t, bqClient, testTable, 0)
+
+	testData := []*testdata.SimpleMessage{
+		{Name: "one", Value: 1},
+		{Name: "two", Value: 2},
+		{Name: "three", Value: 3},
+		{Name: "four", Value: 1},
+		{Name: "five", Value: 2},
+	}
+
+	// First, send the test rows individually, validate, then advance.
+	var expectedRows int64
+	for k, mesg := range testData {
+		b, err := proto.Marshal(mesg)
+		if err != nil {
+			t.Errorf("failed to marshal message %d: %v", k, err)
+		}
+		data := [][]byte{b}
+		results, err := ms.AppendRows(ctx, data, NoStreamOffset)
+		if err != nil {
+			t.Errorf("single-row append %d failed: %v", k, err)
+		}
+		// wait for ack
+		offset, err := results[0].GetResult(ctx)
+		if err != nil {
+			t.Errorf("got error from pending result %d: %v", k, err)
+		}
+		validateRowCount(ctx, t, bqClient, testTable, expectedRows)
+		// move offset and re-validate.
+		flushOffset, err := ms.FlushRows(ctx, offset)
+		if err != nil {
+			t.Errorf("failed to flush offset to %d: %v", offset, err)
+		}
+		expectedRows = flushOffset + 1
+		validateRowCount(ctx, t, bqClient, testTable, expectedRows)
+	}
+}
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
index e1a526a6051..be88be9a870 100644
--- a/bigquery/storage/managedwriter/managed_stream.go
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -349,8 +349,9 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
 			off := success.GetOffset()
 			if off != nil {
 				nextWrite.markDone(off.GetValue(), nil, fc)
+			} else {
+				nextWrite.markDone(NoStreamOffset, nil, fc)
 			}
-			nextWrite.markDone(NoStreamOffset, nil, fc)
 		}
 	}
 }
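
For context on the new DynamicJSON test: the per-row conversion it performs (JSON row -> dynamic proto message -> serialized bytes handed to AppendRows) can be read as a standalone helper. The sketch below is illustrative only; it assumes a protoreflect.MessageDescriptor built from the table schema, as setupDynamicDescriptors returns, and the helper name jsonRowToProtoBytes is not part of the managedwriter package.

package example

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/dynamicpb"
)

// jsonRowToProtoBytes is a hypothetical helper mirroring the loop body in
// TestIntegration_ManagedWriter_DynamicJSON: it materializes a dynamic message
// from the descriptor, unmarshals one JSON row into it, and serializes the
// message into the wire-format bytes that ManagedStream.AppendRows expects.
func jsonRowToProtoBytes(md protoreflect.MessageDescriptor, jsonRow []byte) ([]byte, error) {
	msg := dynamicpb.NewMessage(md)
	if err := protojson.Unmarshal(jsonRow, msg); err != nil {
		return nil, fmt.Errorf("protojson.Unmarshal: %w", err)
	}
	b, err := proto.Marshal(msg)
	if err != nil {
		return nil, fmt.Errorf("proto.Marshal: %w", err)
	}
	return b, nil
}

In the test, each element of sampleData goes through exactly this conversion before being passed as ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset).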
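
The two tests also show the two ways a caller can wait on an AppendResult: receiving from the channel returned by Ready() when only acknowledgment matters (DynamicJSON), or calling GetResult(ctx) when the assigned offset is needed, for example to advance a BufferedStream via FlushRows (BufferedStream). A minimal sketch under those assumptions, using the package-qualified types; waitAndFlush is an illustrative name, not part of the library.

package example

import (
	"context"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

// waitAndFlush is a hypothetical helper combining both acknowledgment styles.
// GetResult itself blocks until the append outcome is known, so the explicit
// receive on Ready() is redundant here and shown only to illustrate the
// channel-based wait used by the DynamicJSON test.
func waitAndFlush(ctx context.Context, ms *managedwriter.ManagedStream, res *managedwriter.AppendResult) (int64, error) {
	// Block until the append outcome is known.
	<-res.Ready()

	// Retrieve the offset assigned to the appended rows (or the append error).
	offset, err := res.GetResult(ctx)
	if err != nil {
		return 0, err
	}

	// On a buffered stream, appended rows only become visible once flushed;
	// advance the flush point through the acknowledged offset.
	return ms.FlushRows(ctx, offset)
}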