Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix stream creation issue with transparent retry #5503

Merged
merged 1 commit into from Jul 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 19 additions & 13 deletions stream.go
Expand Up @@ -140,13 +140,13 @@ type ClientStream interface {
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
Expand Down Expand Up @@ -303,12 +303,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)

cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
if err != nil {
cs.finish(err)
return nil, err
}

// Pick the transport to use and create a new stream on the transport.
// Assign cs.attempt upon success.
op := func(a *csAttempt) error {
Expand Down Expand Up @@ -704,6 +698,18 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
// already be status errors.
return toRPCErr(op(cs.attempt))
}
if len(cs.buffer) == 0 {
// For the first op, which controls creation of the stream and
// assigns cs.attempt, we need to create a new attempt inline
// before executing the first op. On subsequent ops, the attempt
// is created immediately before replaying the ops.
var err error
if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.mu.Unlock()
cs.finish(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cs.finish() grabs the lock again. Can something happen between us calling Unlock() on line 708 and cs.finish() calling Lock() that can cause trouble?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is only for the first op, the clientStream we're using is only visible to the caller at this point (newClientStream).

return err
}
}
a := cs.attempt
cs.mu.Unlock()
err := op(a)
Expand Down