Skip to content

Commit

Permalink
fix: user Timers over time.After (#1802)
Browse files Browse the repository at this point in the history
Can reduce memory usuage for stacking timers that are not gc'ed in
retry loops.

Fixes: #1761
  • Loading branch information
codyoss committed Jan 11, 2023
1 parent bcc345c commit 86e4009
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
16 changes: 12 additions & 4 deletions internal/gensupport/resumable.go
Expand Up @@ -193,22 +193,28 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err

// Each chunk gets its own initialized-at-zero backoff and invocation ID.
bo := rx.Retry.backoff()
quitAfter := time.After(retryDeadline)
quitAfterTimer := time.NewTimer(retryDeadline)
rx.attempts = 1
rx.invocationID = uuid.New().String()

// Retry loop for a single chunk.
for {
pauseTimer := time.NewTimer(pause)
select {
case <-ctx.Done():
quitAfterTimer.Stop()
pauseTimer.Stop()
if err == nil {
err = ctx.Err()
}
return prepareReturn(resp, err)
case <-time.After(pause):
case <-quitAfter:
case <-pauseTimer.C:
quitAfterTimer.Stop()
case <-quitAfterTimer.C:
pauseTimer.Stop()
return prepareReturn(resp, err)
}
pauseTimer.Stop()

// Check for context cancellation or timeout once more. If more than one
// case in the select statement above was satisfied at the same time, Go
Expand All @@ -217,13 +223,15 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
// canceled before or the timeout was reached.
select {
case <-ctx.Done():
quitAfterTimer.Stop()
if err == nil {
err = ctx.Err()
}
return prepareReturn(resp, err)
case <-quitAfter:
case <-quitAfterTimer.C:
return prepareReturn(resp, err)
default:
quitAfterTimer.Stop()
}

resp, err = rx.transferChunk(ctx)
Expand Down
4 changes: 3 additions & 1 deletion internal/gensupport/send.go
Expand Up @@ -115,15 +115,17 @@ func sendAndRetry(ctx context.Context, client *http.Client, req *http.Request, r
var errorFunc = retry.errorFunc()

for {
t := time.NewTimer(pause)
select {
case <-ctx.Done():
t.Stop()
// If we got an error and the context has been canceled, return an error acknowledging
// both the context cancelation and the service error.
if err != nil {
return resp, wrappedCallErr{ctx.Err(), err}
}
return resp, ctx.Err()
case <-time.After(pause):
case <-t.C:
}

if ctx.Err() != nil {
Expand Down
4 changes: 3 additions & 1 deletion transport/bytestream/client.go
Expand Up @@ -89,9 +89,11 @@ func (r *Reader) Read(p []byte) (int, error) {
if backoffDelay > backoffMax {
backoffDelay = backoffMax
}
t := time.NewTimer(backoffDelay)
select {
case <-time.After(backoffDelay):
case <-t.C:
case <-r.ctx.Done():
t.Stop()
if err := r.ctx.Err(); err != nil {
r.err = err
}
Expand Down

0 comments on commit 86e4009

Please sign in to comment.