Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): address locking and schema updat…
Browse files Browse the repository at this point in the history
…es (#6243)

* fix(bigquery/storage/managedwriter): address locking and schema updates
  • Loading branch information
shollyman committed Jun 23, 2022
1 parent 2d04605 commit fe264a5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 26 deletions.
29 changes: 20 additions & 9 deletions bigquery/storage/managedwriter/integration_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -633,19 +634,29 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
if err != nil {
t.Errorf("failed to marshal evolved message: %v", err)
}
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto), WithOffset(curOffset))
if err != nil {
t.Errorf("failed evolved append: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on evolved append: %v", err)
// Try to force connection errors from concurrent appends.
// We drop setting of offset to avoid commingling out-of-order append errors.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
if err != nil {
t.Errorf("failed evolved append: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on evolved append: %v", err)
}
wg.Done()
}()
}
wg.Wait()

validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(curOffset+1)),
withExactRowCount(int64(curOffset+5)),
withNullCount("name", 0),
withNonNullCount("other", 1),
withNonNullCount("other", 5),
)
}

Expand Down
38 changes: 21 additions & 17 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -174,10 +174,8 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (

// getStream returns either a valid ARC client stream or permanent error.
//
// Calling getStream locks the mutex.
// Any calls to getStream should do so in possession of the critical section lock.
func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.err != nil {
return nil, nil, ms.err
}
Expand Down Expand Up @@ -205,7 +203,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient

// openWithRetry is responsible for navigating the (re)opening of the underlying stream connection.
//
// Only getStream() should call this, and thus the calling code has the mutex lock.
// Only getStream() should call this.
func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
r := defaultRetryer{}
for {
Expand Down Expand Up @@ -262,12 +260,26 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op
var err error

for {
// critical section: Things that need to happen inside the critical section:
//
// * Getting the stream connection (in case of reconnects)
// * Issuing the append request
// * Adding the pending write to the channel to keep ordering correct on response
ms.mu.Lock()

// Don't bother calling/retrying if this append's context is already expired.
if err = requestCtx.Err(); err != nil {
return err
}

arc, ch, err = ms.getStream(arc, pw.newSchema != nil)
// If an updated schema is present, we need to reconnect the stream and update the reference
// schema for the stream.
reconnect := false
if pw.newSchema != nil && !proto.Equal(pw.newSchema, ms.schemaDescriptor) {
reconnect = true
ms.schemaDescriptor = proto.Clone(pw.newSchema).(*descriptorpb.DescriptorProto)
}
arc, ch, err = ms.getStream(arc, reconnect)
if err != nil {
return err
}
Expand All @@ -286,9 +298,6 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op
req = reqCopy
})

// critical section: When we issue an append, we need to add the write to the pending channel
// to keep the response ordering correct.
ms.mu.Lock()
if req != nil {
// First append in a new connection needs properties like schema and stream name set.
err = (*arc).Send(req)
Expand Down Expand Up @@ -342,6 +351,8 @@ func (ms *ManagedStream) Close() error {

var arc *storagepb.BigQueryWrite_AppendRowsClient

// Critical section: get connection, close, mark closed.
ms.mu.Lock()
arc, ch, err := ms.getStream(arc, false)
if err != nil {
return err
Expand All @@ -353,8 +364,9 @@ func (ms *ManagedStream) Close() error {
if err == nil {
close(ch)
}
ms.mu.Lock()
ms.err = io.EOF

// Done with the critical section.
ms.mu.Unlock()
// Propagate cancellation.
if ms.cancel != nil {
Expand Down Expand Up @@ -383,14 +395,6 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
pw.markDone(NoStreamOffset, err, nil)
return nil, err
}
// if we've received an updated schema as part of a write, propagate it to both the cached schema and
// populate the schema in the request.
if pw.newSchema != nil {
ms.schemaDescriptor = pw.newSchema
pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
ProtoDescriptor: pw.newSchema,
}
}
// Call the underlying append. The stream has its own retained context and will surface expiry on
// its own, but we also need to respect any deadline for the provided context.
errCh := make(chan error)
Expand Down

0 comments on commit fe264a5

Please sign in to comment.