From 9c9fbb2b610c6ffc0441af7b802761dbb2326f67 Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 8 Sep 2021 09:12:57 -0700 Subject: [PATCH] BREAKING CHANGE(bigquery/storage/managedwriter): change AppendRows (#4729) behavior Previously, AppendRows() on a managed stream would return one AppendResult per data row. This change instead switches the behavior to return a single AppendResult for tracking the behavior of the set of rows. The original per-row contract was done in expectation that we'd consider making batching decisions at a very granular level. However, at this point it seems reasonable to consider only batching multiple appends, not dividing individual batches more granularly. --- .../storage/managedwriter/appendresult.go | 37 +++------ .../managedwriter/appendresult_test.go | 82 ++++++++++--------- .../storage/managedwriter/integration_test.go | 40 ++++----- .../storage/managedwriter/managed_stream.go | 8 +- 4 files changed, 80 insertions(+), 87 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 2cfff5069c6..8158557be11 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -26,10 +26,10 @@ import ( // stream offset (e.g. a default stream which allows simultaneous append streams). const NoStreamOffset int64 = -1 -// AppendResult tracks the status of a single row of data. +// AppendResult tracks the status of a batch of data rows. type AppendResult struct { // rowData contains the serialized row data. - rowData []byte + rowData [][]byte ready chan struct{} @@ -40,7 +40,7 @@ type AppendResult struct { offset int64 } -func newAppendResult(data []byte) *AppendResult { +func newAppendResult(data [][]byte) *AppendResult { return &AppendResult{ ready: make(chan struct{}), rowData: data, @@ -66,7 +66,7 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) { // append request. 
type pendingWrite struct { request *storagepb.AppendRowsRequest - results []*AppendResult + result *AppendResult // this is used by the flow controller. reqSize int @@ -78,11 +78,6 @@ type pendingWrite struct { // the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING // streams, this should be managed by the user. func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { - - results := make([]*AppendResult, len(appends)) - for k, r := range appends { - results[k] = newAppendResult(r) - } pw := &pendingWrite{ request: &storagepb.AppendRowsRequest{ Rows: &storagepb.AppendRowsRequest_ProtoRows{ @@ -93,7 +88,7 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { }, }, }, - results: results, + result: newAppendResult(appends), } if offset > 0 { pw.request.Offset = &wrapperspb.Int64Value{Value: offset} @@ -105,24 +100,12 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { return pw } -// markDone propagates finalization of an append request to associated -// AppendResult references. +// markDone propagates finalization of an append request to the associated +// AppendResult. func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowController) { - curOffset := startOffset - for _, ar := range pw.results { - if err != nil { - ar.err = err - close(ar.ready) - continue - } - - ar.offset = curOffset - // only advance curOffset if we were given a valid starting offset. - if startOffset >= 0 { - curOffset = curOffset + 1 - } - close(ar.ready) - } + pw.result.err = err + pw.result.offset = startOffset + close(pw.result.ready) // Clear the reference to the request. pw.request = nil // if there's a flow controller, signal release. 
The only time this should be nil is when diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index 7ef86ea995f..e30a53fb6d2 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -23,11 +23,16 @@ import ( func TestAppendResult(t *testing.T) { - wantRowBytes := []byte("rowdata") + wantRowBytes := [][]byte{[]byte("rowdata")} gotAR := newAppendResult(wantRowBytes) - if !bytes.Equal(gotAR.rowData, wantRowBytes) { - t.Errorf("mismatch in row data, got %q want %q", gotAR.rowData, wantRowBytes) + if len(gotAR.rowData) != len(wantRowBytes) { + t.Fatalf("length mismatch, got %d want %d elements", len(gotAR.rowData), len(wantRowBytes)) + } + for i := 0; i < len(gotAR.rowData); i++ { + if !bytes.Equal(gotAR.rowData[i], wantRowBytes[i]) { + t.Errorf("mismatch in row data %d, got %q want %q", i, gotAR.rowData, wantRowBytes) + } } } @@ -46,13 +51,11 @@ func TestPendingWrite(t *testing.T) { t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue()) } pending.markDone(NoStreamOffset, nil, nil) - for k, ar := range pending.results { - if ar.offset != NoStreamOffset { - t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset) - } - if ar.err != nil { - t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err) - } + if pending.result.offset != NoStreamOffset { + t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset) + } + if pending.result.err != nil { + t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err) } // now, verify behavior with a valid offset @@ -70,22 +73,23 @@ func TestPendingWrite(t *testing.T) { t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData)) } - // verify child AppendResults - 
if len(pending.results) != len(wantRowData) { - t.Errorf("mismatch in rows and append results. %d rows, %d AppendResults", len(wantRowData), len(pending.results)) + // verify AppendResult + + gotData := pending.result.rowData + if len(gotData) != len(wantRowData) { + t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData)) } - for k, ar := range pending.results { - gotData := ar.rowData - if !bytes.Equal(gotData, wantRowData[k]) { - t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k]) - } - select { - case <-ar.Ready(): - t.Errorf("got Ready() on incomplete AppendResult %d", k) - case <-time.After(100 * time.Millisecond): - continue + for i := 0; i < len(gotData); i++ { + if !bytes.Equal(gotData[i], wantRowData[i]) { + t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i]) } } + select { + case <-pending.result.Ready(): + t.Errorf("got Ready() on incomplete AppendResult") + case <-time.After(100 * time.Millisecond): + + } // verify completion behavior reportedOffset := int64(101) @@ -95,22 +99,26 @@ func TestPendingWrite(t *testing.T) { if pending.request != nil { t.Errorf("expected request to be cleared, is present: %#v", pending.request) } - for k, ar := range pending.results { - gotData := ar.rowData - if !bytes.Equal(gotData, wantRowData[k]) { - t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k]) - } - select { - case <-ar.Ready(): - continue - case <-time.After(100 * time.Millisecond): - t.Errorf("possible blocking on completed AppendResult %d", k) + gotData = pending.result.rowData + if len(gotData) != len(wantRowData) { + t.Errorf("length mismatch in data: got %d, want %d", len(gotData), len(wantRowData)) + } + for i := 0; i < len(gotData); i++ { + if !bytes.Equal(gotData[i], wantRowData[i]) { + t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i]) } - if ar.offset != reportedOffset+int64(k) { - 
t.Errorf("mismatch on completed AppendResult offset: got %d want %d", ar.offset, reportedOffset+int64(k)) + } + + select { + + case <-time.After(100 * time.Millisecond): + t.Errorf("possible blocking on completed AppendResult") + case <-pending.result.Ready(): + if pending.result.offset != reportedOffset { + t.Errorf("mismatch on completed AppendResult offset: got %d want %d", pending.result.offset, reportedOffset) } - if ar.err != wantErr { - t.Errorf("mismatch in errors, got %v want %v", ar.err, wantErr) + if pending.result.err != wantErr { + t.Errorf("mismatch in errors, got %v want %v", pending.result.err, wantErr) } } diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index df4840f37ce..7f047de3a3b 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -173,20 +173,20 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl withExactRowCount(0)) // First, send the test rows individually. - var results []*AppendResult + var result *AppendResult for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - results, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, NoStreamOffset) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } } // wait for the result to indicate ready, then validate. 
- results[0].Ready() + result.Ready() validateTableConstraints(ctx, t, bqClient, testTable, "after first send round", withExactRowCount(int64(len(testSimpleData))), withDistinctValues("name", int64(len(testSimpleData)))) @@ -200,13 +200,13 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl } data = append(data, b) } - results, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, NoStreamOffset) if err != nil { t.Errorf("grouped-row append failed: %v", err) } // wait for the result to indicate ready, then validate again. Our total rows have increased, but // cardinality should not. - results[0].Ready() + result.Ready() validateTableConstraints(ctx, t, bqClient, testTable, "after second send round", withExactRowCount(int64(2*len(testSimpleData))), withDistinctValues("name", int64(len(testSimpleData))), @@ -241,7 +241,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C []byte(`{"name": "five", "value": 5}`), } - var results []*AppendResult + var result *AppendResult for k, v := range sampleJSONData { message := dynamicpb.NewMessage(md) @@ -255,14 +255,14 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C if err != nil { t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err) } - results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) + result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } } // wait for the result to indicate ready, then validate. 
- results[0].Ready() + result.Ready() validateTableConstraints(ctx, t, bqClient, testTable, "after send", withExactRowCount(int64(len(sampleJSONData))), withDistinctValues("name", int64(len(sampleJSONData))), @@ -309,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC t.Errorf("single-row append %d failed: %v", k, err) } // wait for ack - offset, err := results[0].GetResult(ctx) + offset, err := results.GetResult(ctx) if err != nil { t.Errorf("got error from pending result %d: %v", k, err) } @@ -350,20 +350,20 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq validateTableConstraints(ctx, t, bqClient, testTable, "before send", withExactRowCount(0)) - var results []*AppendResult + var result *AppendResult for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - results, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, NoStreamOffset) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } } // wait for the result to indicate ready, then validate. - results[0].Ready() + result.Ready() validateTableConstraints(ctx, t, bqClient, testTable, "after send", withExactRowCount(int64(len(testSimpleData)))) } @@ -389,19 +389,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl withExactRowCount(0)) // Send data. - var results []*AppendResult + var result *AppendResult for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - results, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, NoStreamOffset) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } } - results[0].Ready() + result.Ready() wantRows := int64(len(testSimpleData)) // Mark stream complete. 
@@ -455,20 +455,20 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq t.Fatalf("NewManagedStream: %v", err) } - var results []*AppendResult + var result *AppendResult for k, mesg := range testSimpleData { b, err := proto.Marshal(mesg) if err != nil { t.Errorf("failed to marshal message %d: %v", k, err) } data := [][]byte{b} - results, err = ms.AppendRows(ctx, data, NoStreamOffset) + result, err = ms.AppendRows(ctx, data, NoStreamOffset) if err != nil { t.Errorf("single-row append %d failed: %v", k, err) } } // wait for the result to indicate ready. - results[0].Ready() + result.Ready() // Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance // to report. time.Sleep(time.Second) @@ -607,12 +607,12 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, if err != nil { t.Fatalf("NewManagedStream: %v", err) } - results, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset) + result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset) if err != nil { t.Errorf("append failed: %v", err) } - _, err = results[0].GetResult(ctx) + _, err = result.GetResult(ctx) if err != nil { t.Errorf("error in response: %v", err) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index c630e4fe481..f4b7df4749b 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -323,10 +323,12 @@ func (ms *ManagedStream) Close() error { return err } -// AppendRows sends the append requests to the service, and returns one AppendResult per row. +// AppendRows sends the append requests to the service, and returns a single AppendResult for tracking +// the set of data. 
+// // The format of the row data is binary serialized protocol buffer bytes, and and the message // must adhere to the format of the schema Descriptor passed in when creating the managed stream. -func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) { +func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) (*AppendResult, error) { pw := newPendingWrite(data, offset) // check flow control if err := ms.fc.acquire(ctx, pw.reqSize); err != nil { @@ -339,7 +341,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset i pw.markDone(NoStreamOffset, err, ms.fc) return nil, err } - return pw.results, nil + return pw.result, nil } // recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.