Skip to content

Commit

Permalink
BREAKING CHANGE(bigquery/storage/managedwriter): change AppendRows (#…
Browse files Browse the repository at this point in the history
…4729)

behavior

Previously, AppendRows() on a managed stream would return one
AppendResult per data row.  This change instead switches the
behavior to return a single AppendResult for tracking the behavior
of the set of rows.

The original per-row contract was done in expectation that we'd
consider making batching decisions at a very granular level. However,
at this point it seems reasonable to consider only batching multiple
appends, not dividing individual batches more granularly.
  • Loading branch information
shollyman committed Sep 8, 2021
1 parent 9c4b46c commit 9c9fbb2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 87 deletions.
37 changes: 10 additions & 27 deletions bigquery/storage/managedwriter/appendresult.go
Expand Up @@ -26,10 +26,10 @@ import (
// stream offset (e.g. a default stream which allows simultaneous append streams).
const NoStreamOffset int64 = -1

// AppendResult tracks the status of a single row of data.
// AppendResult tracks the status of a batch of data rows.
type AppendResult struct {
// rowData contains the serialized row data.
rowData []byte
rowData [][]byte

ready chan struct{}

Expand All @@ -40,7 +40,7 @@ type AppendResult struct {
offset int64
}

func newAppendResult(data []byte) *AppendResult {
func newAppendResult(data [][]byte) *AppendResult {
return &AppendResult{
ready: make(chan struct{}),
rowData: data,
Expand All @@ -66,7 +66,7 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
// append request.
type pendingWrite struct {
request *storagepb.AppendRowsRequest
results []*AppendResult
result *AppendResult

// this is used by the flow controller.
reqSize int
Expand All @@ -78,11 +78,6 @@ type pendingWrite struct {
// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING
// streams, this should be managed by the user.
func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {

results := make([]*AppendResult, len(appends))
for k, r := range appends {
results[k] = newAppendResult(r)
}
pw := &pendingWrite{
request: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
Expand All @@ -93,7 +88,7 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
},
},
},
results: results,
result: newAppendResult(appends),
}
if offset > 0 {
pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
Expand All @@ -105,24 +100,12 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
return pw
}

// markDone propagates finalization of an append request to associated
// AppendResult references.
// markDone propagates finalization of an append request to the associated
// AppendResult.
func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowController) {
curOffset := startOffset
for _, ar := range pw.results {
if err != nil {
ar.err = err
close(ar.ready)
continue
}

ar.offset = curOffset
// only advance curOffset if we were given a valid starting offset.
if startOffset >= 0 {
curOffset = curOffset + 1
}
close(ar.ready)
}
pw.result.err = err
pw.result.offset = startOffset
close(pw.result.ready)
// Clear the reference to the request.
pw.request = nil
// if there's a flow controller, signal release. The only time this should be nil is when
Expand Down
82 changes: 45 additions & 37 deletions bigquery/storage/managedwriter/appendresult_test.go
Expand Up @@ -23,11 +23,16 @@ import (

func TestAppendResult(t *testing.T) {

wantRowBytes := []byte("rowdata")
wantRowBytes := [][]byte{[]byte("rowdata")}

gotAR := newAppendResult(wantRowBytes)
if !bytes.Equal(gotAR.rowData, wantRowBytes) {
t.Errorf("mismatch in row data, got %q want %q", gotAR.rowData, wantRowBytes)
if len(gotAR.rowData) != len(wantRowBytes) {
t.Fatalf("length mismatch, got %d want %d elements", len(gotAR.rowData), len(wantRowBytes))
}
for i := 0; i < len(gotAR.rowData); i++ {
if !bytes.Equal(gotAR.rowData[i], wantRowBytes[i]) {
t.Errorf("mismatch in row data %d, got %q want %q", i, gotAR.rowData, wantRowBytes)
}
}
}

Expand All @@ -46,13 +51,11 @@ func TestPendingWrite(t *testing.T) {
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
}
pending.markDone(NoStreamOffset, nil, nil)
for k, ar := range pending.results {
if ar.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset)
}
if ar.err != nil {
t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err)
}
if pending.result.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset)
}
if pending.result.err != nil {
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
}

// now, verify behavior with a valid offset
Expand All @@ -70,22 +73,23 @@ func TestPendingWrite(t *testing.T) {
t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
}

// verify child AppendResults
if len(pending.results) != len(wantRowData) {
t.Errorf("mismatch in rows and append results. %d rows, %d AppendResults", len(wantRowData), len(pending.results))
// verify AppendResult

gotData := pending.result.rowData
if len(gotData) != len(wantRowData) {
t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData))
}
for k, ar := range pending.results {
gotData := ar.rowData
if !bytes.Equal(gotData, wantRowData[k]) {
t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
}
select {
case <-ar.Ready():
t.Errorf("got Ready() on incomplete AppendResult %d", k)
case <-time.After(100 * time.Millisecond):
continue
for i := 0; i < len(gotData); i++ {
if !bytes.Equal(gotData[i], wantRowData[i]) {
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
}
}
select {
case <-pending.result.Ready():
t.Errorf("got Ready() on incomplete AppendResult")
case <-time.After(100 * time.Millisecond):

}

// verify completion behavior
reportedOffset := int64(101)
Expand All @@ -95,22 +99,26 @@ func TestPendingWrite(t *testing.T) {
if pending.request != nil {
t.Errorf("expected request to be cleared, is present: %#v", pending.request)
}
for k, ar := range pending.results {
gotData := ar.rowData
if !bytes.Equal(gotData, wantRowData[k]) {
t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
}
select {
case <-ar.Ready():
continue
case <-time.After(100 * time.Millisecond):
t.Errorf("possible blocking on completed AppendResult %d", k)
gotData = pending.result.rowData
if len(gotData) != len(wantRowData) {
t.Errorf("length mismatch in data: got %d, want %d", len(gotData), len(wantRowData))
}
for i := 0; i < len(gotData); i++ {
if !bytes.Equal(gotData[i], wantRowData[i]) {
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
}
if ar.offset != reportedOffset+int64(k) {
t.Errorf("mismatch on completed AppendResult offset: got %d want %d", ar.offset, reportedOffset+int64(k))
}

select {

case <-time.After(100 * time.Millisecond):
t.Errorf("possible blocking on completed AppendResult")
case <-pending.result.Ready():
if pending.result.offset != reportedOffset {
t.Errorf("mismatch on completed AppendResult offset: got %d want %d", pending.result.offset, reportedOffset)
}
if ar.err != wantErr {
t.Errorf("mismatch in errors, got %v want %v", ar.err, wantErr)
if pending.result.err != wantErr {
t.Errorf("mismatch in errors, got %v want %v", pending.result.err, wantErr)
}
}

Expand Down
40 changes: 20 additions & 20 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -173,20 +173,20 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(0))

// First, send the test rows individually.
var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
// wait for the result to indicate ready, then validate.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
withExactRowCount(int64(len(testSimpleData))),
withDistinctValues("name", int64(len(testSimpleData))))
Expand All @@ -200,13 +200,13 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
}
data = append(data, b)
}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("grouped-row append failed: %v", err)
}
// wait for the result to indicate ready, then validate again. Our total rows have increased, but
// cardinality should not.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
withExactRowCount(int64(2*len(testSimpleData))),
withDistinctValues("name", int64(len(testSimpleData))),
Expand Down Expand Up @@ -241,7 +241,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
[]byte(`{"name": "five", "value": 5}`),
}

var results []*AppendResult
var result *AppendResult
for k, v := range sampleJSONData {
message := dynamicpb.NewMessage(md)

Expand All @@ -255,14 +255,14 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
if err != nil {
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
}
results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}

// wait for the result to indicate ready, then validate.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(sampleJSONData))),
withDistinctValues("name", int64(len(sampleJSONData))),
Expand Down Expand Up @@ -309,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
t.Errorf("single-row append %d failed: %v", k, err)
}
// wait for ack
offset, err := results[0].GetResult(ctx)
offset, err := results.GetResult(ctx)
if err != nil {
t.Errorf("got error from pending result %d: %v", k, err)
}
Expand Down Expand Up @@ -350,20 +350,20 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
// wait for the result to indicate ready, then validate.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(testSimpleData))))
}
Expand All @@ -389,19 +389,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(0))

// Send data.
var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
results[0].Ready()
result.Ready()
wantRows := int64(len(testSimpleData))

// Mark stream complete.
Expand Down Expand Up @@ -455,20 +455,20 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Fatalf("NewManagedStream: %v", err)
}

var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
// wait for the result to indicate ready.
results[0].Ready()
result.Ready()
// Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance
// to report.
time.Sleep(time.Second)
Expand Down Expand Up @@ -607,12 +607,12 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
results, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
if err != nil {
t.Errorf("append failed: %v", err)
}

_, err = results[0].GetResult(ctx)
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error in response: %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -323,10 +323,12 @@ func (ms *ManagedStream) Close() error {
return err
}

// AppendRows sends the append requests to the service, and returns one AppendResult per row.
// AppendRows sends the append requests to the service, and returns a single AppendResult for tracking
// the set of data.
//
// The format of the row data is binary serialized protocol buffer bytes, and and the message
// must adhere to the format of the schema Descriptor passed in when creating the managed stream.
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) (*AppendResult, error) {
pw := newPendingWrite(data, offset)
// check flow control
if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
Expand All @@ -339,7 +341,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset i
pw.markDone(NoStreamOffset, err, ms.fc)
return nil, err
}
return pw.results, nil
return pw.result, nil
}

// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.
Expand Down

0 comments on commit 9c9fbb2

Please sign in to comment.