From 4459a50a106c9acfc4b01e64a9693af3d5c16df2 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 11 Jul 2022 13:12:40 -0400 Subject: [PATCH 1/9] Fix race between activeStreams and bdp window size --- internal/transport/http2_client.go | 12 ++++----- test/end2end_test.go | 41 ++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index be371c6e0f7..03691ef1b20 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -685,6 +685,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, cleanup(err) return err } + s.fc = &inFlow{limit: uint32(t.initialWindowSize)} t.activeStreams[id] = s if channelz.IsOn() { atomic.AddInt64(&t.czData.streamsStarted, 1) @@ -718,7 +719,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, h.streamID = t.nextID t.nextID += 2 s.id = h.streamID - s.fc = &inFlow{limit: uint32(t.initialWindowSize)} if t.streamQuota > 0 && t.waitingStreams > 0 { select { case t.streamsQuotaAvailable <- struct{}{}: @@ -1003,13 +1003,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { // for the transport and the stream based on the current bdp // estimation. func (t *http2Client) updateFlowControl(n uint32) { - t.mu.Lock() - for _, s := range t.activeStreams { - s.fc.newLimit(n) - } - t.mu.Unlock() updateIWS := func(interface{}) bool { t.initialWindowSize = int32(n) + t.mu.Lock() + for _, s := range t.activeStreams { + s.fc.newLimit(n) + } + t.mu.Unlock() return true } t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) diff --git a/test/end2end_test.go b/test/end2end_test.go index da0acbf3d75..3a84ad70f46 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -8041,3 +8041,44 @@ func (s) TestServerClosesConn(t *testing.T) { } t.Fatalf("timed out waiting for conns to be closed by server; still open: %v", atomic.LoadInt32(&wrapLis.connsOpen)) } + +// TestUnexpectedEOF tests a scenario where a client invokes two unary RPC +// calls. The first call receives a payload which exceeds max grpc receive +// message length, and the second gets a large response. This second RPC should +// not fail with unexpected.EOF. +func (s) TestUnexpectedEOF(t *testing.T) { + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{ + Payload: &testpb.Payload{ + Body: bytes.Repeat([]byte("a"), int(in.ResponseSize)), + }, + }, nil + }, + } + if err := ss.Start([]grpc.ServerOption{}); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + for i := 0; i < 10; i++ { + // exceeds grpc.DefaultMaxRecvMessageSize, this should error with + // RESOURCE_EXHAUSTED error. + _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304}) + if err != nil { + if e, ok := status.FromError(err); ok { + if e.Code() != codes.ResourceExhausted { + t.Fatalf("unexpected err in UnaryCall: %v", err) + } + } + } + // Larger response that doesn't exceed DefaultMaxRecvMessageSize, this + // should work normally. + _, err = ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075}) + if err != nil { + t.Fatalf("unexpected err in UnaryCall: %v", err) + } + } +} From f7515ef724ec442fc9942c3c85e3fa62a9163efc Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 11 Jul 2022 14:04:27 -0400 Subject: [PATCH 2/9] Fixed stream.fc being nil --- internal/transport/http2_client.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 03691ef1b20..8d3cf3664a7 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -685,8 +685,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, cleanup(err) return err } - s.fc = &inFlow{limit: uint32(t.initialWindowSize)} - t.activeStreams[id] = s if channelz.IsOn() { atomic.AddInt64(&t.czData.streamsStarted, 1) atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) @@ -719,6 +717,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, h.streamID = t.nextID t.nextID += 2 s.id = h.streamID + s.fc = &inFlow{limit: uint32(t.initialWindowSize)} + t.mu.Lock() + if t.activeStreams != nil { // Can be niled from Close() + t.activeStreams[s.id] = s + } + t.mu.Unlock() if t.streamQuota > 0 && t.waitingStreams > 0 { select { case t.streamsQuotaAvailable <- struct{}{}: @@ -744,10 +748,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, } for { success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { - if !checkForStreamQuota(it) { + if !checkForHeaderListSize(it) { return false } - if !checkForHeaderListSize(it) { + if !checkForStreamQuota(it) { return false } return true From ad7029098c5a1bd5bb8bdc6b3219c4523eeea7b9 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 11 Jul 2022 19:09:52 -0400 Subject: [PATCH 3/9] drop lock before put handleGoAway --- internal/transport/http2_client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 8d3cf3664a7..6fc8f9139cc 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1219,7 +1219,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { default: t.setGoAwayReason(f) close(t.goAway) + t.mu.Unlock() t.controlBuf.put(&incomingGoAway{}) + t.mu.Lock() // Notify the clientconn about the GOAWAY before we set the state to // draining, to allow the client to stop attempting to create streams // before disallowing new streams on this connection. From 2465c21ad4543675ef2f60e89f276401ac249e94 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 11 Jul 2022 19:15:37 -0400 Subject: [PATCH 4/9] Responded to Doug's comments --- internal/transport/http2_client.go | 8 +------- test/end2end_test.go | 9 +++++---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 6fc8f9139cc..9f8ca7d4a50 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -748,13 +748,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, } for { success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { - if !checkForHeaderListSize(it) { - return false - } - if !checkForStreamQuota(it) { - return false - } - return true + return checkForHeaderListSize(it) && checkForStreamQuota(it) }, hdr) if err != nil { // Connection closed. diff --git a/test/end2end_test.go b/test/end2end_test.go index 3a84ad70f46..e0d3c7b8ab6 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -8068,10 +8068,11 @@ func (s) TestUnexpectedEOF(t *testing.T) { // RESOURCE_EXHAUSTED error. _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304}) if err != nil { - if e, ok := status.FromError(err); ok { - if e.Code() != codes.ResourceExhausted { - t.Fatalf("unexpected err in UnaryCall: %v", err) - } + // This check doesn't fail on a non status error. However, the main + // this is testing for is an unexpected EOF with a status code + // INTERNAL so this is fine. + if code := status.Code(err); code != codes.ResourceExhausted { + t.Fatalf("unexpected err in UnaryCall: %v", err) } } // Larger response that doesn't exceed DefaultMaxRecvMessageSize, this From ff5bade26390f95186cc54d23182668bbdddaf2c Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 12 Jul 2022 10:33:36 -0400 Subject: [PATCH 5/9] Switched mutex unlock lock to defer --- internal/transport/http2_client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 9f8ca7d4a50..ec7554ffda2 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1213,9 +1213,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { default: t.setGoAwayReason(f) close(t.goAway) - t.mu.Unlock() - t.controlBuf.put(&incomingGoAway{}) - t.mu.Lock() + defer t.controlBuf.put(&incomingGoAway{}) // Notify the clientconn about the GOAWAY before we set the state to // draining, to allow the client to stop attempting to create streams // before disallowing new streams on this connection. From 30cc71c15467979bab15cc5f065510dc8137f4f6 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 12 Jul 2022 16:12:47 -0400 Subject: [PATCH 6/9] QAZSE discussion changes --- internal/transport/http2_client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ec7554ffda2..a24afd58952 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -78,6 +78,7 @@ type http2Client struct { framer *framer // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. + // Do not access controlBuf with mu held. controlBuf *controlBuffer fc *trInFlow // The scheme used: https if TLS is on, http otherwise. @@ -109,6 +110,7 @@ type http2Client struct { waitingStreams uint32 nextID uint32 + // Do not access controlBuf with mu held. mu sync.Mutex // guard the following variables state transportState activeStreams map[uint32]*Stream @@ -719,8 +721,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, s.id = h.streamID s.fc = &inFlow{limit: uint32(t.initialWindowSize)} t.mu.Lock() - if t.activeStreams != nil { // Can be niled from Close() + if t.activeStreams != nil { // Can be niled from Close(). t.activeStreams[s.id] = s + } else { + return false // Don't create a stream if the transport is already closed. } t.mu.Unlock() if t.streamQuota > 0 && t.waitingStreams > 0 { From 06f7a92876fcf013075e4d4cd95004ce8913724b Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 12 Jul 2022 17:04:45 -0400 Subject: [PATCH 7/9] Responded to Easwar's and Doug's comments --- internal/transport/http2_client.go | 5 ++--- test/end2end_test.go | 10 ++++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index a24afd58952..c4960ab41c0 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -721,11 +721,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, s.id = h.streamID s.fc = &inFlow{limit: uint32(t.initialWindowSize)} t.mu.Lock() - if t.activeStreams != nil { // Can be niled from Close(). - t.activeStreams[s.id] = s - } else { + if t.activeStreams == nil { // Can be niled from Close(). return false // Don't create a stream if the transport is already closed. } + t.activeStreams[s.id] = s t.mu.Unlock() if t.streamQuota > 0 && t.waitingStreams > 0 { select { diff --git a/test/end2end_test.go b/test/end2end_test.go index e0d3c7b8ab6..682c0c65d7c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -8066,20 +8066,18 @@ func (s) TestUnexpectedEOF(t *testing.T) { for i := 0; i < 10; i++ { // exceeds grpc.DefaultMaxRecvMessageSize, this should error with // RESOURCE_EXHAUSTED error. - _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304}) - if err != nil { + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304}); err != nil { // This check doesn't fail on a non status error. However, the main // this is testing for is an unexpected EOF with a status code // INTERNAL so this is fine. if code := status.Code(err); code != codes.ResourceExhausted { - t.Fatalf("unexpected err in UnaryCall: %v", err) + t.Fatalf("UnaryCall RPC returned error: %v, want status code %v", err, codes.ResourceExhausted) } } // Larger response that doesn't exceed DefaultMaxRecvMessageSize, this // should work normally. - _, err = ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075}) - if err != nil { - t.Fatalf("unexpected err in UnaryCall: %v", err) + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075}); err != nil { + t.Fatalf("UnaryCall RPC failed: %v", err) } } } From 067003ee3c84ea4b98a04ef1a6cb0ebc2b5ddb69 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 12 Jul 2022 17:18:42 -0400 Subject: [PATCH 8/9] Unlocked mutex --- internal/transport/http2_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c4960ab41c0..c9873c07763 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -722,6 +722,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, s.fc = &inFlow{limit: uint32(t.initialWindowSize)} t.mu.Lock() if t.activeStreams == nil { // Can be niled from Close(). + t.mu.Unlock() return false // Don't create a stream if the transport is already closed. } t.activeStreams[s.id] = s From 9af35755b1f13a4089f384377fbdb7c09837cb7e Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 12 Jul 2022 18:13:55 -0400 Subject: [PATCH 9/9] Responded to Doug's comments --- internal/transport/http2_client.go | 2 +- test/end2end_test.go | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c9873c07763..28c77af70ab 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1217,7 +1217,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { default: t.setGoAwayReason(f) close(t.goAway) - defer t.controlBuf.put(&incomingGoAway{}) + defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held. // Notify the clientconn about the GOAWAY before we set the state to // draining, to allow the client to stop attempting to create streams // before disallowing new streams on this connection. diff --git a/test/end2end_test.go b/test/end2end_test.go index 682c0c65d7c..c44925f96a6 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -8066,13 +8066,9 @@ func (s) TestUnexpectedEOF(t *testing.T) { for i := 0; i < 10; i++ { // exceeds grpc.DefaultMaxRecvMessageSize, this should error with // RESOURCE_EXHAUSTED error. - if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304}); err != nil { - // This check doesn't fail on a non status error. However, the main - // this is testing for is an unexpected EOF with a status code - // INTERNAL so this is fine. - if code := status.Code(err); code != codes.ResourceExhausted { - t.Fatalf("UnaryCall RPC returned error: %v, want status code %v", err, codes.ResourceExhausted) - } + _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304}) + if code := status.Code(err); code != codes.ResourceExhausted { + t.Fatalf("UnaryCall RPC returned error: %v, want status code %v", err, codes.ResourceExhausted) } // Larger response that doesn't exceed DefaultMaxRecvMessageSize, this // should work normally.