Fix race between activeStreams and bdp window size
zasweq committed Jul 11, 2022
1 parent 2c0949c commit 4459a50
Showing 2 changed files with 47 additions and 6 deletions.
12 changes: 6 additions & 6 deletions internal/transport/http2_client.go
@@ -685,6 +685,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
 				cleanup(err)
 				return err
 			}
+			s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
 			t.activeStreams[id] = s
 			if channelz.IsOn() {
 				atomic.AddInt64(&t.czData.streamsStarted, 1)
@@ -718,7 +719,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
 		h.streamID = t.nextID
 		t.nextID += 2
 		s.id = h.streamID
-		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
 		if t.streamQuota > 0 && t.waitingStreams > 0 {
 			select {
 			case t.streamsQuotaAvailable <- struct{}{}:
@@ -1003,13 +1003,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
 // for the transport and the stream based on the current bdp
 // estimation.
 func (t *http2Client) updateFlowControl(n uint32) {
-	t.mu.Lock()
-	for _, s := range t.activeStreams {
-		s.fc.newLimit(n)
-	}
-	t.mu.Unlock()
 	updateIWS := func(interface{}) bool {
 		t.initialWindowSize = int32(n)
+		t.mu.Lock()
+		for _, s := range t.activeStreams {
+			s.fc.newLimit(n)
+		}
+		t.mu.Unlock()
 		return true
 	}
 	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
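
The http2_client.go changes above establish two ordering properties: a stream's flow-control window (s.fc) exists before the stream becomes visible in activeStreams, and the BDP-driven bump of initialWindowSize happens together with the per-stream newLimit calls. Below is a minimal, self-contained sketch of those two properties using hypothetical toy types (transport, stream, inFlow); it is not the real transport code, which serializes the update through controlBuf.executeAndPut rather than holding t.mu for the whole operation. Running it with go run -race should report no races.

package main

import (
	"fmt"
	"sync"
)

// inFlow is a stand-in for the transport's inbound flow-control window.
type inFlow struct {
	mu    sync.Mutex
	limit uint32
}

func (f *inFlow) newLimit(n uint32) {
	f.mu.Lock()
	f.limit = n
	f.mu.Unlock()
}

type stream struct {
	fc *inFlow
}

type transport struct {
	mu                sync.Mutex
	initialWindowSize uint32
	activeStreams     map[uint32]*stream
}

// newStream mirrors the fixed ordering: fc is assigned before the stream is
// published in activeStreams, so a concurrent window update that iterates the
// map never observes a stream without a window.
func (t *transport) newStream(id uint32) *stream {
	s := &stream{}
	t.mu.Lock()
	s.fc = &inFlow{limit: t.initialWindowSize}
	t.activeStreams[id] = s
	t.mu.Unlock()
	return s
}

// updateFlowControl mirrors the fixed grouping: the new initialWindowSize and
// the per-stream limits change in one critical section, so a concurrently
// created stream ends up with either the old limit or the new one, never
// neither.
func (t *transport) updateFlowControl(n uint32) {
	t.mu.Lock()
	t.initialWindowSize = n
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.mu.Unlock()
}

func main() {
	t := &transport{initialWindowSize: 65535, activeStreams: make(map[uint32]*stream)}
	var wg sync.WaitGroup
	for id := uint32(1); id < 100; id += 2 { // odd IDs, like client-initiated HTTP/2 streams
		wg.Add(1)
		go func(id uint32) { defer wg.Done(); t.newStream(id) }(id)
	}
	wg.Add(1)
	go func() { defer wg.Done(); t.updateFlowControl(1 << 20) }() // BDP-driven window bump
	wg.Wait()
	fmt.Println("streams:", len(t.activeStreams), "window:", t.initialWindowSize)
}
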
41 changes: 41 additions & 0 deletions test/end2end_test.go
@@ -8041,3 +8041,44 @@ func (s) TestServerClosesConn(t *testing.T) {
 	}
 	t.Fatalf("timed out waiting for conns to be closed by server; still open: %v", atomic.LoadInt32(&wrapLis.connsOpen))
 }
+
+// TestUnexpectedEOF tests a scenario where a client invokes two unary RPC
+// calls. The first call receives a payload which exceeds max grpc receive
+// message length, and the second gets a large response. This second RPC should
+// not fail with unexpected.EOF.
+func (s) TestUnexpectedEOF(t *testing.T) {
+	ss := &stubserver.StubServer{
+		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+			return &testpb.SimpleResponse{
+				Payload: &testpb.Payload{
+					Body: bytes.Repeat([]byte("a"), int(in.ResponseSize)),
+				},
+			}, nil
+		},
+	}
+	if err := ss.Start([]grpc.ServerOption{}); err != nil {
+		t.Fatalf("Error starting endpoint server: %v", err)
+	}
+	defer ss.Stop()
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for i := 0; i < 10; i++ {
+		// exceeds grpc.DefaultMaxRecvMessageSize, this should error with
+		// RESOURCE_EXHAUSTED error.
+		_, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304})
+		if err != nil {
+			if e, ok := status.FromError(err); ok {
+				if e.Code() != codes.ResourceExhausted {
+					t.Fatalf("unexpected err in UnaryCall: %v", err)
+				}
+			}
+		}
+		// Larger response that doesn't exceed DefaultMaxRecvMessageSize, this
+		// should work normally.
+		_, err = ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075})
+		if err != nil {
+			t.Fatalf("unexpected err in UnaryCall: %v", err)
+		}
+	}
+}
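
For context on the ResourceExhausted branch in the test above (not part of this commit): the client's default receive limit is 4 MiB, which is why the 4194304-byte response is rejected. A caller that actually expects payloads of that size would raise the limit; the sketch below shows one way to do that with standard grpc-go options (the helper name and the insecure-credentials choice are illustrative assumptions, not from this commit).

package example

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialWithLargerRecvLimit dials target with a default call option that accepts
// responses up to maxBytes instead of the client's 4 MiB default receive limit.
// Hypothetical helper for illustration only.
func dialWithLargerRecvLimit(target string, maxBytes int) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxBytes)),
	)
}
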
