fix(bigquery/storage/managedwriter): address locking and schema updates #6243

Merged: 6 commits, Jun 23, 2022
Changes from 5 commits
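
The flow touched here is reachable through the public API: an append can carry an evolved schema via the UpdateSchemaDescriptor option, and the managed stream is expected to reconnect and re-send its writer schema on the caller's behalf. A minimal sketch of that call pattern (the helper name appendEvolved is illustrative, not part of the package; imports are context, cloud.google.com/go/bigquery/storage/managedwriter, and google.golang.org/protobuf/types/descriptorpb):

    // appendEvolved sends one serialized row under an updated schema and
    // blocks until the server acknowledges the append.
    func appendEvolved(ctx context.Context, ms *managedwriter.ManagedStream,
        row []byte, dp *descriptorpb.DescriptorProto) error {
        result, err := ms.AppendRows(ctx, [][]byte{row},
            managedwriter.UpdateSchemaDescriptor(dp))
        if err != nil {
            return err
        }
        _, err = result.GetResult(ctx) // blocks for the acknowledgement
        return err
    }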
bigquery/storage/managedwriter/integration_test.go (29 changes: 20 additions & 9 deletions)
@@ -18,6 +18,7 @@ import (
     "context"
     "fmt"
     "math"
+    "sync"
     "testing"
     "time"
 
@@ -633,19 +634,29 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
     if err != nil {
         t.Errorf("failed to marshal evolved message: %v", err)
     }
-    result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto), WithOffset(curOffset))
-    if err != nil {
-        t.Errorf("failed evolved append: %v", err)
-    }
-    _, err = result.GetResult(ctx)
-    if err != nil {
-        t.Errorf("error on evolved append: %v", err)
+    // Try to force connection errors from concurrent appends.
+    // We drop setting of offset to avoid commingling out-of-order append errors.
+    var wg sync.WaitGroup
+    for i := 0; i < 5; i++ {
+        wg.Add(1)
+        go func() {
+            result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
+            if err != nil {
+                t.Errorf("failed evolved append: %v", err)
+            }
+            _, err = result.GetResult(ctx)
+            if err != nil {
+                t.Errorf("error on evolved append: %v", err)
+            }
+            wg.Done()
+        }()
     }
+    wg.Wait()
 
     validateTableConstraints(ctx, t, bqClient, testTable, "after send",
-        withExactRowCount(int64(curOffset+1)),
+        withExactRowCount(int64(curOffset+5)),
         withNullCount("name", 0),
-        withNonNullCount("other", 1),
+        withNonNullCount("other", 5),
     )
 }

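One caveat on the added test block: the goroutines assign the enclosing function's result and err variables, so the five appends race on those writes. A goroutine-local variant exercises the same reconnect path without the race; a sketch, with ctx, t, ms, b, and descriptorProto as in the test above:

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Keep the result and error local to this goroutine.
            res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
            if err != nil {
                t.Errorf("failed evolved append: %v", err)
                return
            }
            if _, err := res.GetResult(ctx); err != nil {
                t.Errorf("error on evolved append: %v", err)
            }
        }()
    }
    wg.Wait()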
bigquery/storage/managedwriter/managed_stream.go (38 changes: 21 additions & 17 deletions)
@@ -174,10 +174,8 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (
 
 // getStream returns either a valid ARC client stream or permanent error.
 //
-// Calling getStream locks the mutex.
+// Any calls to getStream should do so in possession of the critical section lock.
 func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
-    ms.mu.Lock()
-    defer ms.mu.Unlock()
     if ms.err != nil {
         return nil, nil, ms.err
     }
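With locking hoisted out of getStream, the compensating rule is the one the revised comment states: callers must enter holding ms.mu. A sketch of the expected call shape (abridged; Close() further down in this diff follows the same pattern):

    ms.mu.Lock()
    arc, ch, err := ms.getStream(arc, false)
    if err != nil {
        ms.mu.Unlock() // release on the early-return path
        return err
    }
    // ... use arc and ch while still inside the critical section ...
    ms.mu.Unlock()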
@@ -205,7 +203,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient
 
 // openWithRetry is responsible for navigating the (re)opening of the underlying stream connection.
 //
-// Only getStream() should call this, and thus the calling code has the mutex lock.
+// Only getStream() should call this.
 func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
     r := defaultRetryer{}
     for {
@@ -262,12 +260,26 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op
     var err error
 
     for {
+        // critical section: Things that need to happen inside the critical section:
+        //
+        // * Getting the stream connection (in case of reconnects)
+        // * Issuing the append request
+        // * Adding the pending write to the channel to keep ordering correct on response
+        ms.mu.Lock()
+
         // Don't bother calling/retrying if this append's context is already expired.
         if err = requestCtx.Err(); err != nil {
             return err
         }
 
-        arc, ch, err = ms.getStream(arc, pw.newSchema != nil)
+        // If an updated schema is present, we need to reconnect the stream and update the reference
+        // schema for the stream.
+        reconnect := false
+        if pw.newSchema != nil && !proto.Equal(pw.newSchema, ms.schemaDescriptor) {
+            reconnect = true
+            ms.schemaDescriptor = proto.Clone(pw.newSchema).(*descriptorpb.DescriptorProto)
+        }
+        arc, ch, err = ms.getStream(arc, reconnect)
         if err != nil {
             return err
         }
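The reconnect decision above leans on proto.Equal and proto.Clone from google.golang.org/protobuf: value equality decides whether a reconnect is needed at all, and the clone keeps the stream's cached descriptor from aliasing the caller's message. A self-contained illustration (the descriptor names are made up):

    package main

    import (
        "fmt"

        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/descriptorpb"
    )

    func main() {
        cached := &descriptorpb.DescriptorProto{Name: proto.String("orig")}
        incoming := &descriptorpb.DescriptorProto{Name: proto.String("evolved")}

        // Value equality, not pointer identity: re-sending an identical
        // schema should not force a reconnect.
        if !proto.Equal(incoming, cached) {
            // Clone so later mutation of the caller's descriptor cannot
            // change the cached copy behind the stream's back.
            cached = proto.Clone(incoming).(*descriptorpb.DescriptorProto)
        }
        fmt.Println(cached.GetName()) // evolved
    }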
@@ -286,9 +298,6 @@ func (ms *ManagedStream) append(requestCtx context.Context, pw *pendingWrite, op
             req = reqCopy
         })
 
-        // critical section: When we issue an append, we need to add the write to the pending channel
-        // to keep the response ordering correct.
-        ms.mu.Lock()
         if req != nil {
             // First append in a new connection needs properties like schema and stream name set.
             err = (*arc).Send(req)
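The streamSetup sync.Once gating this block is what restricts the extra request properties to the first append on each connection: Do runs its function at most once per Once value, so later appends on the same connection skip the setup and leave req nil. A tiny standalone illustration of that behavior:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var once sync.Once
        for i := 0; i < 3; i++ {
            ran := false
            once.Do(func() { ran = true }) // body executes on the first pass only
            fmt.Println(i, ran)            // prints: 0 true, 1 false, 2 false
        }
    }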
@@ -342,6 +351,8 @@ func (ms *ManagedStream) Close() error {
 
     var arc *storagepb.BigQueryWrite_AppendRowsClient
 
+    // Critical section: get connection, close, mark closed.
+    ms.mu.Lock()
     arc, ch, err := ms.getStream(arc, false)
     if err != nil {
         return err
@@ -353,8 +364,9 @@ func (ms *ManagedStream) Close() error {
     if err == nil {
         close(ch)
     }
-    ms.mu.Lock()
     ms.err = io.EOF
+
+    // Done with the critical section.
     ms.mu.Unlock()
     // Propagate cancellation.
     if ms.cancel != nil {
@@ -383,14 +395,6 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
         pw.markDone(NoStreamOffset, err, nil)
         return nil, err
     }
-    // if we've received an updated schema as part of a write, propagate it to both the cached schema and
-    // populate the schema in the request.
-    if pw.newSchema != nil {
-        ms.schemaDescriptor = pw.newSchema
-        pw.request.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
-            ProtoDescriptor: pw.newSchema,
-        }
-    }
     // Call the underlying append. The stream has its own retained context and will surface expiry on
     // its own, but we also need to respect any deadline for the provided context.
     errCh := make(chan error)
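The retained comment above errCh describes the remaining piece: the managed stream holds its own context, so AppendRows must separately respect the caller's deadline. The usual shape is to race the internal append against the provided context; a sketch, with doAppend standing in (hypothetically) for the stream-side work:

    errCh := make(chan error)
    go func() { errCh <- doAppend() }() // append runs on the stream's retained context
    select {
    case err := <-errCh: // internal append finished first
        if err != nil {
            return nil, err
        }
    case <-ctx.Done(): // the caller's deadline or cancellation wins
        return nil, ctx.Err()
    }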