Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): fix incorrect error retention (#…
Browse files Browse the repository at this point in the history
…6659)

Failures for an individual append can get retained as the terminal error for the stream.  Redact the logic responsible, and add a test that validates the correct behavior using context-based errors.

Fixes: #6657
  • Loading branch information
shollyman committed Sep 13, 2022
1 parent 74b6ad4 commit dc02bca
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
5 changes: 1 addition & 4 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -358,11 +358,8 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
}
continue
}
// We've got a non-retriable error, so propagate that up. and mark the write done.
ms.mu.Lock()
ms.err = appendErr
// Mark the pending write done. This will not be returned to the user, they'll receive the returned error.
pw.markDone(nil, appendErr, ms.fc)
ms.mu.Unlock()
return appendErr
}
recordStat(ms.ctx, AppendRequests, 1)
Expand Down
39 changes: 39 additions & 0 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -300,7 +300,46 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) {
if ct := ms.fc.count(); ct != wantCount {
t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount)
}
}

func TestManagedStream_ContextExpiry(t *testing.T) {
// Issue: retaining error from append as stream error
// https://github.com/googleapis/google-cloud-go/issues/6657
ctx := context.Background()

ms := &ManagedStream{
ctx: ctx,
streamSettings: defaultStreamSettings(),
fc: newFlowController(0, 0),
open: openTestArc(&testAppendRowsClient{},
func(req *storagepb.AppendRowsRequest) error {
// Append is intentionally slow.
return nil
}, nil),
}
ms.schemaDescriptor = &descriptorpb.DescriptorProto{
Name: proto.String("testDescriptor"),
}
fakeData := [][]byte{
[]byte("foo"),
}

// Create a context and immediately cancel it.
cancelCtx, cancel := context.WithCancel(ctx)
cancel()

// First, append with an invalid context.
pw := newPendingWrite(cancelCtx, fakeData)
err := ms.appendWithRetry(pw)
if err != context.Canceled {
t.Errorf("expected cancelled context error, got: %v", err)
}

// a second append with a valid context should succeed
_, err = ms.AppendRows(ctx, fakeData)
if err != nil {
t.Errorf("expected second append to succeed, but failed: %v", err)
}
}

func TestManagedStream_AppendDeadlocks(t *testing.T) {
Expand Down

0 comments on commit dc02bca

Please sign in to comment.