Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: unblock read throttling when controlbuf exits #4447

Merged
merged 2 commits into from May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/transport/controlbuf.go
Expand Up @@ -406,7 +406,6 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
select {
case <-c.ch:
case <-c.done:
c.finish()
return nil, ErrConnClosing
}
}
Expand All @@ -431,6 +430,14 @@ func (c *controlBuffer) finish() {
hdr.onOrphaned(ErrConnClosing)
}
}
// In case throttle() is currently in flight, it needs to be unblocked.
// Otherwise, the transport may not close, since the transport is closed by
// the reader encountering the connection error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add this?

This won't race with any other goroutine, because the caller (the loopyWriter() goroutine) is the only goroutine that ever sets the channel

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Races are actually avoided by taking the lock and setting c.err, and get, which sets the channel, checking the error. This is because finish may also be called by t.Close while get is in flight.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, t.Close() also calls finish()? Why would it need to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it needs to now. I can remove it, but I don't think it simplifies things, really.

Actually, I think we could change things a little bit to improve them slightly.

  1. Call finish from a defer in run.
  2. Change trfChan to a chan struct{} instead of atomic.Value and access it with atomic.___Pointer
  3. finish could close trfChan directly without even the atomic load, since we know finish is only called by run.

Copy link
Contributor

@menghanl menghanl May 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the "race" I'm talking about here is not read/write race.
It's that if finish() unblocks this channel, but another goroutine sets it to block again, before the reader() has a chance to unblock, leading to another deadlock.
(and it's also handled by the error check)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no read/write race due to the use of the atomic. I think we were talking about the same thing. And if finish is called by Close, it can happen while get is in flight, which, if get didn't check c.err, could encounter this race. Anyway, let me know what you want me to do here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change in this PR is OK.

We should cleanup the transport code later (when? 🤷‍♂️).

  • the channel (controlbuf) is closed by the reader (loopyWriter()) instead of the writer (reader()) (hahahahaha)
    • which is, counterintuitive
  • the multiple caller of t.Close() is probably part of the cause that RPCs tend to fail with transport is closing
    • if the transport is only closed by the reader(), and the other places just tells reader() to close, it might make things cleaner

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
close(ch)
}
c.trfChan.Store((chan struct{})(nil))
c.mu.Unlock()
}

Expand Down
1 change: 1 addition & 0 deletions internal/transport/http2_client.go
Expand Up @@ -402,6 +402,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
return t, nil
Expand Down
1 change: 1 addition & 0 deletions internal/transport/http2_server.go
Expand Up @@ -295,6 +295,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
}
}
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
go t.keepalive()
Expand Down
72 changes: 72 additions & 0 deletions test/end2end_test.go
Expand Up @@ -71,6 +71,7 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/test/bufconn"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata"
)
Expand Down Expand Up @@ -7524,3 +7525,74 @@ func (s) TestCanceledRPCCallOptionRace(t *testing.T) {
}
wg.Wait()
}

func (s) TestClientSettingsFloodCloseConn(t *testing.T) {
// Tests that the server properly closes its transport if the client floods
// settings frames and then closes the connection.

// Minimize buffer sizes to stimulate failure condition more quickly.
s := grpc.NewServer(grpc.WriteBufferSize(20))
l := bufconn.Listen(20)
go s.Serve(l)

// Dial our server and handshake.
conn, err := l.Dial()
if err != nil {
t.Fatalf("Error dialing bufconn: %v", err)
}

n, err := conn.Write([]byte(http2.ClientPreface))
if err != nil || n != len(http2.ClientPreface) {
t.Fatalf("Error writing client preface: %v, %v", n, err)
}

fr := http2.NewFramer(conn, conn)
f, err := fr.ReadFrame()
if err != nil {
t.Fatalf("Error reading initial settings frame: %v", err)
}
if _, ok := f.(*http2.SettingsFrame); ok {
if err := fr.WriteSettingsAck(); err != nil {
t.Fatalf("Error writing settings ack: %v", err)
}
} else {
t.Fatalf("Error reading initial settings frame: type=%T", f)
}

// Confirm settings can be written, and that an ack is read.
if err = fr.WriteSettings(); err != nil {
t.Fatalf("Error writing settings frame: %v", err)
}
if f, err = fr.ReadFrame(); err != nil {
t.Fatalf("Error reading frame: %v", err)
}
if sf, ok := f.(*http2.SettingsFrame); !ok || !sf.IsAck() {
t.Fatalf("Unexpected frame: %v", f)
}

// Flood settings frames until a timeout occurs, indiciating the server has
// stopped reading from the connection, then close the conn.
for {
conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond))
if err := fr.WriteSettings(); err != nil {
if to, ok := err.(interface{ Timeout() bool }); !ok || !to.Timeout() {
t.Fatalf("Received unexpected write error: %v", err)
}
break
}
}
conn.Close()

// If the server does not handle this situation correctly, it will never
// close the transport. This is because its loopyWriter.run() will have
// exited, and thus not handle the goAway the draining process initiates.
// Also, we would see a goroutine leak in this case, as the reader would be
// blocked on the controlBuf's throttle() method indefinitely.

timer := time.AfterFunc(5*time.Second, func() {
t.Errorf("Timeout waiting for GracefulStop to return")
s.Stop()
})
s.GracefulStop()
timer.Stop()
}