Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: propagate connection error causes to RPC statuses #4311

Merged
merged 12 commits into from Apr 13, 2021
4 changes: 2 additions & 2 deletions clientconn.go
Expand Up @@ -1197,7 +1197,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
dfawley marked this conversation as resolved.
Show resolved Hide resolved
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
Expand Down
46 changes: 26 additions & 20 deletions internal/transport/http2_client.go
Expand Up @@ -347,12 +347,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting

Expand All @@ -370,14 +372,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}

Expand Down Expand Up @@ -845,7 +849,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) error {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
Expand All @@ -866,21 +870,21 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
closeErr := t.conn.Close()
dfawley marked this conversation as resolved.
Show resolved Hide resolved
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return err
return closeErr
}

// GracefulClose sets the state to draining, which prevents new streams from
Expand All @@ -899,7 +903,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down Expand Up @@ -1147,7 +1151,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with odd numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
Expand All @@ -1165,7 +1169,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
Expand Down Expand Up @@ -1195,7 +1199,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}

Expand Down Expand Up @@ -1313,7 +1317,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
Expand All @@ -1322,7 +1327,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame"))
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return
}
t.onPrefaceReceipt()
Expand Down Expand Up @@ -1358,7 +1364,7 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
t.Close()
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
Expand Down Expand Up @@ -1417,7 +1423,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()
Expand Down
32 changes: 16 additions & 16 deletions internal/transport/keepalive_test.go
Expand Up @@ -47,7 +47,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -86,7 +86,7 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s) TestMaxConnectionAge(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -169,7 +169,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(nil)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(nil)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -317,7 +317,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(nil)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -391,7 +391,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -480,7 +480,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -530,7 +530,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -564,7 +564,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -604,7 +604,7 @@ func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -658,7 +658,7 @@ func (s) TestTCPUserTimeout(t *testing.T) {
},
)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Expand Up @@ -622,7 +622,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
Close(err error) error

// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
Expand Down