From 4459a50a106c9acfc4b01e64a9693af3d5c16df2 Mon Sep 17 00:00:00 2001
From: Zach Reyes
Date: Mon, 11 Jul 2022 13:12:40 -0400
Subject: [PATCH] Fix race between activeStreams and bdp window size

---
 internal/transport/http2_client.go | 12 ++++-----
 test/end2end_test.go               | 41 ++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go
index be371c6e0f7..03691ef1b20 100644
--- a/internal/transport/http2_client.go
+++ b/internal/transport/http2_client.go
@@ -685,6 +685,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
 				cleanup(err)
 				return err
 			}
+			s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
 			t.activeStreams[id] = s
 			if channelz.IsOn() {
 				atomic.AddInt64(&t.czData.streamsStarted, 1)
@@ -718,7 +719,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
 		h.streamID = t.nextID
 		t.nextID += 2
 		s.id = h.streamID
-		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
 		if t.streamQuota > 0 && t.waitingStreams > 0 {
 			select {
 			case t.streamsQuotaAvailable <- struct{}{}:
@@ -1003,13 +1003,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
 // for the transport and the stream based on the current bdp
 // estimation.
 func (t *http2Client) updateFlowControl(n uint32) {
-	t.mu.Lock()
-	for _, s := range t.activeStreams {
-		s.fc.newLimit(n)
-	}
-	t.mu.Unlock()
 	updateIWS := func(interface{}) bool {
 		t.initialWindowSize = int32(n)
+		t.mu.Lock()
+		for _, s := range t.activeStreams {
+			s.fc.newLimit(n)
+		}
+		t.mu.Unlock()
 		return true
 	}
 	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
diff --git a/test/end2end_test.go b/test/end2end_test.go
index da0acbf3d75..3a84ad70f46 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -8041,3 +8041,44 @@ func (s) TestServerClosesConn(t *testing.T) {
 	}
 	t.Fatalf("timed out waiting for conns to be closed by server; still open: %v", atomic.LoadInt32(&wrapLis.connsOpen))
 }
+
+// TestUnexpectedEOF tests a scenario where a client invokes two unary RPC
+// calls. The first call requests a payload which exceeds the maximum receive
+// message size, and the second receives a large (but allowed) response. This
+// second RPC should not fail with an unexpected EOF.
+func (s) TestUnexpectedEOF(t *testing.T) {
+	ss := &stubserver.StubServer{
+		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+			return &testpb.SimpleResponse{
+				Payload: &testpb.Payload{
+					Body: bytes.Repeat([]byte("a"), int(in.ResponseSize)),
+				},
+			}, nil
+		},
+	}
+	if err := ss.Start([]grpc.ServerOption{}); err != nil {
+		t.Fatalf("Error starting endpoint server: %v", err)
+	}
+	defer ss.Stop()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for i := 0; i < 10; i++ {
+		// Exceeds the default maximum receive message size (4 MB), so this
+		// call should fail with a RESOURCE_EXHAUSTED error.
+		_, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304})
+		if err != nil {
+			if e, ok := status.FromError(err); ok {
+				if e.Code() != codes.ResourceExhausted {
+					t.Fatalf("unexpected err in UnaryCall: %v", err)
+				}
+			}
+		}
+		// A large response that stays within the default maximum receive
+		// message size; this call should succeed.
+		_, err = ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075})
+		if err != nil {
+			t.Fatalf("unexpected err in UnaryCall: %v", err)
+		}
+	}
+}
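
A hedged reading of the race this patch addresses: a stream created concurrently with a BDP-driven window update could compute its inFlow limit from the old initialWindowSize and also miss the newLimit loop, because it was not yet in activeStreams when that loop ran; the SETTINGS frame announcing the larger window then let the server overrun the stream's stale limit. The patch makes the initialWindowSize write and the per-stream newLimit loop happen in the same executeAndPut callback, and initializes s.fc under t.mu right before the stream is published to activeStreams. The sketch below is a minimal, hypothetical illustration of that publish-and-resize-under-one-lock pattern; the names (registry, item, register, updateLimit) are invented for illustration and are not gRPC APIs.

package main

import (
	"fmt"
	"sync"
)

// item stands in for a stream with a per-stream flow-control limit.
type item struct{ limit uint32 }

// registry stands in for the transport: a shared default limit plus the set
// of registered items, all guarded by one mutex (mirroring how the patch ties
// initialWindowSize, activeStreams, and s.fc updates together).
type registry struct {
	mu           sync.Mutex
	defaultLimit uint32
	items        map[int]*item
}

// register publishes a new item. Reading the default limit and inserting the
// item happen in one critical section, so a concurrent updateLimit either
// resizes this item or this item reads the already-updated default.
func (r *registry) register(id int) *item {
	r.mu.Lock()
	defer r.mu.Unlock()
	it := &item{limit: r.defaultLimit}
	r.items[id] = it
	return it
}

// updateLimit changes the default and resizes every registered item in the
// same critical section, analogous to moving the newLimit loop into updateIWS.
func (r *registry) updateLimit(n uint32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.defaultLimit = n
	for _, it := range r.items {
		it.limit = n
	}
}

func main() {
	r := &registry{defaultLimit: 65535, items: make(map[int]*item)}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); r.register(1) }()
	go func() { defer wg.Done(); r.updateLimit(1 << 20) }()
	wg.Wait()
	// Either interleaving leaves the item at the new limit (1048576); it can
	// never be left with the stale default.
	fmt.Println(r.items[1].limit)
}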