Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logs for reasons causing connection and transport close #5840

Merged
merged 8 commits into from Dec 9, 2022
3 changes: 3 additions & 0 deletions clientconn.go
Expand Up @@ -1274,6 +1274,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose)
if err != nil {
if logger.V(2) {
logger.Infof("Creating new client transport to %q: %v", addr, err)
}
// newTr is either nil, or closed.
hcancel()
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
Expand Down
28 changes: 7 additions & 21 deletions internal/transport/controlbuf.go
Expand Up @@ -191,7 +191,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
}

func (*goAway) isTransportResponseFrame() bool { return false }
Expand Down Expand Up @@ -416,7 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
return nil, ErrConnClosing
return nil, errors.New("transport's context expired")
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -527,18 +527,6 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
defer func() {
if err == ErrConnClosing {
// Don't log ErrConnClosing as error since it happens
// 1. When the connection is closed by some other known issue.
// 2. User closed the connection.
// 3. A graceful close of connection.
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter.run returning. %v", err)
}
err = nil
}
}()
for {
it, err := l.cbuf.get(true)
if err != nil {
Expand Down Expand Up @@ -582,7 +570,6 @@ func (l *loopyWriter) run() (err error) {
}
l.framer.writer.Flush()
break hasdata

}
}
}
Expand Down Expand Up @@ -670,11 +657,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
if err == errStreamDrain { // errStreamDrain need not close transport
return nil
}
// Other errors(errStreamDrain) need not close transport.
return nil
return err
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
Expand Down Expand Up @@ -772,7 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("finished processing active streams while in draining mode")
}
return nil
}
Expand Down Expand Up @@ -807,7 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
return ErrConnClosing
return errors.New("received GOAWAY with no active streams")
}
}
return nil
Expand Down
15 changes: 9 additions & 6 deletions internal/transport/handler_server.go
Expand Up @@ -141,12 +141,15 @@ type serverHandlerTransport struct {
stats []stats.Handler
}

func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
func (ht *serverHandlerTransport) Close(err error) {
ht.closeOnce.Do(func() {
if logger.V(logLevel) {
logger.Infof("Closing serverHandlerTransport: %v", err)
}
close(ht.closedCh)
})
}

func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }

func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }

// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
Expand Down Expand Up @@ -236,7 +239,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})
}
}
ht.Close()
ht.Close(errors.New("finished writing status"))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now spamming our logs, is it actually useful?

return err
}

Expand Down Expand Up @@ -346,7 +349,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
case <-ht.req.Context().Done():
}
cancel()
ht.Close()
ht.Close(errors.New("request is done processing"))
}()

req := ht.req
Expand Down
19 changes: 13 additions & 6 deletions internal/transport/http2_client.go
Expand Up @@ -242,8 +242,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if connectCtx.Err() != nil {
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
}
conn.Close()
}
}(conn)
Expand Down Expand Up @@ -445,10 +448,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
Expand Down Expand Up @@ -951,6 +952,9 @@ func (t *http2Client) Close(err error) {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("Closing transport, will close the connection: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
t.onClose()
Expand Down Expand Up @@ -1003,11 +1007,14 @@ func (t *http2Client) GracefulClose() {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(ErrConnClosing)
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down
69 changes: 31 additions & 38 deletions internal/transport/http2_server.go
Expand Up @@ -21,6 +21,7 @@ package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -293,7 +294,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

defer func() {
if err != nil {
t.Close()
t.Close(err)
}
}()

Expand Down Expand Up @@ -331,10 +332,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
Expand All @@ -344,8 +344,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
return t, nil
}

// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
Expand All @@ -361,15 +362,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeFrameSize,
onWrite: func() {},
})
return false
return nil
}

if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
return fmt.Errorf("received an illegal stream id: %v", streamID)
}
t.maxStreamID = streamID

Expand Down Expand Up @@ -453,7 +451,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
return false
return nil
}

if !isGRPC || headerError {
Expand All @@ -463,7 +461,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
return false
return nil
}

// "If :authority is missing, Host must be renamed to :authority." - A41
Expand Down Expand Up @@ -503,7 +501,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if t.state != reachable {
t.mu.Unlock()
s.cancel()
return false
return nil
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
Expand All @@ -514,7 +512,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
onWrite: func() {},
})
s.cancel()
return false
return nil
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
Expand All @@ -530,7 +528,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
rst: !frame.StreamEnded(),
})
s.cancel()
return false
return nil
}
if t.inTapHandle != nil {
var err error
Expand All @@ -550,7 +548,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
status: stat,
rst: !frame.StreamEnded(),
})
return false
return nil
}
}
t.activeStreams[streamID] = s
Expand Down Expand Up @@ -597,7 +595,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
wq: s.wq,
})
handle(s)
return false
return nil
}

// HandleStreams receives incoming streams using the given handler. This is
Expand Down Expand Up @@ -630,19 +628,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
t.Close(err)
return
}
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
}
t.Close()
t.Close(err)
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
t.Close(err)
break
}
case *http2.DataFrame:
Expand Down Expand Up @@ -886,10 +881,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
if logger.V(logLevel) {
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
}
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
}
}

Expand Down Expand Up @@ -1169,10 +1161,7 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
return
}
if !outstandingPing {
Expand All @@ -1199,12 +1188,15 @@ func (t *http2Server) keepalive() {
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() {
func (t *http2Server) Close(err error) {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
}
if logger.V(logLevel) {
logger.Infof("Closing transport, will close the connection: %v", err)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
Expand Down Expand Up @@ -1319,19 +1311,20 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// Stop accepting more streams now.
t.state = draining
sid := t.maxStreamID
retErr := g.closeConn
if len(t.activeStreams) == 0 {
g.closeConn = true
retErr = errors.New("second GOAWAY written and no active streams left to process")
}
t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
if g.closeConn {
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, fmt.Errorf("transport: Connection closing")
return false, retErr
}
return true, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Expand Up @@ -701,7 +701,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close()
Close(err error)

// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
Expand Down