Skip to content

Commit

Permalink
client: propagate connection error causes to RPC statuses (#4311)
Browse files Browse the repository at this point in the history
  • Loading branch information
apolcyn committed Apr 13, 2021
1 parent 7a6ab59 commit c229922
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 61 deletions.
4 changes: 2 additions & 2 deletions clientconn.go
Expand Up @@ -1197,7 +1197,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
Expand Down
47 changes: 26 additions & 21 deletions internal/transport/http2_client.go
Expand Up @@ -347,12 +347,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting

Expand All @@ -370,14 +372,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}

Expand Down Expand Up @@ -845,12 +849,12 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
t.mu.Unlock()
return nil
return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
Expand All @@ -866,21 +870,20 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, err.Error()), nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return err
}

// GracefulClose sets the state to draining, which prevents new streams from
Expand All @@ -899,7 +902,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down Expand Up @@ -1147,7 +1150,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
Expand All @@ -1165,7 +1168,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
Expand Down Expand Up @@ -1195,7 +1198,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}

Expand Down Expand Up @@ -1313,7 +1316,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
Expand All @@ -1322,7 +1326,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return
}
t.onPrefaceReceipt()
Expand Down Expand Up @@ -1358,7 +1363,7 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
t.Close()
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
Expand Down Expand Up @@ -1417,7 +1422,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()
Expand Down
33 changes: 17 additions & 16 deletions internal/transport/keepalive_test.go
Expand Up @@ -24,6 +24,7 @@ package transport

import (
"context"
"fmt"
"io"
"net"
"testing"
Expand All @@ -47,7 +48,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -86,7 +87,7 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s) TestMaxConnectionAge(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -169,7 +170,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -228,7 +229,7 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -257,7 +258,7 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -288,7 +289,7 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -317,7 +318,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -352,7 +353,7 @@ func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -391,7 +392,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -436,7 +437,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -480,7 +481,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -530,7 +531,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -564,7 +565,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -604,7 +605,7 @@ func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down Expand Up @@ -658,7 +659,7 @@ func (s) TestTCPUserTimeout(t *testing.T) {
},
)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Expand Up @@ -622,7 +622,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
Close(err error)

// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are
Expand Down

0 comments on commit c229922

Please sign in to comment.