Skip to content

Commit

Permalink
test: add option to make httpServer wait for END_STREAM; fix RetrySta…
Browse files Browse the repository at this point in the history
…ts race (#4811)
  • Loading branch information
dfawley committed Sep 24, 2021
1 parent 6ff68b4 commit 11437f6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 17 deletions.
2 changes: 1 addition & 1 deletion internal/transport/http2_client.go
Expand Up @@ -1073,7 +1073,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
if f.StreamEnded() {
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/http2_server.go
Expand Up @@ -734,7 +734,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
if f.StreamEnded() {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
Expand Down
28 changes: 24 additions & 4 deletions test/end2end_test.go
Expand Up @@ -7352,8 +7352,11 @@ type httpServerResponse struct {
}

type httpServer struct {
refuseStream func(uint32) bool
responses []httpServerResponse
// If waitForEndStream is set, wait for the client to send a frame with end
// stream in it before sending a response/refused stream.
waitForEndStream bool
refuseStream func(uint32) bool
responses []httpServerResponse
}

func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error {
Expand Down Expand Up @@ -7416,8 +7419,25 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
}
return
}
if hframe, ok := frame.(*http2.HeadersFrame); ok {
sid = hframe.Header().StreamID
sid = 0
switch fr := frame.(type) {
case *http2.HeadersFrame:
// Respond after this if we are not waiting for an end
// stream or if this frame ends it.
if !s.waitForEndStream || fr.StreamEnded() {
sid = fr.Header().StreamID
}

case *http2.DataFrame:
// Respond after this if we were waiting for an end stream
// and this frame ends it. (If we were not waiting for an
// end stream, this stream was already responded to when
// the headers were received.)
if s.waitForEndStream && fr.StreamEnded() {
sid = fr.Header().StreamID
}
}
if sid != 0 {
if s.refuseStream == nil || !s.refuseStream(sid) {
break
}
Expand Down
15 changes: 4 additions & 11 deletions test/retry_test.go
Expand Up @@ -517,6 +517,7 @@ func (s) TestRetryStats(t *testing.T) {
}
defer lis.Close()
server := &httpServer{
waitForEndStream: true,
responses: []httpServerResponse{{
trailers: [][]string{{
":status", "200",
Expand Down Expand Up @@ -588,13 +589,6 @@ func (s) TestRetryStats(t *testing.T) {
&stats.End{},
}

// There is a race between noticing the RST_STREAM during the first RPC
// attempt and writing the payload. If we detect that the client did not
// send the OutPayload, we remove it from want.
if _, ok := handler.s[2].(*stats.End); ok {
want = append(want[:2], want[3:]...)
}

toString := func(ss []stats.RPCStats) (ret []string) {
for _, s := range ss {
ret = append(ret, fmt.Sprintf("%T - %v", s, s))
Expand All @@ -612,8 +606,7 @@ func (s) TestRetryStats(t *testing.T) {
// There is a race between receiving the payload (triggered by the
// application / gRPC library) and receiving the trailer (triggered at the
// transport layer). Adjust the received stats accordingly if necessary.
// Note: we measure from the end of the RPCStats due to the race above.
tIdx, pIdx := len(handler.s)-3, len(handler.s)-2
const tIdx, pIdx = 13, 14
_, okT := handler.s[tIdx].(*stats.InTrailer)
_, okP := handler.s[pIdx].(*stats.InPayload)
if okT && okP {
Expand Down Expand Up @@ -654,8 +647,8 @@ func (s) TestRetryStats(t *testing.T) {
}

// Validate timings between last Begin and preceding End.
end := handler.s[len(handler.s)-8].(*stats.End)
begin := handler.s[len(handler.s)-7].(*stats.Begin)
end := handler.s[8].(*stats.End)
begin := handler.s[9].(*stats.Begin)
diff := begin.BeginTime.Sub(end.EndTime)
if diff < 10*time.Millisecond || diff > 50*time.Millisecond {
t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
Expand Down

0 comments on commit 11437f6

Please sign in to comment.