Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigquery/storage/managedwriter): fix double-close error, add tests #4502

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
163 changes: 162 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -26,10 +26,12 @@ import (
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
"google.golang.org/api/option"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)

var (
Expand Down Expand Up @@ -120,7 +122,7 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect
return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
}

func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
func TestIntegration_ManagedWriter_DefaultStream(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
defer bqClient.Close()
Expand Down Expand Up @@ -205,3 +207,162 @@ func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
wantRows = wantRows * 2
validateRowCount(ctx, t, bqClient, testTable, wantRows)
}

// TestIntegration_ManagedWriter_DynamicJSON appends rows that start out as
// JSON payloads. Each payload is unmarshalled into a dynamic proto message
// built from the table schema, serialized to proto bytes, and appended to a
// default stream one row at a time. The final row count is then validated.
func TestIntegration_ManagedWriter_DynamicJSON(t *testing.T) {
	mwClient, bqClient := getTestClients(context.Background(), t)
	defer mwClient.Close()
	defer bqClient.Close()

	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// prep a suitable destination table.
	testTable := dataset.Table(tableIDs.New())
	schema := bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType, Required: true},
		{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
	}
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// The message descriptor drives the dynamic messages; the descriptor proto
	// is what gets sent as the stream's schema.
	md, descriptorProto := setupDynamicDescriptors(t, schema)

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}

	sampleData := [][]byte{
		[]byte(`{"name": "one", "value": 1}`),
		[]byte(`{"name": "two", "value": 2}`),
		[]byte(`{"name": "three", "value": 3}`),
		[]byte(`{"name": "four", "value": 4}`),
		[]byte(`{"name": "five", "value": 5}`),
	}

	// prevalidate we have no data in table.
	validateRowCount(ctx, t, bqClient, testTable, 0)

	// Append rows individually, keeping the results of the most recent append.
	var results []*AppendResult
	for k, v := range sampleData {
		message := dynamicpb.NewMessage(md)

		// First, json->proto message
		if err := protojson.Unmarshal(v, message); err != nil {
			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
		}
		// Then, proto message -> bytes.
		b, err := proto.Marshal(message)
		if err != nil {
			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
		}
		results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
		if err != nil {
			// A failed append leaves nothing to wait on, and the final row
			// count can no longer match; stop here rather than index into an
			// invalid result set below.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}

	// Block until the final append has been acknowledged before validating.
	// Merely obtaining a readiness signal without waiting on it would let the
	// row count check race with the write.
	// NOTE(review): this waits on the last append only and assumes earlier
	// appends on the same stream are acknowledged first — confirm.
	if len(results) == 0 {
		t.Fatalf("final append returned no results to wait on")
	}
	if _, err := results[0].GetResult(ctx); err != nil {
		t.Errorf("got error from final pending result: %v", err)
	}
	wantRows := int64(len(sampleData))
	validateRowCount(ctx, t, bqClient, testTable, wantRows)
}

// TestIntegration_ManagedWriter_BufferedStream exercises a buffered stream:
// each row is appended and acknowledged, the table row count is checked to be
// unchanged, and then the stream is flushed to the acknowledged offset and the
// row count is checked again.
func TestIntegration_ManagedWriter_BufferedStream(t *testing.T) {
	mwClient, bqClient := getTestClients(context.Background(), t)
	defer mwClient.Close()
	defer bqClient.Close()

	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// prep a suitable destination table.
	testTable := dataset.Table(tableIDs.New())
	schema := bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType, Required: true},
		{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
	}
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
	// to send as the stream's schema.
	m := &testdata.SimpleMessage{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(BufferedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}

	// Confirm the stream reported by the service has the type we asked for.
	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID)
	if err != nil {
		// Without stream info the type comparison below is meaningless.
		t.Fatalf("couldn't get stream info: %v", err)
	}
	if info.GetType().String() != string(ms.StreamType()) {
		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
	}

	// prevalidate we have no data in table.
	validateRowCount(ctx, t, bqClient, testTable, 0)

	testData := []*testdata.SimpleMessage{
		{Name: "one", Value: 1},
		{Name: "two", Value: 2},
		{Name: "three", Value: 3},
		{Name: "four", Value: 1},
		{Name: "five", Value: 2},
	}

	// First, send the test rows individually, validate, then advance.
	var expectedRows int64
	for k, mesg := range testData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			// Appending a nil payload would only obscure the real failure.
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		results, err := ms.AppendRows(ctx, data, NoStreamOffset)
		if err != nil {
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
		if len(results) == 0 {
			t.Fatalf("single-row append %d returned no results", k)
		}
		// wait for ack
		offset, err := results[0].GetResult(ctx)
		if err != nil {
			// The offset is not usable for the flush below when the ack failed.
			t.Fatalf("got error from pending result %d: %v", k, err)
		}
		// The row was appended but not flushed, so the count must be unchanged.
		validateRowCount(ctx, t, bqClient, testTable, expectedRows)
		// move offset and re-validate.
		flushOffset, err := ms.FlushRows(ctx, offset)
		if err != nil {
			t.Fatalf("failed to flush offset to %d: %v", offset, err)
		}
		// Offsets are used here as zero-based, so rows visible = offset + 1.
		expectedRows = flushOffset + 1
		validateRowCount(ctx, t, bqClient, testTable, expectedRows)
	}
}
3 changes: 2 additions & 1 deletion bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -349,8 +349,9 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
off := success.GetOffset()
if off != nil {
nextWrite.markDone(off.GetValue(), nil, fc)
} else {
nextWrite.markDone(NoStreamOffset, nil, fc)
}
nextWrite.markDone(NoStreamOffset, nil, fc)
}
}
}