From f9673caf579d9d7e7474f14614ce79b6095f86de Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 13 Sep 2022 17:58:44 +0000 Subject: [PATCH 1/2] fix(bigquery/storage/managedwriter): fix incorrect error retention Failures for an individual append can get retained as the terminal error for the stream. Redact the logic responsible, and add a test that validates the correct behavior using context-based errors. --- .../storage/managedwriter/managed_stream.go | 5 +-- .../managedwriter/managed_stream_test.go | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 76a5d028421..6a18dd0f574 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -358,11 +358,8 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio } continue } - // We've got a non-retriable error, so propagate that up. and mark the write done. - ms.mu.Lock() - ms.err = appendErr + // Mark the pending write done. This will not be returned to the user, they'll receive the returned error. pw.markDone(nil, appendErr, ms.fc) - ms.mu.Unlock() return appendErr } recordStat(ms.ctx, AppendRequests, 1) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 6d97c2ec579..680fa65032e 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -300,7 +300,47 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) { if ct := ms.fc.count(); ct != wantCount { t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount) } +} + +func TestManagedStream_ContextExpiry(t *testing.T) { + // Issue: retaining error from append as stream error + // https://github.com/googleapis/google-cloud-go/issues/6657 + ctx := context.Background() + ms := &ManagedStream{ + ctx: ctx, + streamSettings: defaultStreamSettings(), + fc: newFlowController(0, 0), + open: openTestArc(&testAppendRowsClient{}, + func(req *storagepb.AppendRowsRequest) error { + // Append is intentionally slow. + return nil + }, nil), + } + ms.schemaDescriptor = &descriptorpb.DescriptorProto{ + Name: proto.String("testDescriptor"), + } + fakeData := [][]byte{ + []byte("foo"), + } + + // Create a context that will expire during the append, to verify the passed in + // context expires. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + + // force an append with an invalid context. + pw := newPendingWrite(cancelCtx, fakeData) + err := ms.appendWithRetry(pw) + if err != context.Canceled { + t.Errorf("expected cancelled context error, got: %v", err) + } + + // a second append with a valid context should succeed + _, err = ms.AppendRows(ctx, fakeData) + if err != nil { + t.Errorf("expected second append to succeed, but failed: %v", err) + } } func TestManagedStream_AppendDeadlocks(t *testing.T) { From 661a6ba2fc10dc4fd69d4e860ba9b20df9d76fcb Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Tue, 13 Sep 2022 18:39:52 +0000 Subject: [PATCH 2/2] cleanup comments --- bigquery/storage/managedwriter/managed_stream_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 680fa65032e..be4564b0a33 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -324,12 +324,11 @@ func TestManagedStream_ContextExpiry(t *testing.T) { []byte("foo"), } - // Create a context that will expire during the append, to verify the passed in - // context expires. + // Create a context and immediately cancel it. cancelCtx, cancel := context.WithCancel(ctx) cancel() - // force an append with an invalid context. + // First, append with an invalid context. pw := newPendingWrite(cancelCtx, fakeData) err := ms.appendWithRetry(pw) if err != context.Canceled {