From cfba279cbd03b4f1c69d0c582e7b6c418ac8a049 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 2 Dec 2022 22:20:56 -0500 Subject: [PATCH 1/8] Added logs throughout the transport layer for connection and transport close --- internal/transport/controlbuf.go | 9 ++++++ internal/transport/http2_client.go | 47 ++++++++++++++++++++++++++---- internal/transport/http2_server.go | 18 ++++++++++-- 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 0e221af728b..b39722eba43 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -416,6 +416,9 @@ func (c *controlBuffer) get(block bool) (interface{}, error) { select { case <-c.ch: case <-c.done: + if logger.V(logLevel) { + logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because transports context expired.") + } return nil, ErrConnClosing } } @@ -772,6 +775,9 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { } } if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { + if logger.V(logLevel) { + logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because no active streams left to finish processing while in draining mode.") + } return ErrConnClosing } return nil @@ -807,6 +813,9 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { if l.side == clientSide { l.draining = true if len(l.estdStreams) == 0 { + if logger.V(logLevel) { + logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because no active streams to wait to finish processing when GOAWAY frame received.") + } return ErrConnClosing } } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 23d6ec6bc49..d9b341d179b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -223,6 +223,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts // Any further errors will close the underlying connection defer func(conn net.Conn) { if err != nil { + if logger.V(logLevel) { + logger.Errorf("transport: closing connection due to error %v.", err) + } conn.Close() } }(conn) @@ -244,6 +247,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts <-newClientCtx.Done() // Block until connectCtx expires or the defer above executes. if connectCtx.Err() != nil { // connectCtx expired before exiting the function. Hard close the connection. + if logger.V(logLevel) { + logger.Infof("transport: closing connection due to connect context expiring.") + } conn.Close() } }(conn) @@ -396,6 +402,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts err = <-readerErrCh } if err != nil { + if logger.V(logLevel) { + logger.Errorf("transport: closing transport due to error reading server preface: %v.", err) + } t.Close(err) } }() @@ -447,7 +456,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts err := t.loopy.run() if err != nil { if logger.V(logLevel) { - logger.Errorf("transport: loopyWriter.run returning. Err: %v", err) + logger.Errorf("transport: loopyWriter.run returned. Err: %v. Closing connection.", err) } } // Do not close the transport. Let reader goroutine handle it since @@ -965,6 +974,9 @@ func (t *http2Client) Close(err error) { t.mu.Unlock() t.controlBuf.finish() t.cancel() + if logger.V(logLevel) { + logger.Infof("transport: calling Close on the conn within transport.Close.") + } t.conn.Close() channelz.RemoveEntry(t.channelzID) // Append info about previous goaways if there were any, since this may be important @@ -1007,6 +1019,9 @@ func (t *http2Client) GracefulClose() { active := len(t.activeStreams) t.mu.Unlock() if active == 0 { + if logger.V(logLevel) { + logger.Infof("transport: closing transport because no active streams left to process in GracefulClose call.") + } t.Close(ErrConnClosing) return } @@ -1255,7 +1270,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { id := f.LastStreamID if id > 0 && id%2 == 0 { t.mu.Unlock() - t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id)) + err := connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id) + if logger.V(logLevel) { + logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err) + } + t.Close(err) return } // A client can receive multiple GoAways from the server (see @@ -1273,7 +1292,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { // If there are multiple GoAways the first one should always have an ID greater than the following ones. if id > t.prevGoAwayID { t.mu.Unlock() - t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)) + err := connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID) + if logger.V(logLevel) { + logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err) + } + t.Close(err) return } default: @@ -1296,7 +1319,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { t.prevGoAwayID = id if len(t.activeStreams) == 0 { t.mu.Unlock() - t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) + err := connectionErrorf(true, nil, "received goaway and there are no active streams") + if logger.V(logLevel) { + logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err) + } + t.Close(err) return } @@ -1602,7 +1629,11 @@ func (t *http2Client) reader(errCh chan<- error) { continue } else { // Transport error. - t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) + err := connectionErrorf(true, err, "error reading from server: %v", err) + if logger.V(logLevel) { + logger.Errorf("transport: closing transport due to connection level error %v.", err) + } + t.Close(err) return } } @@ -1661,7 +1692,11 @@ func (t *http2Client) keepalive() { continue } if outstandingPing && timeoutLeft <= 0 { - t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")) + err := connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout") + if logger.V(logLevel) { + logger.Errorf("transport: closing transport due to error in keep alive handling: %v.", err) + } + t.Close(err) return } t.mu.Lock() diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 6c8aa403539..61f0c7f1c6a 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -333,7 +333,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler if err := t.loopy.run(); err != nil { if logger.V(logLevel) { - logger.Errorf("transport: loopyWriter.run returning. Err: %v", err) + logger.Errorf("transport: loopyWriter.run returned. Err: %v. Closing Connection.", err) } } t.conn.Close() @@ -630,11 +630,14 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context. continue } if err == io.EOF || err == io.ErrUnexpectedEOF { + if logger.V(logLevel) { + logger.Infof("transport: closing the server transport due to connection level EOF.") + } t.Close() return } if logger.V(logLevel) { - logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) + logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v, closing transport.", err) } t.Close() return @@ -642,6 +645,9 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context. switch frame := frame.(type) { case *http2.MetaHeadersFrame: if t.operateHeaders(frame, handle, traceCtx) { + if logger.V(logLevel) { + logger.Errorf("transport: closing the transport due to fatal error processing headers frame.") + } t.Close() break } @@ -1170,7 +1176,7 @@ func (t *http2Server) keepalive() { } if outstandingPing && kpTimeoutLeft <= 0 { if logger.V(logLevel) { - logger.Infof("transport: closing server transport due to idleness.") + logger.Infof("transport: closing server transport due to keep alive ping not being acked within timeout %v.", t.kp.Time.String()) } t.Close() return @@ -1211,6 +1217,9 @@ func (t *http2Server) Close() { t.mu.Unlock() t.controlBuf.finish() close(t.done) + if logger.V(logLevel) { + logger.Infof("transport: calling Close on the conn within transport.Close.") + } if err := t.conn.Close(); err != nil && logger.V(logLevel) { logger.Infof("transport: error closing conn during Close: %v", err) } @@ -1320,6 +1329,9 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { t.state = draining sid := t.maxStreamID if len(t.activeStreams) == 0 { + if logger.V(logLevel) { + logger.Infof("transport: second goaway written and no active streams left to process, closing the connection.") + } g.closeConn = true } t.mu.Unlock() From 2b055c6ec3c28c95f9cce7571f064c7189dc57ac Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 6 Dec 2022 00:50:44 -0500 Subject: [PATCH 2/8] Responded to Doug's comments --- clientconn.go | 3 +++ internal/transport/controlbuf.go | 4 ++-- internal/transport/http2_client.go | 32 +++++++++--------------------- internal/transport/http2_server.go | 17 ++++++---------- internal/transport/transport.go | 4 ++++ 5 files changed, 24 insertions(+), 36 deletions(-) diff --git a/clientconn.go b/clientconn.go index 78c81a108ed..5324a89f5e8 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1274,6 +1274,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose) if err != nil { + if logger.V(2) { + logger.Errorf("NewClientTransport failed to connect to %s. Err: %v", addr, err) + } // newTr is either nil, or closed. hcancel() channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index b39722eba43..03e8e83174a 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -191,7 +191,7 @@ type goAway struct { code http2.ErrCode debugData []byte headsUp bool - closeConn bool + closeConn error } func (*goAway) isTransportResponseFrame() bool { return false } @@ -417,7 +417,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) { case <-c.ch: case <-c.done: if logger.V(logLevel) { - logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because transports context expired.") + logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because transport's context expired.") } return nil, ErrConnClosing } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index d9b341d179b..26e5a97b65c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -223,9 +223,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts // Any further errors will close the underlying connection defer func(conn net.Conn) { if err != nil { - if logger.V(logLevel) { - logger.Errorf("transport: closing connection due to error %v.", err) - } conn.Close() } }(conn) @@ -960,6 +957,9 @@ func (t *http2Client) Close(err error) { t.mu.Unlock() return } + if logger.V(logLevel) { + logger.Infof("closing transport due to err: %v, will close the connection.", err) + } // Call t.onClose ASAP to prevent the client from attempting to create new // streams. t.onClose() @@ -974,9 +974,6 @@ func (t *http2Client) Close(err error) { t.mu.Unlock() t.controlBuf.finish() t.cancel() - if logger.V(logLevel) { - logger.Infof("transport: calling Close on the conn within transport.Close.") - } t.conn.Close() channelz.RemoveEntry(t.channelzID) // Append info about previous goaways if there were any, since this may be important @@ -1015,14 +1012,14 @@ func (t *http2Client) GracefulClose() { t.mu.Unlock() return } + if logger.V(logLevel) { + logger.Infof("transport: GracefulClose called, transport switched to draining") + } t.state = draining active := len(t.activeStreams) t.mu.Unlock() if active == 0 { - if logger.V(logLevel) { - logger.Infof("transport: closing transport because no active streams left to process in GracefulClose call.") - } - t.Close(ErrConnClosing) + t.Close(errNoStreamsDraining) return } t.controlBuf.put(&incomingGoAway{}) @@ -1270,10 +1267,8 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { id := f.LastStreamID if id > 0 && id%2 == 0 { t.mu.Unlock() + err := connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id) - if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err) - } t.Close(err) return } @@ -1293,9 +1288,6 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { if id > t.prevGoAwayID { t.mu.Unlock() err := connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID) - if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err) - } t.Close(err) return } @@ -1320,9 +1312,6 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { if len(t.activeStreams) == 0 { t.mu.Unlock() err := connectionErrorf(true, nil, "received goaway and there are no active streams") - if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to error in goaway handling: %v.", err) - } t.Close(err) return } @@ -1630,9 +1619,6 @@ func (t *http2Client) reader(errCh chan<- error) { } else { // Transport error. err := connectionErrorf(true, err, "error reading from server: %v", err) - if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to connection level error %v.", err) - } t.Close(err) return } @@ -1694,7 +1680,7 @@ func (t *http2Client) keepalive() { if outstandingPing && timeoutLeft <= 0 { err := connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout") if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to error in keep alive handling: %v.", err) + logger.Errorf("transport: closing transport due to error in keepalive handling: %v.", err) } t.Close(err) return diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 61f0c7f1c6a..e358cb7aa4d 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -892,10 +892,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - if logger.V(logLevel) { - logger.Errorf("transport: Got too many pings from the client, closing the connection.") - } - t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) + t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: fmt.Errorf("got too many pings from the client")}) } } @@ -1176,7 +1173,7 @@ func (t *http2Server) keepalive() { } if outstandingPing && kpTimeoutLeft <= 0 { if logger.V(logLevel) { - logger.Infof("transport: closing server transport due to keep alive ping not being acked within timeout %v.", t.kp.Time.String()) + logger.Infof("transport: closing server transport due to keepalive ping not being acked within timeout %v.", t.kp.Time.String()) } t.Close() return @@ -1328,22 +1325,20 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { // Stop accepting more streams now. t.state = draining sid := t.maxStreamID + retErr := g.closeConn if len(t.activeStreams) == 0 { - if logger.V(logLevel) { - logger.Infof("transport: second goaway written and no active streams left to process, closing the connection.") - } - g.closeConn = true + retErr = fmt.Errorf("second goaway written and no active streams left to process") } t.mu.Unlock() t.maxStreamMu.Unlock() if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { return false, err } - if g.closeConn { + if retErr != nil { // Abruptly close the connection following the GoAway (via // loopywriter). But flush out what's inside the buffer first. t.framer.writer.Flush() - return false, fmt.Errorf("transport: Connection closing") + return false, retErr } return true, nil } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 2e615ee20cc..ee0a8699d98 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -761,6 +761,10 @@ func (e ConnectionError) Unwrap() error { var ( // ErrConnClosing indicates that the transport is closing. ErrConnClosing = connectionErrorf(true, nil, "transport is closing") + // errNoStreamsDraining indicates that this transport is finished processing + // while in draining mode due to no more active streams to process, thus + // making the transport and connection immediately ready to close. + errNoStreamsDraining = connectionErrorf(true, nil, "no active streams left to process while draining") // errStreamDrain indicates that the stream is rejected because the // connection is draining. This could be caused by goaway or balancer // removing the address. From c8bde092c89b7c45c995503f972eb15cb325620d Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 6 Dec 2022 01:15:36 -0500 Subject: [PATCH 3/8] Added error to ServerTransport.Stop --- internal/transport/handler_server.go | 9 +++-- internal/transport/http2_server.go | 55 +++++++++++----------------- internal/transport/transport.go | 2 +- internal/transport/transport_test.go | 6 +-- server.go | 6 +-- 5 files changed, 35 insertions(+), 43 deletions(-) diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index fb272235d81..d730440c9ed 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -141,7 +141,10 @@ type serverHandlerTransport struct { stats []stats.Handler } -func (ht *serverHandlerTransport) Close() { +func (ht *serverHandlerTransport) Close(err error) { + if logger.V(logLevel) { + logger.Infof("closing serverHandlerTransport due to err: %v", err) + } ht.closeOnce.Do(ht.closeCloseChanOnce) } @@ -236,7 +239,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro }) } } - ht.Close() + ht.Close(fmt.Errorf("finished writing status")) return err } @@ -346,7 +349,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace case <-ht.req.Context().Done(): } cancel() - ht.Close() + ht.Close(fmt.Errorf("request is done processing")) }() req := ht.req diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index e358cb7aa4d..6b3f227d5af 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -293,7 +293,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, defer func() { if err != nil { - t.Close() + t.Close(err) } }() @@ -344,8 +344,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, return t, nil } -// operateHeader takes action on the decoded headers. -func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { +// operateHeader takes action on the decoded headers. Returns an error if fatal +// error encountered and transport needs to close, otherwise returns nil. +func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error { // Acquire max stream ID lock for entire duration t.maxStreamMu.Lock() defer t.maxStreamMu.Unlock() @@ -361,15 +362,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( rstCode: http2.ErrCodeFrameSize, onWrite: func() {}, }) - return false + return nil } if streamID%2 != 1 || streamID <= t.maxStreamID { // illegal gRPC stream id. - if logger.V(logLevel) { - logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) - } - return true + return fmt.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) } t.maxStreamID = streamID @@ -453,7 +451,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( status: status.New(codes.Internal, errMsg), rst: !frame.StreamEnded(), }) - return false + return nil } if !isGRPC || headerError { @@ -463,7 +461,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( rstCode: http2.ErrCodeProtocol, onWrite: func() {}, }) - return false + return nil } // "If :authority is missing, Host must be renamed to :authority." - A41 @@ -503,7 +501,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( if t.state != reachable { t.mu.Unlock() s.cancel() - return false + return nil } if uint32(len(t.activeStreams)) >= t.maxStreams { t.mu.Unlock() @@ -514,7 +512,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( onWrite: func() {}, }) s.cancel() - return false + return nil } if httpMethod != http.MethodPost { t.mu.Unlock() @@ -530,7 +528,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( rst: !frame.StreamEnded(), }) s.cancel() - return false + return nil } if t.inTapHandle != nil { var err error @@ -550,7 +548,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( status: stat, rst: !frame.StreamEnded(), }) - return false + return nil } } t.activeStreams[streamID] = s @@ -597,7 +595,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( wq: s.wq, }) handle(s) - return false + return nil } // HandleStreams receives incoming streams using the given handler. This is @@ -630,25 +628,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context. continue } if err == io.EOF || err == io.ErrUnexpectedEOF { - if logger.V(logLevel) { - logger.Infof("transport: closing the server transport due to connection level EOF.") - } - t.Close() + t.Close(err) return } - if logger.V(logLevel) { - logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v, closing transport.", err) - } - t.Close() + t.Close(err) return } switch frame := frame.(type) { case *http2.MetaHeadersFrame: - if t.operateHeaders(frame, handle, traceCtx) { - if logger.V(logLevel) { - logger.Errorf("transport: closing the transport due to fatal error processing headers frame.") - } - t.Close() + if err := t.operateHeaders(frame, handle, traceCtx); err != nil { + t.Close(err) break } case *http2.DataFrame: @@ -1175,7 +1164,7 @@ func (t *http2Server) keepalive() { if logger.V(logLevel) { logger.Infof("transport: closing server transport due to keepalive ping not being acked within timeout %v.", t.kp.Time.String()) } - t.Close() + t.Close(fmt.Errorf("keep alive ping not acked within timeout %v", t.kp.Time.String())) return } if !outstandingPing { @@ -1202,21 +1191,21 @@ func (t *http2Server) keepalive() { // Close starts shutting down the http2Server transport. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This // could cause some resource issue. Revisit this later. -func (t *http2Server) Close() { +func (t *http2Server) Close(err error) { t.mu.Lock() if t.state == closing { t.mu.Unlock() return } + if logger.V(logLevel) { + logger.Infof("closing transport due to err: %v, will close the connection.", err) + } t.state = closing streams := t.activeStreams t.activeStreams = nil t.mu.Unlock() t.controlBuf.finish() close(t.done) - if logger.V(logLevel) { - logger.Infof("transport: calling Close on the conn within transport.Close.") - } if err := t.conn.Close(); err != nil && logger.V(logLevel) { logger.Infof("transport: error closing conn during Close: %v", err) } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index ee0a8699d98..3a34a3e9085 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -701,7 +701,7 @@ type ServerTransport interface { // Close tears down the transport. Once it is called, the transport // should not be accessed any more. All the pending streams and their // handlers will be terminated asynchronously. - Close() + Close(err error) // RemoteAddr returns the remote network address. RemoteAddr() net.Addr diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 6cd2201b591..387d2cd642f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -343,7 +343,7 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT s.mu.Lock() if s.conns == nil { s.mu.Unlock() - transport.Close() + transport.Close(fmt.Errorf("s.conns is nil")) return } s.conns[transport] = true @@ -421,7 +421,7 @@ func (s *server) stop() { s.lis.Close() s.mu.Lock() for c := range s.conns { - c.Close() + c.Close(fmt.Errorf("server Stop called")) } s.conns = nil s.mu.Unlock() @@ -1650,7 +1650,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) // Close down both server and client so that their internals can be read without data // races. client.Close(fmt.Errorf("closed manually by test")) - st.Close() + st.Close(fmt.Errorf("closed manually by test")) <-st.readerDone <-st.writerDone <-client.readerDone diff --git a/server.go b/server.go index 8c8a1915008..bacb4c6b59d 100644 --- a/server.go +++ b/server.go @@ -942,7 +942,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { } func (s *Server) serveStreams(st transport.ServerTransport) { - defer st.Close() + defer st.Close(fmt.Errorf("finished serving streams for the server transport")) var wg sync.WaitGroup var roundRobinCounter uint32 @@ -1046,7 +1046,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { - st.Close() + st.Close(fmt.Errorf("s.conns (representing active server transports) is nil")) return false } if s.drain { @@ -1809,7 +1809,7 @@ func (s *Server) Stop() { } for _, cs := range conns { for st := range cs { - st.Close() + st.Close(fmt.Errorf("server Stop called")) } } if s.opts.numServerWorkers > 0 { From 06978d011bb4a6634342e7bb35e406f4dbc660e4 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 7 Dec 2022 12:14:36 -0500 Subject: [PATCH 4/8] Responded to Easwar's comments --- clientconn.go | 2 +- internal/transport/controlbuf.go | 8 +++----- internal/transport/handler_server.go | 6 +++--- internal/transport/http2_client.go | 26 +++++++------------------- internal/transport/http2_server.go | 14 ++++++-------- internal/transport/transport_test.go | 8 ++++---- server.go | 6 +++--- 7 files changed, 27 insertions(+), 43 deletions(-) diff --git a/clientconn.go b/clientconn.go index 5324a89f5e8..c995a74b583 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1275,7 +1275,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose) if err != nil { if logger.V(2) { - logger.Errorf("NewClientTransport failed to connect to %s. Err: %v", addr, err) + logger.Errorf("Creating new client transport to %q: %v", addr, err) } // newTr is either nil, or closed. hcancel() diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 03e8e83174a..f42970c2af6 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -585,7 +585,6 @@ func (l *loopyWriter) run() (err error) { } l.framer.writer.Flush() break hasdata - } } } @@ -673,11 +672,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error { func (l *loopyWriter) originateStream(str *outStream) error { hdr := str.itl.dequeue().(*headerFrame) if err := hdr.initStream(str.id); err != nil { - if err == ErrConnClosing { - return err + if err == errStreamDrain { // errStreamDrain need not close transport + return nil } - // Other errors(errStreamDrain) need not close transport. - return nil + return err } if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil { return err diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index d730440c9ed..79177d11fc0 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -143,7 +143,7 @@ type serverHandlerTransport struct { func (ht *serverHandlerTransport) Close(err error) { if logger.V(logLevel) { - logger.Infof("closing serverHandlerTransport due to err: %v", err) + logger.Infof("Closing serverHandlerTransport: %v", err) } ht.closeOnce.Do(ht.closeCloseChanOnce) } @@ -239,7 +239,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro }) } } - ht.Close(fmt.Errorf("finished writing status")) + ht.Close(errors.New("finished writing status")) return err } @@ -349,7 +349,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace case <-ht.req.Context().Done(): } cancel() - ht.Close(fmt.Errorf("request is done processing")) + ht.Close(errors.New("request is done processing")) }() req := ht.req diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 26e5a97b65c..ce015254297 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -399,9 +399,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts err = <-readerErrCh } if err != nil { - if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to error reading server preface: %v.", err) - } t.Close(err) } }() @@ -453,7 +450,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts err := t.loopy.run() if err != nil { if logger.V(logLevel) { - logger.Errorf("transport: loopyWriter.run returned. Err: %v. Closing connection.", err) + logger.Errorf("transport: loopyWriter exited. Closing connection. Err: %v", err) } } // Do not close the transport. Let reader goroutine handle it since @@ -958,7 +955,7 @@ func (t *http2Client) Close(err error) { return } if logger.V(logLevel) { - logger.Infof("closing transport due to err: %v, will close the connection.", err) + logger.Infof("Closing transport, will close the connection. Err: %v", err) } // Call t.onClose ASAP to prevent the client from attempting to create new // streams. @@ -1267,9 +1264,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { id := f.LastStreamID if id > 0 && id%2 == 0 { t.mu.Unlock() - - err := connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id) - t.Close(err) + t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id)) return } // A client can receive multiple GoAways from the server (see @@ -1287,8 +1282,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { // If there are multiple GoAways the first one should always have an ID greater than the following ones. if id > t.prevGoAwayID { t.mu.Unlock() - err := connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID) - t.Close(err) + t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)) return } default: @@ -1311,8 +1305,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { t.prevGoAwayID = id if len(t.activeStreams) == 0 { t.mu.Unlock() - err := connectionErrorf(true, nil, "received goaway and there are no active streams") - t.Close(err) + t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) return } @@ -1618,8 +1611,7 @@ func (t *http2Client) reader(errCh chan<- error) { continue } else { // Transport error. - err := connectionErrorf(true, err, "error reading from server: %v", err) - t.Close(err) + t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) return } } @@ -1678,11 +1670,7 @@ func (t *http2Client) keepalive() { continue } if outstandingPing && timeoutLeft <= 0 { - err := connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout") - if logger.V(logLevel) { - logger.Errorf("transport: closing transport due to error in keepalive handling: %v.", err) - } - t.Close(err) + t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")) return } t.mu.Lock() diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 6b3f227d5af..55e199f2808 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -21,6 +21,7 @@ package transport import ( "bytes" "context" + "errors" "fmt" "io" "math" @@ -333,7 +334,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler if err := t.loopy.run(); err != nil { if logger.V(logLevel) { - logger.Errorf("transport: loopyWriter.run returned. Err: %v. Closing Connection.", err) + logger.Errorf("transport: loopyWriter exited. Closing connection. Err: %v", err) } } t.conn.Close() @@ -881,7 +882,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: fmt.Errorf("got too many pings from the client")}) + t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")}) } } @@ -1161,10 +1162,7 @@ func (t *http2Server) keepalive() { continue } if outstandingPing && kpTimeoutLeft <= 0 { - if logger.V(logLevel) { - logger.Infof("transport: closing server transport due to keepalive ping not being acked within timeout %v.", t.kp.Time.String()) - } - t.Close(fmt.Errorf("keep alive ping not acked within timeout %v", t.kp.Time.String())) + t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time)) return } if !outstandingPing { @@ -1198,7 +1196,7 @@ func (t *http2Server) Close(err error) { return } if logger.V(logLevel) { - logger.Infof("closing transport due to err: %v, will close the connection.", err) + logger.Infof("Closing transport, will close the connection. Err: %v", err) } t.state = closing streams := t.activeStreams @@ -1316,7 +1314,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { sid := t.maxStreamID retErr := g.closeConn if len(t.activeStreams) == 0 { - retErr = fmt.Errorf("second goaway written and no active streams left to process") + retErr = errors.New("second goaway written and no active streams left to process") } t.mu.Unlock() t.maxStreamMu.Unlock() diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 387d2cd642f..517cd4015da 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -343,7 +343,7 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT s.mu.Lock() if s.conns == nil { s.mu.Unlock() - transport.Close(fmt.Errorf("s.conns is nil")) + transport.Close(errors.New("s.conns is nil")) return } s.conns[transport] = true @@ -421,7 +421,7 @@ func (s *server) stop() { s.lis.Close() s.mu.Lock() for c := range s.conns { - c.Close(fmt.Errorf("server Stop called")) + c.Close(errors.New("server Stop called")) } s.conns = nil s.mu.Unlock() @@ -1649,8 +1649,8 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) } // Close down both server and client so that their internals can be read without data // races. - client.Close(fmt.Errorf("closed manually by test")) - st.Close(fmt.Errorf("closed manually by test")) + client.Close(errors.New("closed manually by test")) + st.Close(errors.New("closed manually by test")) <-st.readerDone <-st.writerDone <-client.readerDone diff --git a/server.go b/server.go index bacb4c6b59d..fa8b90a9f4b 100644 --- a/server.go +++ b/server.go @@ -942,7 +942,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { } func (s *Server) serveStreams(st transport.ServerTransport) { - defer st.Close(fmt.Errorf("finished serving streams for the server transport")) + defer st.Close(errors.New("finished serving streams for the server transport")) var wg sync.WaitGroup var roundRobinCounter uint32 @@ -1046,7 +1046,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { - st.Close(fmt.Errorf("s.conns (representing active server transports) is nil")) + st.Close(errors.New("s.conns (representing active server transports) is nil")) return false } if s.drain { @@ -1809,7 +1809,7 @@ func (s *Server) Stop() { } for _, cs := range conns { for st := range cs { - st.Close(fmt.Errorf("server Stop called")) + st.Close(errors.New("server Stop called")) } } if s.opts.numServerWorkers > 0 { From 4633d062124049999dc5ddc728f3f33dd6cbe413 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 7 Dec 2022 13:56:45 -0500 Subject: [PATCH 5/8] Switched all errors returned from loopyWriter.run() to info logs, and switched some errors from ErrConnClosing to errors with information inside --- internal/transport/controlbuf.go | 27 +++------------------------ internal/transport/http2_client.go | 2 +- internal/transport/http2_server.go | 2 +- 3 files changed, 5 insertions(+), 26 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index f42970c2af6..62268292bfd 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -416,10 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) { select { case <-c.ch: case <-c.done: - if logger.V(logLevel) { - logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because transport's context expired.") - } - return nil, ErrConnClosing + return nil, errors.New("transport's context expired") } } } @@ -530,18 +527,6 @@ const minBatchSize = 1000 // As an optimization, to increase the batch size for each flush, loopy yields the processor, once // if the batch size is too low to give stream goroutines a chance to fill it up. func (l *loopyWriter) run() (err error) { - defer func() { - if err == ErrConnClosing { - // Don't log ErrConnClosing as error since it happens - // 1. When the connection is closed by some other known issue. - // 2. User closed the connection. - // 3. A graceful close of connection. - if logger.V(logLevel) { - logger.Infof("transport: loopyWriter.run returning. %v", err) - } - err = nil - } - }() for { it, err := l.cbuf.get(true) if err != nil { @@ -773,10 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { } } if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { - if logger.V(logLevel) { - logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because no active streams left to finish processing while in draining mode.") - } - return ErrConnClosing + return errors.New("finished processing active streams while in draining mode, no more streams to finish processing") } return nil } @@ -811,10 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { if l.side == clientSide { l.draining = true if len(l.estdStreams) == 0 { - if logger.V(logLevel) { - logger.Infof("transport: trigger loopy to exit (with a subsequent conn closure) because no active streams to wait to finish processing when GOAWAY frame received.") - } - return ErrConnClosing + return errors.New("no active streams left to process when GOAWAY frame received, so can trigger closure") } } return nil diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ce015254297..a2da49a77d7 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -450,7 +450,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts err := t.loopy.run() if err != nil { if logger.V(logLevel) { - logger.Errorf("transport: loopyWriter exited. Closing connection. Err: %v", err) + logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err) } } // Do not close the transport. Let reader goroutine handle it since diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 55e199f2808..1a2acb9ce0b 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -334,7 +334,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler if err := t.loopy.run(); err != nil { if logger.V(logLevel) { - logger.Errorf("transport: loopyWriter exited. Closing connection. Err: %v", err) + logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err) } } t.conn.Close() From 30d51f7ba6c8eeae2add360f1a277a46208944ff Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 8 Dec 2022 17:20:07 -0500 Subject: [PATCH 6/8] Responded to Doug and Easwar's comments --- clientconn.go | 2 +- internal/transport/controlbuf.go | 4 ++-- internal/transport/handler_server.go | 10 ++++++---- internal/transport/http2_client.go | 12 +++++------- internal/transport/http2_server.go | 15 +++++++-------- internal/transport/transport.go | 4 ---- server.go | 2 +- 7 files changed, 22 insertions(+), 27 deletions(-) diff --git a/clientconn.go b/clientconn.go index c995a74b583..04566890451 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1275,7 +1275,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose) if err != nil { if logger.V(2) { - logger.Errorf("Creating new client transport to %q: %v", addr, err) + logger.Infof("Creating new client transport to %q: %v", addr, err) } // newTr is either nil, or closed. hcancel() diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 62268292bfd..81c3b32b9cf 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -758,7 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { } } if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { - return errors.New("finished processing active streams while in draining mode, no more streams to finish processing") + return errors.New("finished processing active streams while in draining mode") } return nil } @@ -793,7 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { if l.side == clientSide { l.draining = true if len(l.estdStreams) == 0 { - return errors.New("no active streams left to process when GOAWAY frame received, so can trigger closure") + return errors.New("received GOAWAY with no active streams") } } return nil diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index 79177d11fc0..bc835c528ed 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -142,10 +142,12 @@ type serverHandlerTransport struct { } func (ht *serverHandlerTransport) Close(err error) { - if logger.V(logLevel) { - logger.Infof("Closing serverHandlerTransport: %v", err) - } - ht.closeOnce.Do(ht.closeCloseChanOnce) + ht.closeOnce.Do(func() { + if logger.V(logLevel) { + logger.Infof("Closing serverHandlerTransport: %v", err) + } + ht.closeCloseChanOnce() + }) } func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index a2da49a77d7..04c45dc5890 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -245,7 +245,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts if connectCtx.Err() != nil { // connectCtx expired before exiting the function. Hard close the connection. if logger.V(logLevel) { - logger.Infof("transport: closing connection due to connect context expiring.") + logger.Infof("newClientTransport: closing connection due to connect context expiring.") } conn.Close() } @@ -448,10 +448,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts go func() { t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) err := t.loopy.run() - if err != nil { - if logger.V(logLevel) { - logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err) - } + if logger.V(logLevel) { + logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err) } // Do not close the transport. Let reader goroutine handle it since // there might be data in the buffers. @@ -955,7 +953,7 @@ func (t *http2Client) Close(err error) { return } if logger.V(logLevel) { - logger.Infof("Closing transport, will close the connection. Err: %v", err) + logger.Infof("Closing transport, will close the connection: %v", err) } // Call t.onClose ASAP to prevent the client from attempting to create new // streams. @@ -1016,7 +1014,7 @@ func (t *http2Client) GracefulClose() { active := len(t.activeStreams) t.mu.Unlock() if active == 0 { - t.Close(errNoStreamsDraining) + t.Close(connectionErrorf(true, nil, "no active streams left to process while draining")) return } t.controlBuf.put(&incomingGoAway{}) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 1a2acb9ce0b..0c52c21d7a8 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -332,10 +332,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, go func() { t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst) t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler - if err := t.loopy.run(); err != nil { - if logger.V(logLevel) { - logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err) - } + err := t.loopy.run() + if logger.V(logLevel) { + logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err) } t.conn.Close() t.controlBuf.finish() @@ -345,7 +344,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, return t, nil } -// operateHeader takes action on the decoded headers. Returns an error if fatal +// operateHeaders takes action on the decoded headers. Returns an error if fatal // error encountered and transport needs to close, otherwise returns nil. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error { // Acquire max stream ID lock for entire duration @@ -368,7 +367,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( if streamID%2 != 1 || streamID <= t.maxStreamID { // illegal gRPC stream id. - return fmt.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) + return fmt.Errorf("received an illegal stream id: %v", streamID) } t.maxStreamID = streamID @@ -1196,7 +1195,7 @@ func (t *http2Server) Close(err error) { return } if logger.V(logLevel) { - logger.Infof("Closing transport, will close the connection. Err: %v", err) + logger.Infof("Closing transport, will close the connection: %v", err) } t.state = closing streams := t.activeStreams @@ -1314,7 +1313,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { sid := t.maxStreamID retErr := g.closeConn if len(t.activeStreams) == 0 { - retErr = errors.New("second goaway written and no active streams left to process") + retErr = errors.New("second GOAWAY written and no active streams left to process") } t.mu.Unlock() t.maxStreamMu.Unlock() diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 3a34a3e9085..6cff20c8e02 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -761,10 +761,6 @@ func (e ConnectionError) Unwrap() error { var ( // ErrConnClosing indicates that the transport is closing. ErrConnClosing = connectionErrorf(true, nil, "transport is closing") - // errNoStreamsDraining indicates that this transport is finished processing - // while in draining mode due to no more active streams to process, thus - // making the transport and connection immediately ready to close. - errNoStreamsDraining = connectionErrorf(true, nil, "no active streams left to process while draining") // errStreamDrain indicates that the stream is rejected because the // connection is draining. This could be caused by goaway or balancer // removing the address. diff --git a/server.go b/server.go index fa8b90a9f4b..e9110869072 100644 --- a/server.go +++ b/server.go @@ -1046,7 +1046,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool { s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { - st.Close(errors.New("s.conns (representing active server transports) is nil")) + st.Close(errors.New("Server.addConn called when server has already been stopped")) return false } if s.drain { From dd3cceabd6ccbb39d2d0e7408a5ab95f563e030e Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 8 Dec 2022 18:32:50 -0500 Subject: [PATCH 7/8] Responded to Doug's comments --- internal/transport/controlbuf.go | 2 +- internal/transport/handler_server.go | 4 +--- internal/transport/http2_client.go | 6 +++--- server.go | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 81c3b32b9cf..9ce99033967 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -191,7 +191,7 @@ type goAway struct { code http2.ErrCode debugData []byte headsUp bool - closeConn error + closeConn error // if set, loopyWriter will exit, resulting in conn closure } func (*goAway) isTransportResponseFrame() bool { return false } diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index bc835c528ed..0c6ada99274 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -146,12 +146,10 @@ func (ht *serverHandlerTransport) Close(err error) { if logger.V(logLevel) { logger.Infof("Closing serverHandlerTransport: %v", err) } - ht.closeCloseChanOnce() + close(ht.closedCh) }) } -func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } - func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } // strAddr is a net.Addr backed by either a TCP "ip:port" string, or diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 04c45dc5890..6c0c5b241d1 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -242,10 +242,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts go func(conn net.Conn) { defer ctxMonitorDone.Fire() // Signal this goroutine has exited. <-newClientCtx.Done() // Block until connectCtx expires or the defer above executes. - if connectCtx.Err() != nil { + if err := connectCtx.Err(); err != nil { // connectCtx expired before exiting the function. Hard close the connection. if logger.V(logLevel) { - logger.Infof("newClientTransport: closing connection due to connect context expiring.") + logger.Infof("newClientTransport: aborting due to connectCtx: %v", err) } conn.Close() } @@ -1008,7 +1008,7 @@ func (t *http2Client) GracefulClose() { return } if logger.V(logLevel) { - logger.Infof("transport: GracefulClose called, transport switched to draining") + logger.Infof("transport: GracefulClose called") } t.state = draining active := len(t.activeStreams) diff --git a/server.go b/server.go index e9110869072..7456d6d32bc 100644 --- a/server.go +++ b/server.go @@ -1809,7 +1809,7 @@ func (s *Server) Stop() { } for _, cs := range conns { for st := range cs { - st.Close(errors.New("server Stop called")) + st.Close(errors.New("Server.Stop called")) } } if s.opts.numServerWorkers > 0 { From c2aa433dad386adaf50bd0ab94aaa66255637edd Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 8 Dec 2022 19:19:46 -0500 Subject: [PATCH 8/8] Doug's final comments --- internal/transport/controlbuf.go | 2 +- internal/transport/http2_client.go | 2 +- internal/transport/http2_server.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index 9ce99033967..aaa9c859a34 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -416,7 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) { select { case <-c.ch: case <-c.done: - return nil, errors.New("transport's context expired") + return nil, errors.New("transport closed by client") } } } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 6c0c5b241d1..3e582a2853c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -953,7 +953,7 @@ func (t *http2Client) Close(err error) { return } if logger.V(logLevel) { - logger.Infof("Closing transport, will close the connection: %v", err) + logger.Infof("transport: closing: %v", err) } // Call t.onClose ASAP to prevent the client from attempting to create new // streams. diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 0c52c21d7a8..7f1b08f628e 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -367,7 +367,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( if streamID%2 != 1 || streamID <= t.maxStreamID { // illegal gRPC stream id. - return fmt.Errorf("received an illegal stream id: %v", streamID) + return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame) } t.maxStreamID = streamID @@ -1195,7 +1195,7 @@ func (t *http2Server) Close(err error) { return } if logger.V(logLevel) { - logger.Infof("Closing transport, will close the connection: %v", err) + logger.Infof("transport: closing: %v", err) } t.state = closing streams := t.activeStreams