Added logs for reasons causing connection and transport close (#5840)
zasweq committed Dec 9, 2022 · 1 parent aba03e1 · commit a9709c3
Showing 8 changed files with 71 additions and 79 deletions.
3 changes: 3 additions & 0 deletions clientconn.go
@@ -1274,6 +1274,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose)
if err != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", addr, err)
}
// newTr is either nil, or closed.
hcancel()
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
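The clientconn.go change above gates the new failure message behind the logger's verbosity check so the formatting cost is only paid when verbose logging is on. Below is a minimal, self-contained sketch of that pattern; the `vlogger` type, `verbosity` value, and `dial` helper are illustrative stand-ins, not grpc-go's actual logger API.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// vlogger is a hypothetical verbosity-gated logger standing in for the
// internal grpc logger used in the diff above.
type vlogger struct{ verbosity int }

// V reports whether messages at the given level should be emitted.
func (l vlogger) V(level int) bool { return l.verbosity >= level }

func (l vlogger) Infof(format string, args ...interface{}) { log.Printf(format, args...) }

var logger = vlogger{verbosity: 2}

func createTransport(addr string) error {
	err := dial(addr)
	if err != nil {
		// Only format and emit the reason when verbose logging is enabled.
		if logger.V(2) {
			logger.Infof("Creating new client transport to %q: %v", addr, err)
		}
		return fmt.Errorf("failed to connect to %s: %w", addr, err)
	}
	return nil
}

func dial(addr string) error { return errors.New("connection refused") }

func main() { fmt.Println(createTransport("localhost:50051")) }
```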
28 changes: 7 additions & 21 deletions internal/transport/controlbuf.go
@@ -191,7 +191,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
}

func (*goAway) isTransportResponseFrame() bool { return false }
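Switching `closeConn` from `bool` to `error` lets the reason for the teardown travel with the control item instead of being dropped at the point of decision. A minimal sketch of that idea follows; the `goAway` struct and `handleGoAway` function here are simplified stand-ins for the real control-buffer machinery.

```go
package main

import (
	"errors"
	"fmt"
)

// goAway is a simplified control-buffer item: instead of a bare "close the
// connection" flag, it carries the reason for closing as an error.
type goAway struct {
	debugData []byte
	closeConn error // nil means "don't close"; non-nil is the reason to close
}

// handleGoAway mimics a loopy-style handler: returning a non-nil error makes
// the writer loop exit, and that error becomes the logged close reason.
func handleGoAway(g *goAway) error {
	fmt.Printf("wrote GOAWAY with debug data %q\n", g.debugData)
	if g.closeConn != nil {
		return g.closeConn
	}
	return nil
}

func main() {
	g := &goAway{
		debugData: []byte("too_many_pings"),
		closeConn: errors.New("got too many pings from the client"),
	}
	if err := handleGoAway(g); err != nil {
		fmt.Println("closing connection:", err)
	}
}
```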
@@ -416,7 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
return nil, ErrConnClosing
return nil, errors.New("transport closed by client")
}
}
}
@@ -527,18 +527,6 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
defer func() {
if err == ErrConnClosing {
// Don't log ErrConnClosing as error since it happens
// 1. When the connection is closed by some other known issue.
// 2. User closed the connection.
// 3. A graceful close of connection.
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter.run returning. %v", err)
}
err = nil
}
}()
for {
it, err := l.cbuf.get(true)
if err != nil {
@@ -582,7 +570,6 @@ func (l *loopyWriter) run() (err error) {
}
l.framer.writer.Flush()
break hasdata

}
}
}
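With the deferred remapping of `ErrConnClosing` to `nil` deleted, `loopyWriter.run` now returns whatever error actually stopped it, and the caller logs that reason once. A toy sketch of the shape of that loop, under the assumption of a much-simplified control buffer:

```go
package main

import (
	"errors"
	"fmt"
)

// controlBuf is a toy stand-in for the control buffer: get blocks until an
// item is available or the buffer is shut down, in which case it returns a
// descriptive error rather than a shared sentinel value.
type controlBuf struct{ items chan interface{} }

func (c *controlBuf) get() (interface{}, error) {
	it, ok := <-c.items
	if !ok {
		return nil, errors.New("transport closed by client")
	}
	return it, nil
}

// run drains the buffer and, unlike the old code, does not remap "expected"
// errors to nil; whatever stopped the loop is returned for the caller to log.
func run(c *controlBuf) error {
	for {
		it, err := c.get()
		if err != nil {
			return err
		}
		fmt.Println("processing", it)
	}
}

func main() {
	c := &controlBuf{items: make(chan interface{}, 1)}
	c.items <- "headers frame"
	close(c.items)
	fmt.Println("loopy exited:", run(c))
}
```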
@@ -670,11 +657,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
if err == errStreamDrain { // errStreamDrain need not close transport
return nil
}
// Other errors(errStreamDrain) need not close transport.
return nil
return err
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
@@ -772,7 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("finished processing active streams while in draining mode")
}
return nil
}
@@ -807,7 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("received GOAWAY with no active streams")
}
}
return nil
15 changes: 9 additions & 6 deletions internal/transport/handler_server.go
@@ -141,12 +141,15 @@ type serverHandlerTransport struct {
stats []stats.Handler
}

func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
func (ht *serverHandlerTransport) Close(err error) {
ht.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
}
close(ht.closedCh)
})
}

func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }

func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }

// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
@@ -236,7 +239,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})
}
}
ht.Close()
ht.Close(errors.New("finished writing status"))
return err
}

@@ -346,7 +349,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
case <-ht.req.Context().Done():
}
cancel()
ht.Close()
ht.Close(errors.New("request is done processing"))
}()

req := ht.req
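The handler_server.go change folds the close logic into the `sync.Once` callback so that the close is idempotent and only the first caller's reason is logged. A minimal sketch of that pattern, with a pared-down stand-in for the transport type:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// serverHandlerTransport is a pared-down stand-in for the type above; the
// point is that Close is idempotent and only the first caller's reason is
// logged and acted upon.
type serverHandlerTransport struct {
	closeOnce sync.Once
	closedCh  chan struct{}
}

func (ht *serverHandlerTransport) Close(err error) {
	ht.closeOnce.Do(func() {
		fmt.Println("Closing serverHandlerTransport:", err)
		close(ht.closedCh)
	})
}

func main() {
	ht := &serverHandlerTransport{closedCh: make(chan struct{})}
	ht.Close(errors.New("finished writing status"))
	// The second close is a no-op: the channel is not closed twice and the
	// later reason is not logged.
	ht.Close(errors.New("request is done processing"))
	<-ht.closedCh
}
```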
19 changes: 13 additions & 6 deletions internal/transport/http2_client.go
@@ -242,8 +242,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if connectCtx.Err() != nil {
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
}
conn.Close()
}
}(conn)
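The monitor goroutine above now logs why it hard-closes the connection when the connect deadline fires before transport setup finishes. A simplified, runnable sketch of that watchdog under assumed timeouts, using an in-memory `net.Pipe` connection rather than a real dial:

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

func main() {
	// Two ends of an in-memory connection stand in for the real dialed conn.
	clientConn, serverConn := net.Pipe()
	defer serverConn.Close()

	// connectCtx bounds how long transport setup may take (illustrative value).
	connectCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	done := make(chan struct{})
	go func(conn net.Conn) {
		<-connectCtx.Done() // setup finished (cancel) or the deadline fired
		if err := connectCtx.Err(); err == context.DeadlineExceeded {
			// The handshake did not finish in time: log why, then hard-close
			// the connection so blocked setup code unblocks with an error.
			fmt.Println("newClientTransport: aborting due to connectCtx:", err)
			conn.Close()
		}
		close(done)
	}(clientConn)

	// Simulate setup that never completes within the deadline.
	<-done
}
```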
@@ -445,10 +448,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
@@ -951,6 +952,9 @@ func (t *http2Client) Close(err error) {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
t.onClose()
@@ -1003,11 +1007,14 @@ func (t *http2Client) GracefulClose() {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(ErrConnClosing)
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
t.controlBuf.put(&incomingGoAway{})
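`GracefulClose` now distinguishes the "nothing to drain" case with a descriptive reason when it closes immediately. A minimal sketch of that drain-or-close decision; the `clientTransport` type and its string `state` field are simplifications, not the real transport state machine:

```go
package main

import (
	"fmt"
)

// clientTransport is a simplified stand-in: with no active streams there is
// nothing to drain, so the transport closes right away with a reason that
// explains the immediate teardown; otherwise it switches to draining.
type clientTransport struct {
	state         string
	activeStreams map[uint32]struct{}
}

func (t *clientTransport) Close(err error) {
	t.state = "closing"
	fmt.Println("transport: closing:", err)
}

func (t *clientTransport) GracefulClose() {
	if t.state == "closing" || t.state == "draining" {
		return
	}
	fmt.Println("transport: GracefulClose called")
	t.state = "draining"
	if len(t.activeStreams) == 0 {
		t.Close(fmt.Errorf("no active streams left to process while draining"))
		return
	}
	// With active streams, a GOAWAY would be enqueued here instead.
}

func main() {
	t := &clientTransport{state: "reachable"}
	t.GracefulClose()
}
```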
69 changes: 31 additions & 38 deletions internal/transport/http2_server.go
@@ -21,6 +21,7 @@ package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
@@ -293,7 +294,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

defer func() {
if err != nil {
t.Close()
t.Close(err)
}
}()

@@ -331,10 +332,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
@@ -344,8 +344,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
return t, nil
}

// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
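The signature change above replaces a bare "fatal" bool with an error, so the caller can hand the actual reason to `Close` instead of reconstructing it. A compressed sketch of that shape; the `server` type, `operateHeaders`, and `handleStreams` here are toy stand-ins that only model the return-value convention:

```go
package main

import (
	"fmt"
)

type server struct{ maxStreamID uint32 }

// operateHeaders returns an error only for fatal conditions that require the
// transport to close; non-fatal rejections (RST_STREAM, refused stream, ...)
// return nil after handling the frame locally.
func (s *server) operateHeaders(streamID uint32) error {
	if streamID%2 != 1 || streamID <= s.maxStreamID {
		return fmt.Errorf("received an illegal stream id: %v", streamID)
	}
	s.maxStreamID = streamID
	return nil
}

func (s *server) handleStreams(streamIDs []uint32) {
	for _, id := range streamIDs {
		if err := s.operateHeaders(id); err != nil {
			fmt.Println("closing transport:", err) // would be t.Close(err)
			return
		}
	}
}

func main() {
	s := &server{}
	s.handleStreams([]uint32{1, 3, 2}) // 2 is illegal: even stream id
}
```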
@@ -361,15 +362,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeFrameSize,
onWrite: func() {},
})
return false
return nil
}

if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
}
t.maxStreamID = streamID

@@ -453,7 +451,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
return false
return nil
}

if !isGRPC || headerError {
Expand All @@ -463,7 +461,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
return false
return nil
}

// "If :authority is missing, Host must be renamed to :authority." - A41
Expand Down Expand Up @@ -503,7 +501,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
return nil
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
@@ -514,7 +512,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
onWrite: func() {},
})
s.cancel()
return false
return nil
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
@@ -530,7 +528,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rst: !frame.StreamEnded(),
})
s.cancel()
return false
return nil
}
if t.inTapHandle != nil {
var err error
@@ -550,7 +548,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
status: stat,
rst: !frame.StreamEnded(),
})
return false
return nil
}
}
t.activeStreams[streamID] = s
@@ -597,7 +595,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
wq: s.wq,
})
handle(s)
return false
return nil
}

// HandleStreams receives incoming streams using the given handler. This is
@@ -630,19 +628,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
t.Close(err)
return
}
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
}
t.Close()
t.Close(err)
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
t.Close(err)
break
}
case *http2.DataFrame:
@@ -886,10 +881,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
if logger.V(logLevel) {
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
}
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
}
}

@@ -1169,10 +1161,7 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
return
}
if !outstandingPing {
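The keepalive change above replaces an "idleness" log plus bare `Close()` with a close reason that records the unacknowledged ping. A loose sketch of that teardown path; the `keepaliveTransport` type, its durations, and the ack channel are illustrative assumptions, not gRPC's actual keepalive parameters:

```go
package main

import (
	"fmt"
	"time"
)

type keepaliveTransport struct {
	kpTime    time.Duration // interval between pings
	kpTimeout time.Duration // how long to wait for an ack
}

func (t *keepaliveTransport) Close(err error) {
	fmt.Println("transport: closing:", err)
}

// keepalive sends pings (elided) and closes the transport with a descriptive
// reason if an ack does not arrive within the timeout.
func (t *keepaliveTransport) keepalive(acks <-chan struct{}) {
	for {
		select {
		case <-acks:
			// ack received; wait before pinging again
			time.Sleep(t.kpTime)
		case <-time.After(t.kpTimeout):
			t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kpTimeout))
			return
		}
	}
}

func main() {
	t := &keepaliveTransport{kpTime: 10 * time.Millisecond, kpTimeout: 20 * time.Millisecond}
	t.keepalive(make(chan struct{})) // no acks ever arrive, so it closes
}
```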
@@ -1199,12 +1188,15 @@
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() {
func (t *http2Server) Close(err error) {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: closing: %v", err)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@@ -1319,19 +1311,20 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
retErr := g.closeConn
if len(t.activeStreams) == 0 {
g.closeConn = true
retErr = errors.New("second GOAWAY written and no active streams left to process")
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
if g.closeConn {
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, fmt.Errorf("transport: Connection closing")
return false, retErr
}
return true, nil
}
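In the outgoing-GOAWAY handler, the returned error now says which case actually caused the teardown: the reason carried on the `goAway` item, or the fact that no active streams remain after the second GOAWAY. A stripped-down sketch of that decision; the `goAwayItem` type and `outgoingGoAway` function are simplifications of the real handler:

```go
package main

import (
	"errors"
	"fmt"
)

type goAwayItem struct{ closeConn error }

// outgoingGoAway starts from the reason attached to the item (if any) and
// upgrades it when there are no active streams left; a non-nil error makes
// the writer loop stop after flushing buffered frames.
func outgoingGoAway(g *goAwayItem, activeStreams int) (resume bool, err error) {
	retErr := g.closeConn
	if activeStreams == 0 {
		retErr = errors.New("second GOAWAY written and no active streams left to process")
	}
	// write the GOAWAY frame here (elided)
	if retErr != nil {
		// flush buffered frames first, then report the reason upward
		return false, retErr
	}
	return true, nil
}

func main() {
	fmt.Println(outgoingGoAway(&goAwayItem{}, 0))
	fmt.Println(outgoingGoAway(&goAwayItem{closeConn: errors.New("got too many pings from the client")}, 3))
}
```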
2 changes: 1 addition & 1 deletion internal/transport/transport.go
@@ -701,7 +701,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close()
Close(err error)

// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
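With the interface change above, every `ServerTransport` implementation receives the close reason, so logging lives with the transport that is shutting down. A minimal sketch of satisfying the new signature; the interface is trimmed to the relevant methods and `fakeServerTransport` is an invented example implementation:

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// ServerTransport is trimmed to the two methods relevant here.
type ServerTransport interface {
	Close(err error)
	RemoteAddr() net.Addr
}

type fakeServerTransport struct{ addr net.Addr }

func (t *fakeServerTransport) Close(err error) {
	fmt.Printf("transport to %v closing: %v\n", t.addr, err)
}

func (t *fakeServerTransport) RemoteAddr() net.Addr { return t.addr }

func main() {
	var st ServerTransport = &fakeServerTransport{
		addr: &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 50051},
	}
	st.Close(errors.New("keepalive ping not acked within timeout"))
}
```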
