feat(bigquery/storage/managedwriter): support append retries #6695

Merged
merged 19 commits into from Sep 27, 2022
22 changes: 19 additions & 3 deletions bigquery/storage/managedwriter/appendresult.go
@@ -41,6 +41,9 @@ type AppendResult struct {

// retains the original response.
response *storagepb.AppendRowsResponse

// retains the number of times this individual write was enqueued.
totalAttempts int
}

func newAppendResult(data [][]byte) *AppendResult {
@@ -146,6 +149,18 @@ func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSche
}
}

// TotalAttempts returns the number of times this write was attempted.
//
// This call blocks until the result is ready, or the context is no longer valid.
func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) {
select {
case <-ctx.Done():
return 0, fmt.Errorf("context done")
case <-ar.Ready():
return ar.totalAttempts, nil
}
}
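
For reviewers, a minimal usage sketch of the new accessor (not part of this diff): it assumes an already-constructed *managedwriter.ManagedStream, pre-serialized proto rows, and the standard context and log packages; the helper name is illustrative.

// appendAndReport shows how a caller could surface TotalAttempts after an append.
func appendAndReport(ctx context.Context, ms *managedwriter.ManagedStream, rows [][]byte) error {
    res, err := ms.AppendRows(ctx, rows)
    if err != nil {
        return err // enqueueing the write failed
    }
    // GetResult blocks until the write is acknowledged or fails terminally,
    // i.e. after any automatic retries have been exhausted.
    if _, err := res.GetResult(ctx); err != nil {
        return err
    }
    // TotalAttempts also blocks on readiness; a value > 1 means the library
    // re-enqueued this write at least once.
    attempts, err := res.TotalAttempts(ctx)
    if err != nil {
        return err
    }
    if attempts > 1 {
        log.Printf("append succeeded after %d attempts", attempts)
    }
    return nil
}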

// pendingWrite tracks state for a set of rows that are part of a single
// append request.
type pendingWrite struct {
@@ -180,9 +195,8 @@ func newPendingWrite(ctx context.Context, appends [][]byte) *pendingWrite {
},
},
},
result: newAppendResult(appends),
attemptCount: 1,
reqCtx: ctx,
result: newAppendResult(appends),
reqCtx: ctx,
}
// We compute the size now for flow controller purposes, though
// the actual request size may be slightly larger (e.g. the first
@@ -198,6 +212,8 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error,
pw.result.response = resp
}
pw.result.err = err
// Record the final attempt count in the result for the user.
pw.result.totalAttempts = pw.attemptCount

close(pw.result.ready)
// Clear the reference to the request.
2 changes: 2 additions & 0 deletions bigquery/storage/managedwriter/client.go
@@ -117,6 +117,8 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: createOpenF(ctx, streamFunc),
// We add the new retryer by default, and add a new option to disable it.
retry: newStatelessRetryer(),
}

// apply writer options
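For context, a sketch of what opting out could look like from the caller's side. This hunk only installs the default retryer; the DisableWriteRetries option name is assumed from the rest of this PR and should be verified against the writer options.

// newNonRetryingWriter builds a managed stream with automatic append retries
// turned off. The option name is an assumption, not shown in this hunk.
func newNonRetryingWriter(ctx context.Context, projectID, table string) (*managedwriter.ManagedStream, error) {
    client, err := managedwriter.NewClient(ctx, projectID)
    if err != nil {
        return nil, err
    }
    return client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable(table),
        managedwriter.WithType(managedwriter.DefaultStream),
        managedwriter.DisableWriteRetries(true), // assumed option from this PR
    )
}
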
12 changes: 11 additions & 1 deletion bigquery/storage/managedwriter/instrumentation.go
@@ -75,6 +75,11 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)

// AppendRetryCount is a measure of the number of appends that were automatically retried by the library
// after receiving a non-successful response.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless)

// FlushRequests is a measure of the number of FlushRows requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
@@ -114,6 +119,10 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrorsView *view.View

// AppendRetryView is a cumulative sum of AppendRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRetryView *view.View

// FlushRequestsView is a cumulative sum of FlushRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
@@ -130,7 +139,7 @@ func init() {

AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)

AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin)
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)

DefaultOpenCensusViews = []*view.View{
@@ -144,6 +153,7 @@ func init() {

AppendResponsesView,
AppendResponseErrorsView,
AppendRetryView,

FlushRequestsView,
}
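A short sketch of how a consumer would pick up the new metric; view.Register is the standard OpenCensus API and DefaultOpenCensusViews is already exported by this package.

import (
    "cloud.google.com/go/bigquery/storage/managedwriter"
    "go.opencensus.io/stats/view"
)

// registerWriterViews registers the package's default views, which after this
// change include AppendRetryView, so retried appends show up in exporters.
func registerWriterViews() error {
    return view.Register(managedwriter.DefaultOpenCensusViews...)
}
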
97 changes: 73 additions & 24 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -19,6 +19,7 @@ import (
"fmt"
"io"
"sync"
"time"

"cloud.google.com/go/bigquery/internal"
"github.com/googleapis/gax-go/v2"
@@ -76,6 +77,7 @@ type ManagedStream struct {
destinationTable string
c *Client
fc *flowController
retry *statelessRetryer

// aspects of the stream client
ctx context.Context // retained context for the stream
@@ -223,7 +225,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient
//
// Only getStream() should call this.
func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
r := defaultRetryer{}
r := &unaryRetryer{}
for {
recordStat(ms.ctx, AppendClientOpenCount, 1)
streamID := ""
@@ -250,7 +252,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
}
}
ch := make(chan *pendingWrite, depth)
go recvProcessor(ms.ctx, arc, ms.fc, ch)
go recvProcessor(ms, arc, ch)
// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
// for every new connection.
ms.streamSetup = new(sync.Once)
@@ -315,6 +317,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {
req = reqCopy
})

// Increment the attempt count.
pw.attemptCount = pw.attemptCount + 1
if req != nil {
// First append in a new connection needs properties like schema and stream name set.
err = (*arc).Send(req)
Expand All @@ -324,6 +328,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {
}
if err != nil {
if shouldReconnect(err) {
// Certain error responses indicate that this connection is no longer healthy;
// if we encounter them, we force a reconnect so the next append has a healthy connection.
ms.reconnect = true
}
return err
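
shouldReconnect itself isn't shown in this diff. As an illustration only (not the package's actual implementation), the kind of classification it performs might look like the following, using errors, io, and the gRPC status/codes packages:

// looksLikeBrokenConnection is illustrative. The idea: some Send errors mean
// the bidi stream itself is unusable, so the next append should open a fresh
// connection rather than reuse this one.
func looksLikeBrokenConnection(err error) bool {
    if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
        return true
    }
    if s, ok := status.FromError(err); ok {
        switch s.Code() {
        case codes.Unavailable, codes.Aborted, codes.Canceled:
            return true
        }
    }
    return false
}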
@@ -345,16 +351,11 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {
lived bidirectional network stream, with its own managed context (ms.ctx). requestCtx is checked
for expiry to enable faster failures; it is not propagated more deeply.
func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {

// Resolve retry settings.
var settings gax.CallSettings
for _, opt := range opts {
opt.Resolve(&settings)
}
var r gax.Retryer = &defaultRetryer{}
if settings.Retry != nil {
r = settings.Retry()
}

for {
appendErr := ms.lockingAppend(pw)
Expand All @@ -365,7 +366,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
recordStat(ctx, AppendRequestErrors, 1)
}
bo, shouldRetry := r.Retry(appendErr)
bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
if shouldRetry {
if err := gax.Sleep(ms.ctx, bo); err != nil {
return err
@@ -466,44 +467,92 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...

// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.
//
// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
// with the mutex lock.
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) {
// TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply
// ensure that pending writes get acknowledged with a terminal state.
// The receive processor is only responsible for a single bidi connection/channel. As new connections are established,
// each gets its own instance of a processor.
//
// The ManagedStream reference is used for re-enqueuing failed writes.
func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
for {
select {
case <-ctx.Done():
// Context is done, so we're not going to get further updates. Mark all work failed with the context error.
case <-ms.ctx.Done():
// Context is done, so we're not going to get further updates. Mark all work left in the channel
// with the context error. We don't attempt to re-enqueue in this case.
for {
pw, ok := <-ch
if !ok {
return
}
pw.markDone(nil, ctx.Err(), fc)
pw.markDone(nil, ms.ctx.Err(), ms.fc)
}
case nextWrite, ok := <-ch:
if !ok {
// Channel closed, all elements processed.
return
}

// block until we get a corresponding response or err from stream.
resp, err := arc.Recv()
if err != nil {
nextWrite.markDone(nil, err, fc)
// Evaluate the error from the receive and possibly retry.
ms.processRetry(nextWrite, nil, err)
// We're done with the write regardless of outcome; continue on to the
// next element.
continue
}
recordStat(ctx, AppendResponses, 1)
// Record that we did in fact get a response from the backend.
recordStat(ms.ctx, AppendResponses, 1)

if status := resp.GetError(); status != nil {
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
if err != nil {
tagCtx = ctx
// The response from the backend embedded a status error. We record that the error
// occurred, and tag it based on the response code of the status.
if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
recordStat(tagCtx, AppendResponseErrors, 1)
}
respErr := grpcstatus.ErrorProto(status)
if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry {
// We use the status error to evaluate and possibly re-enqueue the write.
ms.processRetry(nextWrite, resp, respErr)
// We're done with the write regardless of outcome, continue on to the next
// element.
continue
}
recordStat(tagCtx, AppendResponseErrors, 1)
}
nextWrite.markDone(resp, nil, fc)
// We had no error in the receive or in the response. Mark the write done.
nextWrite.markDone(resp, nil, ms.fc)
}
}
}

// processRetry is responsible for evaluating and re-enqueuing an append.
// If the append is not retried, it is marked complete.
func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
err := initialErr
for {
pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount)
if !shouldRetry {
// Should not attempt to re-append.
pw.markDone(appendResp, err, ms.fc)
return
}
time.Sleep(pause)
err = ms.appendWithRetry(pw)
if err != nil {
// Re-enqueue failed, send it through the loop again.
continue
}
// Break out of the loop, we were successful and the write has been
// re-inserted.
recordStat(ms.ctx, AppendRetryCount, 1)
break
}
}

// returns the stateless retryer. If one's not set (re-enqueue retries disabled),
// it returns a retryer that only permits single attempts.
func (ms *ManagedStream) statelessRetryer() *statelessRetryer {
if ms.retry != nil {
return ms.retry
}
return &statelessRetryer{
maxAttempts: 1,
}
}
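
The statelessRetryer implementation lives elsewhere (retry.go) and isn't part of this page. Inferring its shape from the call sites above, where Retry(err, attemptCount) returns a backoff pause and a retry decision, a minimal sketch under those assumptions, using gax.Backoff and the gRPC status/codes packages:

// statelessRetryerSketch is a minimal illustration, not the package's
// implementation. It caps attempts and only re-enqueues on transient codes.
type statelessRetryerSketch struct {
    maxAttempts int
    bo          gax.Backoff
}

func (r *statelessRetryerSketch) Retry(err error, attemptCount int) (time.Duration, bool) {
    if attemptCount >= r.maxAttempts {
        return 0, false
    }
    s, ok := status.FromError(err)
    if !ok {
        return 0, false
    }
    switch s.Code() {
    case codes.Unavailable, codes.Internal, codes.ResourceExhausted:
        // Transient server-side conditions: back off and re-enqueue.
        return r.bo.Pause(), true
    default:
        return 0, false
    }
}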