Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: include details about GOAWAYs in status messages #4316

Merged
merged 11 commits on Apr 24, 2021
4 changes: 2 additions & 2 deletions clientconn.go
Expand Up @@ -1197,7 +1197,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
Expand Down
66 changes: 44 additions & 22 deletions internal/transport/http2_client.go
Expand Up @@ -19,6 +19,7 @@
package transport

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -116,6 +117,9 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
// goAwayDebugMessage contains a detailed human readable string about a
// GoAway frame, useful for error messages.
goAwayDebugMessage string
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
Expand Down Expand Up @@ -347,12 +351,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting

Expand All @@ -370,14 +376,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}

Expand Down Expand Up @@ -845,7 +853,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) error {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
Expand All @@ -866,21 +874,27 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
closeErr := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
if len(goAwayDebugMessage) > 0 {
err = fmt.Errorf("closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return err
return closeErr
}

// GracefulClose sets the state to draining, which prevents new streams from
Expand All @@ -899,7 +913,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down Expand Up @@ -1147,7 +1161,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with odd numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
Expand All @@ -1165,7 +1179,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
Expand Down Expand Up @@ -1195,7 +1209,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}

Expand All @@ -1211,12 +1225,18 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings
}
}
var m bytes.Buffer
m.WriteString("code: ")
m.WriteString(f.ErrCode.String())
m.WriteString(", debug data: ")
m.Write(f.DebugData())
t.goAwayDebugMessage = m.String()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

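Is this better than t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %v", f.ErrCode, string(f.DebugData()))? (Note that DebugData is a method returning []byte, so it must be called and converted to a string; otherwise %v prints the raw byte values.)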

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks better; changed to that.

}

func (t *http2Client) GetGoAwayReason() GoAwayReason {
func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
t.mu.Lock()
defer t.mu.Unlock()
return t.goAwayReason
return t.goAwayReason, t.goAwayDebugMessage
}

func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
Expand Down Expand Up @@ -1313,7 +1333,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
Expand All @@ -1322,7 +1343,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame"))
return
}
t.onPrefaceReceipt()
Expand Down Expand Up @@ -1358,7 +1380,7 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
t.Close()
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
Expand Down Expand Up @@ -1417,7 +1439,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()
Expand Down
42 changes: 21 additions & 21 deletions internal/transport/keepalive_test.go
Expand Up @@ -47,7 +47,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand All @@ -68,7 +68,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) {
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayNoReason {
if reason, _ := client.GetGoAwayReason(); reason != GoAwayNoReason {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
}
case <-timeout.C:
Expand All @@ -86,7 +86,7 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s) TestMaxConnectionAge(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand All @@ -142,7 +142,7 @@ func (s) TestMaxConnectionAge(t *testing.T) {
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayNoReason {
if reason, _ := client.GetGoAwayReason(); reason != GoAwayNoReason {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
}
case <-timeout.C:
Expand All @@ -169,7 +169,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(nil)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(nil)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -317,7 +317,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(nil)

conn, ok := <-connCh
if !ok {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}})
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -391,7 +391,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand All @@ -402,7 +402,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) {
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand All @@ -447,7 +447,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
Expand Down Expand Up @@ -480,7 +480,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand All @@ -497,7 +497,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
Expand Down Expand Up @@ -530,7 +530,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -564,7 +564,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -604,7 +604,7 @@ func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down Expand Up @@ -658,7 +658,7 @@ func (s) TestTCPUserTimeout(t *testing.T) {
},
)
defer func() {
client.Close()
client.Close(nil)
server.stop()
cancel()
}()
Expand Down