diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index e916f6b3449..59930d6b18c 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "sync" "testing" "time" @@ -633,19 +634,29 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq if err != nil { t.Errorf("failed to marshal evolved message: %v", err) } - result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto), WithOffset(curOffset)) - if err != nil { - t.Errorf("failed evolved append: %v", err) - } - _, err = result.GetResult(ctx) - if err != nil { - t.Errorf("error on evolved append: %v", err) + // Try to force connection errors from concurrent appends. + // We drop setting of offset to avoid commingling out-of-order append errors. + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto)) + if err != nil { + t.Errorf("failed evolved append: %v", err) + } + _, err = result.GetResult(ctx) + if err != nil { + t.Errorf("error on evolved append: %v", err) + } + wg.Done() + }() } + wg.Wait() validateTableConstraints(ctx, t, bqClient, testTable, "after send", - withExactRowCount(int64(curOffset+1)), + withExactRowCount(int64(curOffset+5)), withNullCount("name", 0), - withNonNullCount("other", 1), + withNonNullCount("other", 5), ) } diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 6e3fa8a95f2..9a1f627a825 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -174,10 +174,8 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) ( // getStream returns either a valid ARC client stream or permanent error. 
// -// Calling getStream locks the mutex. +// Any calls to getStream should do so in possession of the critical section lock. func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { - ms.mu.Lock() - defer ms.mu.Unlock() if ms.err != nil { return nil, nil, ms.err } @@ -205,7 +203,7 @@ // openWithRetry is responsible for navigating the (re)opening of the underlying stream connection. // -// Only getStream() should call this, and thus the calling code has the mutex lock. +// Only getStream() should call this. func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { r := defaultRetryer{} for { @@ -262,12 +260,26 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op var err error for { + // critical section: Things that need to happen inside the critical section: + // + // * Getting the stream connection (in case of reconnects) + // * Issuing the append request + // * Adding the pending write to the channel to keep ordering correct on response + ms.mu.Lock() + // Don't both calling/retrying if this append's context is already expired. if err = requestCtx.Err(); err != nil { return err } - arc, ch, err = ms.getStream(arc, pw.newSchema != nil) + // If an updated schema is present, we need to reconnect the stream and update the reference + // schema for the stream. 
+ reconnect := false + if pw.newSchema != nil && !proto.Equal(pw.newSchema, ms.schemaDescriptor) { + reconnect = true + ms.schemaDescriptor = proto.Clone(pw.newSchema).(*descriptorpb.DescriptorProto) + } + arc, ch, err = ms.getStream(arc, reconnect) if err != nil { return err } @@ -286,9 +298,6 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op req = reqCopy }) - // critical section: When we issue an append, we need to add the write to the pending channel - // to keep the response ordering correct. - ms.mu.Lock() if req != nil { // First append in a new connection needs properties like schema and stream name set. err = (*arc).Send(req) @@ -342,6 +351,8 @@ func (ms *ManagedStream) Close() error { var arc *storagepb.BigQueryWrite_AppendRowsClient + // Critical section: get connection, close, mark closed. + ms.mu.Lock() arc, ch, err := ms.getStream(arc, false) if err != nil { return err @@ -353,8 +364,9 @@ func (ms *ManagedStream) Close() error { if err == nil { close(ch) } - ms.mu.Lock() ms.err = io.EOF + + // Done with the critical section. ms.mu.Unlock() // Propagate cancellation. if ms.cancel != nil { @@ -383,14 +395,6 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... pw.markDone(NoStreamOffset, err, nil) return nil, err } - // if we've received an updated schema as part of a write, propagate it to both the cached schema and - // populate the schema in the request. - if pw.newSchema != nil { - ms.schemaDescriptor = pw.newSchema - pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{ - ProtoDescriptor: pw.newSchema, - } - } // Call the underlying append. The stream has it's own retained context and will surface expiry on // it's own, but we also need to respect any deadline for the provided context. errCh := make(chan error)