diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 76a5d028421..6a18dd0f574 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -358,11 +358,8 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio } continue } - // We've got a non-retriable error, so propagate that up. and mark the write done. - ms.mu.Lock() - ms.err = appendErr + // Mark the pending write done. This will not be returned to the user, they'll receive the returned error. pw.markDone(nil, appendErr, ms.fc) - ms.mu.Unlock() return appendErr } recordStat(ms.ctx, AppendRequests, 1) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index 6d97c2ec579..be4564b0a33 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -300,7 +300,46 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) { if ct := ms.fc.count(); ct != wantCount { t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount) } +} + +func TestManagedStream_ContextExpiry(t *testing.T) { + // Issue: retaining error from append as stream error + // https://github.com/googleapis/google-cloud-go/issues/6657 + ctx := context.Background() + ms := &ManagedStream{ + ctx: ctx, + streamSettings: defaultStreamSettings(), + fc: newFlowController(0, 0), + open: openTestArc(&testAppendRowsClient{}, + func(req *storagepb.AppendRowsRequest) error { + // Append is intentionally slow. + return nil + }, nil), + } + ms.schemaDescriptor = &descriptorpb.DescriptorProto{ + Name: proto.String("testDescriptor"), + } + fakeData := [][]byte{ + []byte("foo"), + } + + // Create a context and immediately cancel it. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + + // First, append with an invalid context. + pw := newPendingWrite(cancelCtx, fakeData) + err := ms.appendWithRetry(pw) + if err != context.Canceled { + t.Errorf("expected cancelled context error, got: %v", err) + } + + // a second append with a valid context should succeed + _, err = ms.AppendRows(ctx, fakeData) + if err != nil { + t.Errorf("expected second append to succeed, but failed: %v", err) + } } func TestManagedStream_AppendDeadlocks(t *testing.T) {