Skip to content

Commit

Permalink
client: fix potential panic during RPC retries (grpc#5323)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 13, 2022
1 parent beb2eaf commit ffeff9e
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 151 deletions.
6 changes: 1 addition & 5 deletions clientconn.go
Expand Up @@ -907,14 +907,10 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
if err != nil {
return nil, nil, toRPCErr(err)
}
return t, done, nil
}

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
Expand Down
20 changes: 10 additions & 10 deletions internal/transport/http2_client.go
Expand Up @@ -631,17 +631,16 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// the wire. However, there are two notable exceptions:
//
// 1. If the stream headers violate the max header list size allowed by the
// server. In this case there is no reason to retry at all, as it is
// assumed the RPC would continue to fail on subsequent attempts.
// server. It's possible this could succeed on another transport, even if
// it's unlikely, but do not transparently retry.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
type NewStreamError struct {
Err error

DoNotRetry bool
DoNotTransparentRetry bool
AllowTransparentRetry bool
}

func (e NewStreamError) Error() string {
Expand All @@ -650,11 +649,11 @@ func (e NewStreamError) Error() string {

// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
Expand Down Expand Up @@ -754,23 +753,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return true
}, hdr)
if err != nil {
return nil, &NewStreamError{Err: err}
// Connection closed.
return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
}
if success {
break
}
if hdrListSizeErr != nil {
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
return nil, &NewStreamError{Err: hdrListSizeErr}
}
firstTry = false
select {
case <-ch:
case <-ctx.Done():
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway:
return nil, &NewStreamError{Err: errStreamDrain}
return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
case <-t.ctx.Done():
return nil, &NewStreamError{Err: ErrConnClosing}
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if t.statsHandler != nil {
Expand Down
8 changes: 7 additions & 1 deletion picker_wrapper.go
Expand Up @@ -131,7 +131,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
if _, ok := status.FromError(err); ok {
// Status error: end the RPC unconditionally with this status.
return nil, nil, err
return nil, nil, dropError{error: err}
}
// For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable.
Expand Down Expand Up @@ -175,3 +175,9 @@ func (pw *pickerWrapper) close() {
pw.done = true
close(pw.blockingCh)
}

// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it.
type dropError struct {
error
}

0 comments on commit ffeff9e

Please sign in to comment.