Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): improve network reconnection (#6338
Browse files Browse the repository at this point in the history
)

* fix(bigquery/storage/managedwriter): improve network reconnection

Issuing a sufficiently large single append request is enough to trigger
the server backend to close an existing grpc stream.  This PR addresses
the problem by allowing a failed request to signal that subsequent
requests should request a new grpc stream connection.

This PR also adds an integration test that induces the failure by
issuing a large request, and ensures subsequent requests succeed.

Towards: #6321
  • Loading branch information
shollyman committed Jul 15, 2022
1 parent 89a049a commit 085a038
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 3 deletions.
77 changes: 76 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"strings"
"sync"
"testing"
"time"
Expand All @@ -27,9 +28,11 @@ import (
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
"github.com/googleapis/gax-go/v2/apierror"
"go.opencensus.io/stats/view"
"google.golang.org/api/option"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
Expand All @@ -42,7 +45,7 @@ import (
var (
datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
defaultTestTimeout = 30 * time.Second
defaultTestTimeout = 45 * time.Second
)

// our test data has cardinality 5 for names, 3 for values
Expand Down Expand Up @@ -153,6 +156,9 @@ func TestIntegration_ManagedWriter(t *testing.T) {
// Don't run this in parallel, we only want to collect stats from this subtest.
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
})
t.Run("TestLargeInsert", func(t *testing.T) {
testLargeInsert(ctx, t, mwClient, bqClient, dataset)
})
})
}

Expand Down Expand Up @@ -469,6 +475,75 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
withExactRowCount(int64(len(testSimpleData))))
}

func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

// Construct a Very Large request.
var data [][]byte
targetSize := 11 * 1024 * 1024 // 11 MB
b, err := proto.Marshal(testSimpleData[0])
if err != nil {
t.Errorf("failed to marshal message: %v", err)
}

numRows := targetSize / len(b)
data = make([][]byte, numRows)

for i := 0; i < numRows; i++ {
data[i] = b
}

result, err := ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("single append failed: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
apiErr, ok := apierror.FromError(err)
if !ok {
t.Errorf("GetResult error was not an instance of ApiError")
}
if status := apiErr.GRPCStatus(); status.Code() != codes.InvalidArgument {
t.Errorf("expected InvalidArgument status, got %v", status)
}

details := apiErr.Details()
if details.DebugInfo == nil {
t.Errorf("expected DebugInfo to be populated, was nil")
}
wantSubstring := "Message size exceed the limitation of byte based flow control."
if detail := details.DebugInfo.GetDetail(); !strings.Contains(detail, wantSubstring) {
t.Errorf("detail missing desired substring: %s", detail)
}
}
// send a subsequent append as verification we can proceed.
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("subsequent append failed: %v", err)
}
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("failure result from second append: %v", err)
}
}

func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testedViews := []*view.View{
AppendRequestsView,
Expand Down
19 changes: 17 additions & 2 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package managedwriter

import (
"context"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -84,6 +85,7 @@ type ManagedStream struct {

mu sync.Mutex
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
reconnect bool // Request a reconnect before issuing another send.
err error // terminal error
pending chan *pendingWrite // writes awaiting status
streamSetup *sync.Once // handles amending the first request in a new stream
Expand Down Expand Up @@ -184,13 +186,21 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient
return nil, nil, ms.err
}

// Previous activity on the stream indicated it is not healthy, so propagate that as a reconnect.
if ms.reconnect {
forceReconnect = true
ms.reconnect = false
}
// Always return the retained ARC if the arg differs.
if arc != ms.arc && !forceReconnect {
return ms.arc, ms.pending, nil
}
if arc != ms.arc && forceReconnect && ms.arc != nil {
// In this case, we're forcing a close to apply changes to the stream
// that currently can't be modified on an established connection.
// In this case, we're forcing a close on the existing stream.
// This is due to either needing to reconnect to satisfy the needs of
// the current request (e.g. to signal a schema change), or because
// a previous request on the stream yielded a transient error and we
// want to reconnect before issuing a subsequent request.
//
// TODO: clean this up once internal issue 205756033 is resolved.
(*ms.arc).CloseSend()
Expand Down Expand Up @@ -297,6 +307,11 @@ func (ms *ManagedStream) lockingAppend(requestCtx context.Context, pw *pendingWr
err = (*arc).Send(pw.request)
}
if err != nil {
// Transient connection loss. If we got io.EOF from a send, we want subsequent appends to
// reconnect the network connection for the stream.
if errors.Is(err, io.EOF) {
ms.reconnect = true
}
return 0, err
}
// Compute numRows, once we pass ownership to the channel the request may be
Expand Down

0 comments on commit 085a038

Please sign in to comment.