From 596d6e6dcaf890f07ca749302ef26b5cbcce6e09 Mon Sep 17 00:00:00 2001 From: shollyman Date: Fri, 16 Sep 2022 14:50:47 -0700 Subject: [PATCH] refactor(bigquery/storage/managedwriter): simplify lockingAppend (#6686) * refactor(bigquery/storage/managedwriter): simplify lockingAppend This PR is another preparatory change towards implementing append retries. Here, we move responsibility for records stats into the lockingAppend() method, which allows us to simplify the return to emitting the error. This will in turn allow us to use lockingAppend within the receiver to re-enqueue appends. --- .../storage/managedwriter/managed_stream.go | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 6a18dd0f574..4988280527f 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -262,20 +262,28 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie // lockingAppend handles a single append attempt. When successful, it returns the number of rows // in the request for metrics tracking. -func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) { +func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { // Don't both calling/retrying if this append's context is already expired. if err := pw.reqCtx.Err(); err != nil { - return 0, err + return err } + // we use this to record stats if needed after we unlock on defer. + var statsOnExit func() + // critical section: Things that need to happen inside the critical section: // // * Getting the stream connection (in case of reconnects) // * Issuing the append request // * Adding the pending write to the channel to keep ordering correct on response ms.mu.Lock() - defer ms.mu.Unlock() + defer func() { + ms.mu.Unlock() + if statsOnExit != nil { + statsOnExit() + } + }() var arc *storagepb.BigQueryWrite_AppendRowsClient var ch chan *pendingWrite @@ -290,7 +298,7 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) { } arc, ch, err = ms.getStream(arc, reconnect) if err != nil { - return 0, err + return err } // Resolve the special work for the first append on a stream. @@ -318,13 +326,19 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) { if shouldReconnect(err) { ms.reconnect = true } - return 0, err + return err } // Compute numRows, once we pass ownership to the channel the request may be // cleared. numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())) + statsOnExit = func() { + // these will get recorded once we exit the critical section. + recordStat(ms.ctx, AppendRequestRows, numRows) + recordStat(ms.ctx, AppendRequests, 1) + recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) + } ch <- pw - return numRows, nil + return nil } // appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long @@ -343,7 +357,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio } for { - numRows, appendErr := ms.lockingAppend(pw) + appendErr := ms.lockingAppend(pw) if appendErr != nil { // Append yielded an error. Retry by continuing or return. status := grpcstatus.Convert(appendErr) @@ -362,9 +376,6 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio pw.markDone(nil, appendErr, ms.fc) return appendErr } - recordStat(ms.ctx, AppendRequests, 1) - recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) - recordStat(ms.ctx, AppendRequestRows, numRows) return nil } }