diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 6a18dd0f574..4988280527f 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -262,20 +262,28 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie // lockingAppend handles a single append attempt. When successful, it returns the number of rows // in the request for metrics tracking. -func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) { +func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { // Don't both calling/retrying if this append's context is already expired. if err := pw.reqCtx.Err(); err != nil { - return 0, err + return err } + // we use this to record stats if needed after we unlock on defer. + var statsOnExit func() + // critical section: Things that need to happen inside the critical section: // // * Getting the stream connection (in case of reconnects) // * Issuing the append request // * Adding the pending write to the channel to keep ordering correct on response ms.mu.Lock() - defer ms.mu.Unlock() + defer func() { + ms.mu.Unlock() + if statsOnExit != nil { + statsOnExit() + } + }() var arc *storagepb.BigQueryWrite_AppendRowsClient var ch chan *pendingWrite @@ -290,7 +298,7 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) { } arc, ch, err = ms.getStream(arc, reconnect) if err != nil { - return 0, err + return err } // Resolve the special work for the first append on a stream. @@ -318,13 +326,19 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) (int64, error) { if shouldReconnect(err) { ms.reconnect = true } - return 0, err + return err } // Compute numRows, once we pass ownership to the channel the request may be // cleared. numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())) + statsOnExit = func() { + // these will get recorded once we exit the critical section. + recordStat(ms.ctx, AppendRequestRows, numRows) + recordStat(ms.ctx, AppendRequests, 1) + recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) + } ch <- pw - return numRows, nil + return nil } // appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long @@ -343,7 +357,7 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio } for { - numRows, appendErr := ms.lockingAppend(pw) + appendErr := ms.lockingAppend(pw) if appendErr != nil { // Append yielded an error. Retry by continuing or return. status := grpcstatus.Convert(appendErr) @@ -362,9 +376,6 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio pw.markDone(nil, appendErr, ms.fc) return appendErr } - recordStat(ms.ctx, AppendRequests, 1) - recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) - recordStat(ms.ctx, AppendRequestRows, numRows) return nil } }