From 6004b0d9d228f676bd7d23b69b1e989a340080c3 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 12 Sep 2022 22:47:44 -0400 Subject: [PATCH 1/4] Fixed deadlock in transport --- internal/transport/http2_client.go | 13 +++++++-- internal/transport/transport_test.go | 41 ++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 53643fa9747..7543357084d 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1232,16 +1232,23 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { if upperLimit == 0 { // This is the first GoAway Frame. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. } + + activeStreams := make(map[uint32]*Stream) for streamID, stream := range t.activeStreams { + activeStreams[streamID] = stream + } + + t.prevGoAwayID = id + t.mu.Unlock() + for streamID, stream := range activeStreams { if streamID > id && streamID <= upperLimit { // The stream was unprocessed by the server. atomic.StoreUint32(&stream.unprocessed, 1) t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) } } - t.prevGoAwayID = id - active := len(t.activeStreams) - t.mu.Unlock() + + active := len(activeStreams) if active == 0 { t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 760e1b64f35..f1618c0d5e1 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2501,3 +2501,44 @@ func (s) TestPeerSetInServerContext(t *testing.T) { } server.mu.Unlock() } + +// TestGoAwayCloseStreams tests the scenario where a client has many streams +// created, and the server sends a GOAWAY frame with a stream id less than some +// of them, while the client is still creating new streams. This should not +// induce a deadlock. +func (s) TestGoAwayCloseStreams(t *testing.T) { + server, ct, cancel := setUp(t, 0, math.MaxUint32, normal) + defer cancel() + defer server.stop() + defer ct.Close(fmt.Errorf("closed manually by test")) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + for i := 0; i < 5; i++ { + _, err := ct.NewStream(ctx, &CallHdr{}) + if err != nil { + t.Fatalf("error creating stream: %v", err) + } + } + + waitWhileTrue(t, func() (bool, error) { + server.mu.Lock() + defer server.mu.Unlock() + + if len(server.conns) == 0 { + return true, fmt.Errorf("timed-out while waiting for connection to be created on the server") + } + return false, nil + }) + + var st *http2Server + server.mu.Lock() + for k := range server.conns { + st = k.(*http2Server) + } + server.mu.Unlock() + + st.framer.fr.WriteGoAway(5, http2.ErrCodeNo, []byte{}) + for i := 0; i < 10; i++ { + ct.NewStream(ctx, &CallHdr{}) + } +} From 3e5eae7f42ed66ad83cce1a40e57979e20405322 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 15 Sep 2022 15:32:27 -0400 Subject: [PATCH 2/4] Switched transport level test to e2e test --- internal/transport/transport_test.go | 41 ---------- test/clienttester.go | 108 +++++++++++++++++++++++++++ test/end2end_test.go | 60 ++++++++++++++- 3 files changed, 167 insertions(+), 42 deletions(-) create mode 100644 test/clienttester.go diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index f1618c0d5e1..760e1b64f35 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2501,44 +2501,3 @@ func (s) TestPeerSetInServerContext(t *testing.T) { } server.mu.Unlock() } - -// TestGoAwayCloseStreams tests the scenario where a client has many streams -// created, and the server sends a GOAWAY frame with a stream id less than some -// of them, while the client is still creating new streams. This should not -// induce a deadlock. -func (s) TestGoAwayCloseStreams(t *testing.T) { - server, ct, cancel := setUp(t, 0, math.MaxUint32, normal) - defer cancel() - defer server.stop() - defer ct.Close(fmt.Errorf("closed manually by test")) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for i := 0; i < 5; i++ { - _, err := ct.NewStream(ctx, &CallHdr{}) - if err != nil { - t.Fatalf("error creating stream: %v", err) - } - } - - waitWhileTrue(t, func() (bool, error) { - server.mu.Lock() - defer server.mu.Unlock() - - if len(server.conns) == 0 { - return true, fmt.Errorf("timed-out while waiting for connection to be created on the server") - } - return false, nil - }) - - var st *http2Server - server.mu.Lock() - for k := range server.conns { - st = k.(*http2Server) - } - server.mu.Unlock() - - st.framer.fr.WriteGoAway(5, http2.ErrCodeNo, []byte{}) - for i := 0; i < 10; i++ { - ct.NewStream(ctx, &CallHdr{}) - } -} diff --git a/test/clienttester.go b/test/clienttester.go new file mode 100644 index 00000000000..7bad30de563 --- /dev/null +++ b/test/clienttester.go @@ -0,0 +1,108 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test + +import ( + "bytes" + "io" + "net" + "testing" + + "golang.org/x/net/http2" +) + +var ( + clientPreface = []byte(http2.ClientPreface) +) + +func newClientTester(t *testing.T, conn net.Conn) *clientTester { + ct := &clientTester{ + t: t, + conn: conn, + } + ct.fr = http2.NewFramer(conn, conn) + return ct +} + +type clientTester struct { + t *testing.T + conn net.Conn + fr *http2.Framer +} + +// greet() performs the necessary steps for http2 connection establishment on +// the server side. +func (ct *clientTester) greet() { + ct.wantClientPreface() + ct.wantSettingsFrame() + ct.writeSettingsFrame() + ct.writeSettingsAck() + + for { + f, err := ct.fr.ReadFrame() + if err != nil { + ct.t.Errorf("error reading frame from client side: %v", err) + } + switch f := f.(type) { + case *http2.SettingsFrame: + if f.IsAck() { // HTTP/2 handshake completed. + return + } + default: + ct.t.Errorf("during greet, unexpected frame type %T", f) + } + } +} + +func (ct *clientTester) wantClientPreface() { + preface := make([]byte, len(clientPreface)) + if _, err := io.ReadFull(ct.conn, preface); err != nil { + ct.t.Errorf("Error at server-side while reading preface from client. Err: %v", err) + } + if !bytes.Equal(preface, clientPreface) { + ct.t.Errorf("received bogus greeting from client %q", preface) + } +} + +func (ct *clientTester) wantSettingsFrame() { + frame, err := ct.fr.ReadFrame() + if err != nil { + ct.t.Errorf("error reading initial settings frame from client: %v", err) + } + _, ok := frame.(*http2.SettingsFrame) + if !ok { + ct.t.Errorf("initial frame sent from client is not a settings frame, type %T", frame) + } +} + +func (ct *clientTester) writeSettingsFrame() { + if err := ct.fr.WriteSettings(); err != nil { + ct.t.Fatalf("Error writing initial SETTINGS frame from client to server: %v", err) + } +} + +func (ct *clientTester) writeSettingsAck() { + if err := ct.fr.WriteSettingsAck(); err != nil { + ct.t.Fatalf("Error writing ACK of client's SETTINGS: %v", err) + } +} + +func (ct *clientTester) writeGoAway(maxStreamID uint32, code http2.ErrCode, debugData []byte) { + if err := ct.fr.WriteGoAway(maxStreamID, code, debugData); err != nil { + ct.t.Fatalf("Error writing GOAWAY: %v", err) + } +} diff --git a/test/end2end_test.go b/test/end2end_test.go index 8a4f1151567..929b340435d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -7407,7 +7407,6 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) { return } writer.Flush() // necessary since client is expecting preface before declaring connection fully setup. - var sid uint32 // Loop until conn is closed and framer returns io.EOF for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) { @@ -8130,3 +8129,62 @@ func (s) TestRecvWhileReturningStatus(t *testing.T) { } } } + +// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server +// sends a goaway with a stream id that is smaller than some created streams on +// the client, while the client is simultaneously creating new streams. This +// should not induce a deadlock. +func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + + ctCh := testutils.NewChannel() + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("error in lis.Accept(): %v", err) + } + ct := newClientTester(t, conn) + ct.greet() + ctCh.Send(ct) + }() + + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("error dialing: %v", err) + } + defer cc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + val, err := ctCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout waiting for client transport (should be given after http2 creation)") + } + ct, ok := val.(*clientTester) + if !ok { + t.Fatalf("value received not a clientTester") + } + + tc := testpb.NewTestServiceClient(cc) + someStreamsCreated := grpcsync.NewEvent() + goAwayWritten := grpcsync.NewEvent() + go func() { + for i := 0; i < 20; i++ { + if i == 10 { + <-goAwayWritten.Done() + } + tc.FullDuplexCall(ctx) + if i == 4 { + someStreamsCreated.Fire() + } + } + }() + + <-someStreamsCreated.Done() + ct.writeGoAway(1, http2.ErrCodeNo, []byte{}) + goAwayWritten.Fire() +} From 09bdda852f9498f4e5fa792670f2e3736c2764f6 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 19 Sep 2022 17:28:08 -0400 Subject: [PATCH 3/4] Responded to Doug's comment --- internal/transport/http2_client.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 7543357084d..19d4633b80f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1233,24 +1233,26 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. } - activeStreams := make(map[uint32]*Stream) - for streamID, stream := range t.activeStreams { - activeStreams[streamID] = stream + t.prevGoAwayID = id + if len(t.activeStreams) == 0 { + t.mu.Unlock() + t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) + return } - t.prevGoAwayID = id - t.mu.Unlock() - for streamID, stream := range activeStreams { + streamsToClose := make([]*Stream, 0) + for streamID, stream := range t.activeStreams { if streamID > id && streamID <= upperLimit { // The stream was unprocessed by the server. - atomic.StoreUint32(&stream.unprocessed, 1) - t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) + if streamID > id && streamID <= upperLimit { + atomic.StoreUint32(&stream.unprocessed, 1) + streamsToClose = append(streamsToClose, stream) + } } } - - active := len(activeStreams) - if active == 0 { - t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) + t.mu.Unlock() + for _, stream := range streamsToClose { + t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) } } From f9a0d75791c73043d8dcd8fb506c5dbc8677e95d Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 21 Sep 2022 13:37:46 -0400 Subject: [PATCH 4/4] Responded to Doug's comments --- internal/transport/http2_client.go | 2 ++ test/clienttester.go | 1 + test/end2end_test.go | 6 +----- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 19d4633b80f..5c2f35b24e7 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1251,6 +1251,8 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { } } t.mu.Unlock() + // Called outside t.mu because closeStream can take controlBuf's mu, which + // could induce deadlock and is not allowed. for _, stream := range streamsToClose { t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) } diff --git a/test/clienttester.go b/test/clienttester.go index 7bad30de563..7e223091164 100644 --- a/test/clienttester.go +++ b/test/clienttester.go @@ -35,6 +35,7 @@ func newClientTester(t *testing.T, conn net.Conn) *clientTester { conn: conn, } ct.fr = http2.NewFramer(conn, conn) + ct.greet() return ct } diff --git a/test/end2end_test.go b/test/end2end_test.go index 929b340435d..725bcdb641e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -8147,7 +8147,6 @@ func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) { t.Errorf("error in lis.Accept(): %v", err) } ct := newClientTester(t, conn) - ct.greet() ctCh.Send(ct) }() @@ -8164,10 +8163,7 @@ func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) { if err != nil { t.Fatalf("timeout waiting for client transport (should be given after http2 creation)") } - ct, ok := val.(*clientTester) - if !ok { - t.Fatalf("value received not a clientTester") - } + ct := val.(*clientTester) tc := testpb.NewTestServiceClient(cc) someStreamsCreated := grpcsync.NewEvent()