Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): simplify lockingAppend (#6686)
Browse files Browse the repository at this point in the history
* refactor(bigquery/storage/managedwriter): simplify lockingAppend

This PR is another preparatory change towards implementing append
retries.  Here, we move responsibility for records stats into the
lockingAppend() method, which allows us to simplify the return to
emitting the error.

This will in turn allow us to use lockingAppend within the receiver
to re-enqueue appends.
  • Loading branch information
shollyman committed Sep 16, 2022
1 parent bc7a5f6 commit 596d6e6
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -262,20 +262,28 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie

// lockingAppend handles a single append attempt. When successful, it returns the number of rows
// in the request for metrics tracking.
func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) {
func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {

// Don't both calling/retrying if this append's context is already expired.
if err := pw.reqCtx.Err(); err != nil {
return 0, err
return err
}

// we use this to record stats if needed after we unlock on defer.
var statsOnExit func()

// critical section: Things that need to happen inside the critical section:
//
// * Getting the stream connection (in case of reconnects)
// * Issuing the append request
// * Adding the pending write to the channel to keep ordering correct on response
ms.mu.Lock()
defer ms.mu.Unlock()
defer func() {
ms.mu.Unlock()
if statsOnExit != nil {
statsOnExit()
}
}()

var arc *storagepb.BigQueryWrite_AppendRowsClient
var ch chan *pendingWrite
Expand All @@ -290,7 +298,7 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) {
}
arc, ch, err = ms.getStream(arc, reconnect)
if err != nil {
return 0, err
return err
}

// Resolve the special work for the first append on a stream.
Expand Down Expand Up @@ -318,13 +326,19 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) {
if shouldReconnect(err) {
ms.reconnect = true
}
return 0, err
return err
}
// Compute numRows, once we pass ownership to the channel the request may be
// cleared.
numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
statsOnExit = func() {
// these will get recorded once we exit the critical section.
recordStat(ms.ctx, AppendRequestRows, numRows)
recordStat(ms.ctx, AppendRequests, 1)
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
}
ch <- pw
return numRows, nil
return nil
}

// appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long
Expand All @@ -343,7 +357,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
}

for {
numRows, appendErr := ms.lockingAppend(pw)
appendErr := ms.lockingAppend(pw)
if appendErr != nil {
// Append yielded an error. Retry by continuing or return.
status := grpcstatus.Convert(appendErr)
Expand All @@ -362,9 +376,6 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
pw.markDone(nil, appendErr, ms.fc)
return appendErr
}
recordStat(ms.ctx, AppendRequests, 1)
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
recordStat(ms.ctx, AppendRequestRows, numRows)
return nil
}
}
Expand Down

0 comments on commit 596d6e6

Please sign in to comment.