From ba8e17ba244d06df24f95c9824eda6b2f33adf3f Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 31 Mar 2021 16:30:56 -0700 Subject: [PATCH 1/9] Propagate errors causing connection close to RPC statuses --- clientconn.go | 4 +-- internal/transport/http2_client.go | 46 ++++++++++++++++------------- internal/transport/transport.go | 2 +- test/end2end_test.go | 47 ++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 23 deletions(-) diff --git a/clientconn.go b/clientconn.go index 77a08fd33bf..0db796ccbd6 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1197,7 +1197,7 @@ func (ac *addrConn) resetTransport() { ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() - newTr.Close() + newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN")) return } ac.curAddr = addr @@ -1329,7 +1329,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne select { case <-time.After(time.Until(connectDeadline)): // We didn't get the preface in time. - newTr.Close() + newTr.Close(fmt.Errorf("failed to receive server preface within timeout")) channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) return nil, nil, errors.New("timed out waiting for server handshake") case <-prefaceReceived: diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index d5bbe720db5..f535636c99f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -347,12 +347,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts // Send connection preface to server. n, err := t.conn.Write(clientPreface) if err != nil { - t.Close() - return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) + err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err) + t.Close(err) + return nil, err } if n != len(clientPreface) { - t.Close() - return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) + err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) + t.Close(err) + return nil, err } var ss []http2.Setting @@ -370,14 +372,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } err = t.framer.fr.WriteSettings(ss...) if err != nil { - t.Close() - return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) + err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) + t.Close(err) + return nil, err } // Adjust the connection flow control window if needed. if delta := uint32(icwz - defaultWindowSize); delta > 0 { if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { - t.Close() - return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) + err = connectionErrorf(true, err, "transport: failed to write window update: %v", err) + t.Close(err) + return nil, err } } @@ -845,7 +849,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. // This method blocks until the addrConn that initiated this transport is // re-connected. This happens because t.onClose() begins reconnect logic at the // addrConn level and blocks until the addrConn is successfully connected. -func (t *http2Client) Close() error { +func (t *http2Client) Close(err error) error { t.mu.Lock() // Make sure we only Close once. if t.state == closing { @@ -866,13 +870,13 @@ func (t *http2Client) Close() error { t.mu.Unlock() t.controlBuf.finish() t.cancel() - err := t.conn.Close() + closeErr := t.conn.Close() if channelz.IsOn() { channelz.RemoveEntry(t.channelzID) } // Notify all active streams. for _, s := range streams { - t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) + t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) } if t.statsHandler != nil { connEnd := &stats.ConnEnd{ @@ -880,7 +884,7 @@ func (t *http2Client) Close() error { } t.statsHandler.HandleConn(t.ctx, connEnd) } - return err + return closeErr } // GracefulClose sets the state to draining, which prevents new streams from @@ -899,7 +903,7 @@ func (t *http2Client) GracefulClose() { active := len(t.activeStreams) t.mu.Unlock() if active == 0 { - t.Close() + t.Close(ErrConnClosing) return } t.controlBuf.put(&incomingGoAway{}) @@ -1147,7 +1151,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { id := f.LastStreamID if id > 0 && id%2 != 1 { t.mu.Unlock() - t.Close() + t.Close(connectionErrorf(true, nil, "received goaway with odd numbered stream id: %v", id)) return } // A client can receive multiple GoAways from the server (see @@ -1165,7 +1169,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { // If there are multiple GoAways the first one should always have an ID greater than the following ones. if id > t.prevGoAwayID { t.mu.Unlock() - t.Close() + t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)) return } default: @@ -1195,7 +1199,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { active := len(t.activeStreams) t.mu.Unlock() if active == 0 { - t.Close() + t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) } } @@ -1313,7 +1317,8 @@ func (t *http2Client) reader() { // Check the validity of server preface. frame, err := t.framer.fr.ReadFrame() if err != nil { - t.Close() // this kicks off resetTransport, so must be last before return + err = connectionErrorf(true, err, "error reading server preface: %v", err) + t.Close(err) // this kicks off resetTransport, so must be last before return return } t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) @@ -1322,7 +1327,8 @@ func (t *http2Client) reader() { } sf, ok := frame.(*http2.SettingsFrame) if !ok { - t.Close() // this kicks off resetTransport, so must be last before return + // this kicks off resetTransport, so must be last before return + t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame")) return } t.onPrefaceReceipt() @@ -1358,7 +1364,7 @@ func (t *http2Client) reader() { continue } else { // Transport error. - t.Close() + t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) return } } @@ -1417,7 +1423,7 @@ func (t *http2Client) keepalive() { continue } if outstandingPing && timeoutLeft <= 0 { - t.Close() + t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")) return } t.mu.Lock() diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 5cf7c5f80fe..0c0c573659c 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -622,7 +622,7 @@ type ClientTransport interface { // Close tears down this transport. Once it returns, the transport // should not be accessed any more. The caller must make sure this // is called only once. - Close() error + Close(err error) error // GracefulClose starts to tear down the transport: the transport will stop // accepting new RPCs and NewStream will return error. Once all streams are diff --git a/test/end2end_test.go b/test/end2end_test.go index 902e9424104..78add62d0cd 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1333,6 +1333,53 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) { awaitNewConnLogOutput() } +func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) { + for _, e := range listTestEnv() { + testDetailedConnectionCloseErrorPropagatesToRpcError(t, e) + } +} + +func testDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(ctx) + // first, do a round of ping pong to be certain that the call has been received at the server + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + sreq := &testpb.StreamingOutputCallRequest{ + ResponseParameters: []*testpb.ResponseParameters{ + { + Size: int32(0), + }, + }, + } + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) + } + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + // stop the server; the RPC should now fail and the error message should include details + // about the specific connection error that was encountered + te.srv.Stop() + possibleConnResetMsg := "connection reset by peer" + possibleEOFMsg := "error reading from server: EOF" + if _, err := stream.Recv(); err == nil || (!strings.Contains(err.Error(), possibleConnResetMsg) && !strings.Contains(err.Error(), possibleEOFMsg)) { + t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v| OR |%v|", stream, err, possibleConnResetMsg, possibleEOFMsg) + } +} + func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { for _, e := range listTestEnv() { if e.name == "handler-tls" { From 0640203b8e7c9ef83b60a22615b123c451a92849 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 1 Apr 2021 00:01:15 -0700 Subject: [PATCH 2/9] fix transport tests --- internal/transport/keepalive_test.go | 32 ++++++++++----------- internal/transport/transport_test.go | 42 ++++++++++++++-------------- test/end2end_test.go | 4 +-- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/internal/transport/keepalive_test.go b/internal/transport/keepalive_test.go index c8f177fecf1..2a72e926270 100644 --- a/internal/transport/keepalive_test.go +++ b/internal/transport/keepalive_test.go @@ -47,7 +47,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -86,7 +86,7 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -122,7 +122,7 @@ func (s) TestMaxConnectionAge(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -169,7 +169,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -228,7 +228,7 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -257,7 +257,7 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) { PermitWithoutStream: true, }}, connCh) defer cancel() - defer client.Close() + defer client.Close(nil) conn, ok := <-connCh if !ok { @@ -288,7 +288,7 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) { Timeout: 1 * time.Second, }}, connCh) defer cancel() - defer client.Close() + defer client.Close(nil) conn, ok := <-connCh if !ok { @@ -317,7 +317,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { Timeout: 1 * time.Second, }}, connCh) defer cancel() - defer client.Close() + defer client.Close(nil) conn, ok := <-connCh if !ok { @@ -352,7 +352,7 @@ func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { PermitWithoutStream: true, }}) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -391,7 +391,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -436,7 +436,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -480,7 +480,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -530,7 +530,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -564,7 +564,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -604,7 +604,7 @@ func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T } server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() @@ -658,7 +658,7 @@ func (s) TestTCPUserTimeout(t *testing.T) { }, ) defer func() { - client.Close() + client.Close(nil) server.stop() cancel() }() diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 1d8d3ed355d..fcb8367b75b 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -481,7 +481,7 @@ func (s) TestInflightStreamClosing(t *testing.T) { server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() - defer client.Close() + defer client.Close(nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -550,7 +550,7 @@ func (s) TestClientSendAndReceive(t *testing.T) { if recvErr != io.EOF { t.Fatalf("Error: %v; want ", recvErr) } - ct.Close() + ct.Close(nil) server.stop() } @@ -560,7 +560,7 @@ func (s) TestClientErrorNotify(t *testing.T) { go server.stop() // ct.reader should detect the error and activate ct.Error(). <-ct.Error() - ct.Close() + ct.Close(nil) } func performOneRPC(ct ClientTransport) { @@ -597,7 +597,7 @@ func (s) TestClientMix(t *testing.T) { }(s) go func(ct ClientTransport) { <-ct.Error() - ct.Close() + ct.Close(nil) }(ct) for i := 0; i < 1000; i++ { time.Sleep(10 * time.Millisecond) @@ -636,7 +636,7 @@ func (s) TestLargeMessage(t *testing.T) { }() } wg.Wait() - ct.Close() + ct.Close(nil) server.stop() } @@ -653,7 +653,7 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) { server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co) defer cancel() defer server.stop() - defer ct.Close() + defer ct.Close(nil) server.mu.Lock() ready := server.ready server.mu.Unlock() @@ -831,7 +831,7 @@ func (s) TestLargeMessageSuspension(t *testing.T) { if _, err := s.Read(make([]byte, 8)); err.Error() != expectedErr.Error() { t.Fatalf("Read got %v of type %T, want %v", err, err, expectedErr) } - ct.Close() + ct.Close(nil) server.stop() } @@ -841,7 +841,7 @@ func (s) TestMaxStreams(t *testing.T) { } server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() - defer ct.Close() + defer ct.Close(nil) defer server.stop() callHdr := &CallHdr{ Host: "localhost", @@ -901,7 +901,7 @@ func (s) TestMaxStreams(t *testing.T) { // Close the first stream created so that the new stream can finally be created. ct.CloseStream(s, nil) <-done - ct.Close() + ct.Close(nil) <-ct.writerDone if ct.maxConcurrentStreams != 1 { t.Fatalf("ct.maxConcurrentStreams: %d, want 1", ct.maxConcurrentStreams) @@ -960,7 +960,7 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) { sc.mu.Unlock() break } - ct.Close() + ct.Close(nil) select { case <-ss.Context().Done(): if ss.Context().Err() != context.Canceled { @@ -980,7 +980,7 @@ func (s) TestClientConnDecoupledFromApplicationRead(t *testing.T) { server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions) defer cancel() defer server.stop() - defer client.Close() + defer client.Close(nil) waitWhileTrue(t, func() (bool, error) { server.mu.Lock() @@ -1069,7 +1069,7 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) { server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() - defer client.Close() + defer client.Close(nil) waitWhileTrue(t, func() (bool, error) { server.mu.Lock() defer server.mu.Unlock() @@ -1302,7 +1302,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) { if err != nil { t.Fatalf("Error while creating client transport: %v", err) } - defer ct.Close() + defer ct.Close(nil) str, err := ct.NewStream(connectCtx, &CallHdr{}) if err != nil { t.Fatalf("Error while creating stream: %v", err) @@ -1345,7 +1345,7 @@ func (s) TestEncodingRequiredStatus(t *testing.T) { if !testutils.StatusErrEqual(s.Status().Err(), encodingTestStatus.Err()) { t.Fatalf("stream with status %v, want %v", s.Status(), encodingTestStatus) } - ct.Close() + ct.Close(nil) server.stop() } @@ -1367,7 +1367,7 @@ func (s) TestInvalidHeaderField(t *testing.T) { if se, ok := status.FromError(err); !ok || se.Code() != codes.Internal || !strings.Contains(err.Error(), expectedInvalidHeaderField) { t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.Internal, expectedInvalidHeaderField) } - ct.Close() + ct.Close(nil) server.stop() } @@ -1375,7 +1375,7 @@ func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) { server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField) defer cancel() defer server.stop() - defer ct.Close() + defer ct.Close(nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() s, err := ct.NewStream(ctx, &CallHdr{Host: "localhost", Method: "foo"}) @@ -1481,7 +1481,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co) defer cancel() defer server.stop() - defer client.Close() + defer client.Close(nil) waitWhileTrue(t, func() (bool, error) { server.mu.Lock() defer server.mu.Unlock() @@ -1563,7 +1563,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) } // Close down both server and client so that their internals can be read without data // races. - client.Close() + client.Close(nil) st.Close() <-st.readerDone <-st.writerDone @@ -1762,7 +1762,7 @@ func runPingPongTest(t *testing.T, msgSize int) { server, client, cancel := setUp(t, 0, 0, pingpong) defer cancel() defer server.stop() - defer client.Close() + defer client.Close(nil) waitWhileTrue(t, func() (bool, error) { server.mu.Lock() defer server.mu.Unlock() @@ -1850,7 +1850,7 @@ func (s) TestHeaderTblSize(t *testing.T) { server, ct, cancel := setUp(t, 0, math.MaxUint32, normal) defer cancel() - defer ct.Close() + defer ct.Close(nil) defer server.stop() ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() @@ -1969,7 +1969,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) { if err != nil { t.Fatalf("NewClientTransport(): %v", err) } - defer tr.Close() + defer tr.Close(nil) wantAttr := attributes.New(testAttrKey, testAttrVal) if gotAttr := creds.attr; !cmp.Equal(gotAttr, wantAttr, cmp.AllowUnexported(attributes.Attributes{})) { diff --git a/test/end2end_test.go b/test/end2end_test.go index 78add62d0cd..f4d6de1ebce 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1335,11 +1335,11 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) { func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) { for _, e := range listTestEnv() { - testDetailedConnectionCloseErrorPropagatesToRpcError(t, e) + testDetailedConnectionCloseErrorPropagatesToRPCError(t, e) } } -func testDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T, e env) { +func testDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA te.startServer(&testServer{security: e.security}) From 38da1fd9cbf90bade63b235ba13ef63b06754297 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 1 Apr 2021 11:02:02 -0700 Subject: [PATCH 3/9] Make test deterministic --- test/end2end_test.go | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index f4d6de1ebce..dadcd085fab 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1342,6 +1342,13 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) { func testDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA + rpcStartedOnServer := make(chan struct{}) + rpcDoneOnClient := make(chan struct{}) + te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + close(rpcStartedOnServer) + <-rpcDoneOnClient + return status.Error(codes.Internal, "arbitrary status") + } te.startServer(&testServer{security: e.security}) defer te.tearDown() @@ -1349,35 +1356,21 @@ func testDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T, e env) { tc := testpb.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + possibleConnResetMsg := "connection reset by peer" + possibleEOFMsg := "error reading from server: EOF" + // Get the RPC to make it to the server. Then, while the RPC is still being handled at the server, abruptly + // stop the server, killing the connection. The RPC error message should include details about the specific + // connection error that was encountered. stream, err := tc.FullDuplexCall(ctx) - // first, do a round of ping pong to be certain that the call has been received at the server if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", tc, err) } - sreq := &testpb.StreamingOutputCallRequest{ - ResponseParameters: []*testpb.ResponseParameters{ - { - Size: int32(0), - }, - }, - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err != nil { - t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - // stop the server; the RPC should now fail and the error message should include details - // about the specific connection error that was encountered + <-rpcStartedOnServer te.srv.Stop() - possibleConnResetMsg := "connection reset by peer" - possibleEOFMsg := "error reading from server: EOF" if _, err := stream.Recv(); err == nil || (!strings.Contains(err.Error(), possibleConnResetMsg) && !strings.Contains(err.Error(), possibleEOFMsg)) { t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v| OR |%v|", stream, err, possibleConnResetMsg, possibleEOFMsg) } + close(rpcDoneOnClient) } func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { From b1f5e59c11d683c2debbec9c6c97f5b986746415 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 1 Apr 2021 15:35:03 -0700 Subject: [PATCH 4/9] Include details about possible goaways when terminating RPCs due to connection close. --- internal/transport/http2_client.go | 20 +++++- internal/transport/keepalive_test.go | 10 +-- internal/transport/transport.go | 5 +- test/end2end_test.go | 97 ++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 9 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index f535636c99f..9c341110a4b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -19,6 +19,7 @@ package transport import ( + "bytes" "context" "fmt" "io" @@ -116,6 +117,9 @@ type http2Client struct { // goAwayReason records the http2.ErrCode and debug data received with the // GoAway frame. goAwayReason GoAwayReason + // goAwayDebugMessage contains a detailed human readable string about a + // GoAway frame, useful for error messages. + goAwayDebugMessage string // A condition variable used to signal when the keepalive goroutine should // go dormant. The condition for dormancy is based on the number of active // streams and the `PermitWithoutStream` keepalive client parameter. And @@ -874,6 +878,12 @@ func (t *http2Client) Close(err error) error { if channelz.IsOn() { channelz.RemoveEntry(t.channelzID) } + // Append info about previous goaways if there were any, since this may be important + // for understanding the root cause for this connection to be closed. + _, goAwayDebugMessage := t.GetGoAwayReason() + if len(goAwayDebugMessage) > 0 { + err = fmt.Errorf("closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) + } // Notify all active streams. for _, s := range streams { t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) @@ -1215,12 +1225,18 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { t.goAwayReason = GoAwayTooManyPings } } + var m bytes.Buffer + m.WriteString("code: ") + m.WriteString(f.ErrCode.String()) + m.WriteString(", debug data: ") + m.Write(f.DebugData()) + t.goAwayDebugMessage = m.String() } -func (t *http2Client) GetGoAwayReason() GoAwayReason { +func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) { t.mu.Lock() defer t.mu.Unlock() - return t.goAwayReason + return t.goAwayReason, t.goAwayDebugMessage } func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { diff --git a/internal/transport/keepalive_test.go b/internal/transport/keepalive_test.go index 2a72e926270..c744c8303ba 100644 --- a/internal/transport/keepalive_test.go +++ b/internal/transport/keepalive_test.go @@ -68,7 +68,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) { if !timeout.Stop() { <-timeout.C } - if reason := client.GetGoAwayReason(); reason != GoAwayNoReason { + if reason, _ := client.GetGoAwayReason(); reason != GoAwayNoReason { t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason) } case <-timeout.C: @@ -142,7 +142,7 @@ func (s) TestMaxConnectionAge(t *testing.T) { if !timeout.Stop() { <-timeout.C } - if reason := client.GetGoAwayReason(); reason != GoAwayNoReason { + if reason, _ := client.GetGoAwayReason(); reason != GoAwayNoReason { t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason) } case <-timeout.C: @@ -402,7 +402,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) { if !timeout.Stop() { <-timeout.C } - if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings { + if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings { t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings) } case <-timeout.C: @@ -447,7 +447,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { if !timeout.Stop() { <-timeout.C } - if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings { + if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings { t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings) } case <-timeout.C: @@ -497,7 +497,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { if !timeout.Stop() { <-timeout.C } - if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings { + if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings { t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings) } case <-timeout.C: diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 0c0c573659c..ef60a979162 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -656,8 +656,9 @@ type ClientTransport interface { // HTTP/2). GoAway() <-chan struct{} - // GetGoAwayReason returns the reason why GoAway frame was received. - GetGoAwayReason() GoAwayReason + // GetGoAwayReason returns the reason why GoAway frame was received, along + // with a human readable string with debug info. + GetGoAwayReason() (GoAwayReason, string) // RemoteAddr returns the remote network address. RemoteAddr() net.Addr diff --git a/test/end2end_test.go b/test/end2end_test.go index dadcd085fab..78798ac4a78 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -55,12 +55,14 @@ import ( "google.golang.org/grpc/health" healthgrpc "google.golang.org/grpc/health/grpc_health_v1" healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/transport" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -1373,6 +1375,101 @@ func testDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T, e env) { close(rpcDoneOnClient) } +func (s) TestDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) { + for _, e := range listTestEnv() { + testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t, e) + } +} + +func testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.customServerOptions = []grpc.ServerOption{ + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionAge: time.Millisecond * 100, + MaxConnectionAgeGrace: time.Millisecond, + }), + } + rpcDoneOnClient := make(chan struct{}) + te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + <-rpcDoneOnClient + return status.Error(codes.Internal, "arbitrary status") + } + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", tc, err) + } + expectedErrorMessageSubstring := "received prior goaway: code: NO_ERROR" + _, err = stream.Recv() + close(rpcDoneOnClient) + if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) { + t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring) + } +} + +func (s) TestDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) { + // first, lower the min keepalive time so that we can generate + // server-side ping abuse logic quickly + prev := internal.KeepaliveMinPingTime + internal.KeepaliveMinPingTime = time.Millisecond + defer func() { internal.KeepaliveMinPingTime = prev }() + for _, e := range listTestEnv() { + if e.name == "handler-tls" { + // TODO(apolcyn): the server doesn't terminate the connection due + // to aggressive keepalives under the handler-tls configuration, is + // this WAI? + continue + } + testDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t, e) + } +} + +func testDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.customDialOptions = []grpc.DialOption{ + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Millisecond, /* should trigger "too many pings" error quickly */ + Timeout: time.Second * 1000, /* arbitrarym, arge value */ + PermitWithoutStream: false, + }), + } + te.customServerOptions = []grpc.ServerOption{ + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: time.Second * 1000, /* arbitrary, large value */ + }), + } + rpcDoneOnClient := make(chan struct{}) + te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + <-rpcDoneOnClient + return status.Error(codes.Internal, "arbitrary status") + } + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", tc, err) + } + expectedErrorMessageSubstring := "received prior goaway: code: ENHANCE_YOUR_CALM, debug data: too_many_pings" + _, err = stream.Recv() + close(rpcDoneOnClient) + if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) { + t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring) + } +} + func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { for _, e := range listTestEnv() { if e.name == "handler-tls" { From 7dee6ae90faff68cbd41e344a5a9df5ce980a571 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 1 Apr 2021 20:57:45 -0700 Subject: [PATCH 5/9] skip handler-tls config for max age goaway error test --- test/end2end_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/end2end_test.go b/test/end2end_test.go index 78798ac4a78..d4fb9fe7187 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1377,6 +1377,12 @@ func testDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T, e env) { func (s) TestDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) { for _, e := range listTestEnv() { + if e.name == "handler-tls" { + // TODO(apolcyn): the server doesn't terminate the connection due + // to max connection age under the handler-tls configuration, is + // this WAI? + continue + } testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t, e) } } From eef6755fda99139c2292da02152795e54597c5ff Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 14 Apr 2021 10:29:14 -0700 Subject: [PATCH 6/9] make test use stubserver --- test/end2end_test.go | 93 +++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 58 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 0f93699ce69..97a96fe987f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1383,99 +1383,76 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) { } func (s) TestDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) { - for _, e := range listTestEnv() { - if e.name == "handler-tls" { - // TODO(apolcyn): the server doesn't terminate the connection due - // to max connection age under the handler-tls configuration, is - // this WAI? - continue - } - testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t, e) + rpcDoneOnClient := make(chan struct{}) + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { + <-rpcDoneOnClient + return status.Error(codes.Internal, "arbitrary status") + }, } -} - -func testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T, e env) { - te := newTest(t, e) - te.userAgent = testAppUA - te.customServerOptions = []grpc.ServerOption{ + sopts := []grpc.ServerOption{ grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionAge: time.Millisecond * 100, MaxConnectionAgeGrace: time.Millisecond, }), } - rpcDoneOnClient := make(chan struct{}) - te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - <-rpcDoneOnClient - return status.Error(codes.Internal, "arbitrary status") + if err := ss.Start(sopts); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) } - te.startServer(&testServer{security: e.security}) - defer te.tearDown() + defer ss.Stop() - cc := te.clientConn() - tc := testpb.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := tc.FullDuplexCall(ctx) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { - t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", tc, err) + t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", ss.Client, err) } - expectedErrorMessageSubstring := "received prior goaway: code: NO_ERROR" + const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR" _, err = stream.Recv() close(rpcDoneOnClient) if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) { - t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring) + t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring) } } func (s) TestDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) { - // first, lower the min keepalive time so that we can generate - // server-side ping abuse logic quickly + // set the min keepalive time very low so that this test can take + // a reasonable amount of time prev := internal.KeepaliveMinPingTime internal.KeepaliveMinPingTime = time.Millisecond defer func() { internal.KeepaliveMinPingTime = prev }() - for _, e := range listTestEnv() { - if e.name == "handler-tls" { - // TODO(apolcyn): the server doesn't terminate the connection due - // to aggressive keepalives under the handler-tls configuration, is - // this WAI? - continue - } - testDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t, e) - } -} -func testDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T, e env) { - te := newTest(t, e) - te.userAgent = testAppUA - te.customDialOptions = []grpc.DialOption{ + rpcDoneOnClient := make(chan struct{}) + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { + <-rpcDoneOnClient + return status.Error(codes.Internal, "arbitrary status") + }, + } + sopts := []grpc.ServerOption{ + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: time.Second * 1000, /* arbitrary, large value */ + }), + } + dopts := []grpc.DialOption{ grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Millisecond, /* should trigger "too many pings" error quickly */ Timeout: time.Second * 1000, /* arbitrarym, arge value */ PermitWithoutStream: false, }), } - te.customServerOptions = []grpc.ServerOption{ - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: time.Second * 1000, /* arbitrary, large value */ - }), - } - rpcDoneOnClient := make(chan struct{}) - te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - <-rpcDoneOnClient - return status.Error(codes.Internal, "arbitrary status") + if err := ss.Start(sopts, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) } - te.startServer(&testServer{security: e.security}) - defer te.tearDown() + defer ss.Stop() - cc := te.clientConn() - tc := testpb.NewTestServiceClient(cc) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := tc.FullDuplexCall(ctx) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { - t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", tc, err) + t.Fatalf("%v.FullDuplexCall = _, %v, want _, ", ss.Client, err) } - expectedErrorMessageSubstring := "received prior goaway: code: ENHANCE_YOUR_CALM, debug data: too_many_pings" + const expectedErrorMessageSubstring = "received prior goaway: code: ENHANCE_YOUR_CALM, debug data: too_many_pings" _, err = stream.Recv() close(rpcDoneOnClient) if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) { From afed69fc432179a434a44bd7158b36427bf7eec6 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 14 Apr 2021 10:33:41 -0700 Subject: [PATCH 7/9] address review comment --- internal/transport/http2_client.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 515b1848b70..2d118542191 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -19,7 +19,6 @@ package transport import ( - "bytes" "context" "fmt" "io" @@ -1223,12 +1222,7 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { t.goAwayReason = GoAwayTooManyPings } } - var m bytes.Buffer - m.WriteString("code: ") - m.WriteString(f.ErrCode.String()) - m.WriteString(", debug data: ") - m.Write(f.DebugData()) - t.goAwayDebugMessage = m.String() + t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %v", f.ErrCode, string(f.DebugData())) } func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) { From 9277ca559d0e89475d8d0faf5b9ad7d265d8e943 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 14 Apr 2021 10:35:42 -0700 Subject: [PATCH 8/9] address comment from https://github.com/grpc/grpc-go/pull/4311#discussion_r612598779 --- internal/transport/http2_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 2d118542191..48c5e52edae 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1155,8 +1155,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { } } id := f.LastStreamID - // TODO(apolcyn): address this review comment - if id > 0 && id%2 != 1 { + if id > 0 && id%2 == 0 { t.mu.Unlock() t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id)) return From 5270ce5aa414691fc19244ac17a849f1b415f831 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 14 Apr 2021 10:38:31 -0700 Subject: [PATCH 9/9] fix typo --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 97a96fe987f..98cd4e594c0 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1437,7 +1437,7 @@ func (s) TestDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) dopts := []grpc.DialOption{ grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Millisecond, /* should trigger "too many pings" error quickly */ - Timeout: time.Second * 1000, /* arbitrarym, arge value */ + Timeout: time.Second * 1000, /* arbitrary, large value */ PermitWithoutStream: false, }), }