Skip to content

Commit

Permalink
transport: unblock read throttling when controlbuf exits
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 18, 2021
1 parent 23a83dd commit 70d7ce2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 1 deletion.
9 changes: 8 additions & 1 deletion internal/transport/controlbuf.go
Expand Up @@ -406,7 +406,6 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
c.finish()
return nil, ErrConnClosing
}
}
Expand All @@ -431,6 +430,14 @@ func (c *controlBuffer) finish() {
hdr.onOrphaned(ErrConnClosing)
}
}
// In case throttle() is currently in flight, it needs to be unblocked.
// Otherwise, the transport may not close, since the transport is closed by
// the reader encountering the connection error.
ch, _ := c.trfChan.Load().(*chan struct{})
if ch != nil {
close(*ch)
}
c.trfChan.Store((*chan struct{})(nil))
c.mu.Unlock()
}

Expand Down
1 change: 1 addition & 0 deletions internal/transport/http2_client.go
Expand Up @@ -402,6 +402,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
return t, nil
Expand Down
1 change: 1 addition & 0 deletions internal/transport/http2_server.go
Expand Up @@ -295,6 +295,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
}
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
go t.keepalive()
Expand Down
72 changes: 72 additions & 0 deletions test/end2end_test.go
Expand Up @@ -71,6 +71,7 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/test/bufconn"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata"
)
Expand Down Expand Up @@ -7524,3 +7525,74 @@ func (s) TestCanceledRPCCallOptionRace(t *testing.T) {
}
wg.Wait()
}

func (s) TestClientSettingsFloodCloseConn(t *testing.T) {
// Tests that the server properly closes its transport if the client floods
// settings frames and then closes the connection.

// Minimize buffer sizes to stimulate failure condition more quickly.
s := grpc.NewServer(grpc.WriteBufferSize(20))
l := bufconn.Listen(20)
go s.Serve(l)

// Dial our server and handshake.
conn, err := l.Dial()
if err != nil {
t.Fatalf("Error dialing bufconn: %v", err)
}

n, err := conn.Write([]byte(http2.ClientPreface))
if err != nil || n != len(http2.ClientPreface) {
t.Fatalf("Error writing client preface: %v, %v", n, err)
}

fr := http2.NewFramer(conn, conn)
f, err := fr.ReadFrame()
if err != nil {
t.Fatalf("Error reading initial settings frame: %v", err)
}
if _, ok := f.(*http2.SettingsFrame); ok {
if err := fr.WriteSettingsAck(); err != nil {
t.Fatalf("Error writing settings ack: %v", err)
}
} else {
t.Fatalf("Error reading initial settings frame: type=%T", f)
}

// Confirm settings can be written, and that an ack is read.
if err = fr.WriteSettings(); err != nil {
t.Fatalf("Error writing settings frame: %v", err)
}
if f, err = fr.ReadFrame(); err != nil {
t.Fatalf("Error reading frame: %v", err)
}
if sf, ok := f.(*http2.SettingsFrame); !ok || !sf.IsAck() {
t.Fatalf("Unexpected frame: %v", f)
}

// Flood settings frames until a timeout occurs, indiciating the server has
// stopped reading from the connection, then close the conn.
for {
conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond))
if err := fr.WriteSettings(); err != nil {
if to, ok := err.(interface{ Timeout() bool }); !ok || !to.Timeout() {
t.Fatalf("Received unexpected write error: %v", err)
}
break
}
}
conn.Close()

// If the server does not handle this situation correctly, it will never
// close the transport. This is because its loopyWriter.run() will have
// exited, and thus not handle the goAway the draining process initiates.
// Also, we would see a goroutine leak in this case, as the reader would be
// blocked on the controlBuf's throttle() method indefinitely.

timer := time.AfterFunc(5*time.Second, func() {
t.Errorf("Timeout waiting for GracefulStop to return")
s.Stop()
})
s.GracefulStop()
timer.Stop()
}

0 comments on commit 70d7ce2

Please sign in to comment.