Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

refactor(bigquery/storage/managedwriter): change AppendRows behavior #4729

Merged
merged 2 commits into from Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 10 additions & 27 deletions bigquery/storage/managedwriter/appendresult.go
Expand Up @@ -26,10 +26,10 @@ import (
// stream offset (e.g. a default stream which allows simultaneous append streams).
const NoStreamOffset int64 = -1

// AppendResult tracks the status of a single row of data.
// AppendResult tracks the status of a batch of data rows.
type AppendResult struct {
// rowData contains the serialized row data.
rowData []byte
rowData [][]byte

ready chan struct{}

Expand All @@ -40,7 +40,7 @@ type AppendResult struct {
offset int64
}

func newAppendResult(data []byte) *AppendResult {
func newAppendResult(data [][]byte) *AppendResult {
return &AppendResult{
ready: make(chan struct{}),
rowData: data,
Expand All @@ -66,7 +66,7 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
// append request.
type pendingWrite struct {
request *storagepb.AppendRowsRequest
results []*AppendResult
result *AppendResult

// this is used by the flow controller.
reqSize int
Expand All @@ -78,11 +78,6 @@ type pendingWrite struct {
// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING
// streams, this should be managed by the user.
func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {

results := make([]*AppendResult, len(appends))
for k, r := range appends {
results[k] = newAppendResult(r)
}
pw := &pendingWrite{
request: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
Expand All @@ -93,7 +88,7 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
},
},
},
results: results,
result: newAppendResult(appends),
}
if offset > 0 {
pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
Expand All @@ -105,24 +100,12 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
return pw
}

// markDone propagates finalization of an append request to associated
// AppendResult references.
// markDone propagates finalization of an append request to the associated
// AppendResult.
func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowController) {
curOffset := startOffset
for _, ar := range pw.results {
if err != nil {
ar.err = err
close(ar.ready)
continue
}

ar.offset = curOffset
// only advance curOffset if we were given a valid starting offset.
if startOffset >= 0 {
curOffset = curOffset + 1
}
close(ar.ready)
}
pw.result.err = err
pw.result.offset = startOffset
close(pw.result.ready)
// Clear the reference to the request.
pw.request = nil
// if there's a flow controller, signal release. The only time this should be nil is when
Expand Down
82 changes: 45 additions & 37 deletions bigquery/storage/managedwriter/appendresult_test.go
Expand Up @@ -23,11 +23,16 @@ import (

func TestAppendResult(t *testing.T) {

wantRowBytes := []byte("rowdata")
wantRowBytes := [][]byte{[]byte("rowdata")}

gotAR := newAppendResult(wantRowBytes)
if !bytes.Equal(gotAR.rowData, wantRowBytes) {
t.Errorf("mismatch in row data, got %q want %q", gotAR.rowData, wantRowBytes)
if len(gotAR.rowData) != len(wantRowBytes) {
t.Fatalf("length mismatch, got %d want %d elements", len(gotAR.rowData), len(wantRowBytes))
}
for i := 0; i < len(gotAR.rowData); i++ {
if !bytes.Equal(gotAR.rowData[i], wantRowBytes[i]) {
t.Errorf("mismatch in row data %d, got %q want %q", i, gotAR.rowData, wantRowBytes)
}
}
}

Expand All @@ -46,13 +51,11 @@ func TestPendingWrite(t *testing.T) {
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
}
pending.markDone(NoStreamOffset, nil, nil)
for k, ar := range pending.results {
if ar.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset)
}
if ar.err != nil {
t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err)
}
if pending.result.offset != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset)
}
if pending.result.err != nil {
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
}

// now, verify behavior with a valid offset
Expand All @@ -70,22 +73,23 @@ func TestPendingWrite(t *testing.T) {
t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
}

// verify child AppendResults
if len(pending.results) != len(wantRowData) {
t.Errorf("mismatch in rows and append results. %d rows, %d AppendResults", len(wantRowData), len(pending.results))
// verify AppendResult

gotData := pending.result.rowData
if len(gotData) != len(wantRowData) {
t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData))
}
for k, ar := range pending.results {
gotData := ar.rowData
if !bytes.Equal(gotData, wantRowData[k]) {
t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
}
select {
case <-ar.Ready():
t.Errorf("got Ready() on incomplete AppendResult %d", k)
case <-time.After(100 * time.Millisecond):
continue
for i := 0; i < len(gotData); i++ {
if !bytes.Equal(gotData[i], wantRowData[i]) {
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
}
}
select {
case <-pending.result.Ready():
t.Errorf("got Ready() on incomplete AppendResult")
case <-time.After(100 * time.Millisecond):

}

// verify completion behavior
reportedOffset := int64(101)
Expand All @@ -95,22 +99,26 @@ func TestPendingWrite(t *testing.T) {
if pending.request != nil {
t.Errorf("expected request to be cleared, is present: %#v", pending.request)
}
for k, ar := range pending.results {
gotData := ar.rowData
if !bytes.Equal(gotData, wantRowData[k]) {
t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
}
select {
case <-ar.Ready():
continue
case <-time.After(100 * time.Millisecond):
t.Errorf("possible blocking on completed AppendResult %d", k)
gotData = pending.result.rowData
if len(gotData) != len(wantRowData) {
t.Errorf("length mismatch in data: got %d, want %d", len(gotData), len(wantRowData))
}
for i := 0; i < len(gotData); i++ {
if !bytes.Equal(gotData[i], wantRowData[i]) {
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
}
if ar.offset != reportedOffset+int64(k) {
t.Errorf("mismatch on completed AppendResult offset: got %d want %d", ar.offset, reportedOffset+int64(k))
}

select {

case <-time.After(100 * time.Millisecond):
t.Errorf("possible blocking on completed AppendResult")
case <-pending.result.Ready():
if pending.result.offset != reportedOffset {
t.Errorf("mismatch on completed AppendResult offset: got %d want %d", pending.result.offset, reportedOffset)
}
if ar.err != wantErr {
t.Errorf("mismatch in errors, got %v want %v", ar.err, wantErr)
if pending.result.err != wantErr {
t.Errorf("mismatch in errors, got %v want %v", pending.result.err, wantErr)
}
}

Expand Down
40 changes: 20 additions & 20 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -173,20 +173,20 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(0))

// First, send the test rows individually.
var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
// wait for the result to indicate ready, then validate.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
withExactRowCount(int64(len(testSimpleData))),
withDistinctValues("name", int64(len(testSimpleData))))
Expand All @@ -200,13 +200,13 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
}
data = append(data, b)
}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("grouped-row append failed: %v", err)
}
// wait for the result to indicate ready, then validate again. Our total rows have increased, but
// cardinality should not.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
withExactRowCount(int64(2*len(testSimpleData))),
withDistinctValues("name", int64(len(testSimpleData))),
Expand Down Expand Up @@ -241,7 +241,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
[]byte(`{"name": "five", "value": 5}`),
}

var results []*AppendResult
var result *AppendResult
for k, v := range sampleJSONData {
message := dynamicpb.NewMessage(md)

Expand All @@ -255,14 +255,14 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
if err != nil {
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
}
results, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}

// wait for the result to indicate ready, then validate.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(sampleJSONData))),
withDistinctValues("name", int64(len(sampleJSONData))),
Expand Down Expand Up @@ -309,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
t.Errorf("single-row append %d failed: %v", k, err)
}
// wait for ack
offset, err := results[0].GetResult(ctx)
offset, err := results.GetResult(ctx)
if err != nil {
t.Errorf("got error from pending result %d: %v", k, err)
}
Expand Down Expand Up @@ -350,20 +350,20 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
// wait for the result to indicate ready, then validate.
results[0].Ready()
result.Ready()
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(testSimpleData))))
}
Expand All @@ -389,19 +389,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(0))

// Send data.
var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
results[0].Ready()
result.Ready()
wantRows := int64(len(testSimpleData))

// Mark stream complete.
Expand Down Expand Up @@ -455,20 +455,20 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Fatalf("NewManagedStream: %v", err)
}

var results []*AppendResult
var result *AppendResult
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data := [][]byte{b}
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}
// wait for the result to indicate ready.
results[0].Ready()
result.Ready()
// Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance
// to report.
time.Sleep(time.Second)
Expand Down Expand Up @@ -607,12 +607,12 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
results, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
if err != nil {
t.Errorf("append failed: %v", err)
}

_, err = results[0].GetResult(ctx)
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error in response: %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -323,10 +323,12 @@ func (ms *ManagedStream) Close() error {
return err
}

// AppendRows sends the append requests to the service, and returns one AppendResult per row.
// AppendRows sends the append requests to the service, and returns a single AppendResult for tracking
// the set of data.
//
// The format of the row data is binary serialized protocol buffer bytes, and the message
// must adhere to the format of the schema Descriptor passed in when creating the managed stream.
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) ([]*AppendResult, error) {
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset int64) (*AppendResult, error) {
pw := newPendingWrite(data, offset)
// check flow control
if err := ms.fc.acquire(ctx, pw.reqSize); err != nil {
Expand All @@ -339,7 +341,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, offset i
pw.markDone(NoStreamOffset, err, ms.fc)
return nil, err
}
return pw.results, nil
return pw.result, nil
}

// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.
Expand Down