From 5bffad283d3016628c6acb533a5dca8cbb4dcc2b Mon Sep 17 00:00:00 2001 From: Ehsan Afzali Date: Fri, 27 Mar 2020 13:25:18 +0200 Subject: [PATCH 1/3] add rpcInfo to context in serverStream --- server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server.go b/server.go index 0a151dee4fc..2d8e005cd6f 100644 --- a/server.go +++ b/server.go @@ -1521,6 +1521,8 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp } } + ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp) + if trInfo != nil { trInfo.tr.LazyLog(&trInfo.firstLine, false) } From 52c21188d8880c5e947027efb682e2d1b63848ad Mon Sep 17 00:00:00 2001 From: Ehsan Afzali Date: Tue, 31 Mar 2020 20:24:32 +0300 Subject: [PATCH 2/3] add e2e test for preloader in server send stream --- test/end2end_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/test/end2end_test.go b/test/end2end_test.go index eb91d09afdf..aba13a54ae5 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2248,6 +2248,74 @@ func testPreloaderClientSend(t *testing.T, e env) { } } +// preparedMsgSendServer is a TestServiceServer whose +// StreamingOutputCall makes a SendMsg calls using PreparedMsg, +// sending prepared messaged with payload "0". +// TestPreloaderSenderSend verifies it is being sent correctly. +// +// All other TestServiceServer methods crash if called. +type preparedMsgSendServer struct { + testpb.TestServiceServer +} + +func (s preparedMsgSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { + preparedMsg := &grpc.PreparedMsg{} + err := preparedMsg.Encode(stream, &testpb.StreamingOutputCallResponse{ + Payload: &testpb.Payload{ + Body: []byte{'0'}, + }, + }) + if err != nil { + return err + } + stream.SendMsg(preparedMsg) + return nil +} + +func (s) TestPreloaderSenderSend(t *testing.T) { + for _, e := range listTestEnv() { + testPreloaderSenderSend(t, e) + } +} + +func testPreloaderSenderSend(t *testing.T, e env) { + te := newTest(t, e) + te.startServer(preparedMsgSendServer{}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + + req := &testpb.StreamingOutputCallRequest{} + stream, err := tc.StreamingOutputCall(context.Background(), req) + if err != nil { + t.Errorf("%v.StreamingOutputCall(_) = _, %v, want ", tc, err) + return + } + var ngot int + var buf bytes.Buffer + for { + reply, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + ngot++ + if buf.Len() > 0 { + buf.WriteByte(',') + } + buf.Write(reply.GetPayload().GetBody()) + } + if want := 1; ngot != want { + t.Errorf("Got %d replies, want %d", ngot, want) + } + if got, want := buf.String(), "0"; got != want { + t.Errorf("Got replies %q; want %q", got, want) + } +} + func (s) TestMaxMsgSizeClientDefault(t *testing.T) { for _, e := range listTestEnv() { testMaxMsgSizeClientDefault(t, e) From df26f5ffbb89eed71112f00dec7a7fafa63e5cfd Mon Sep 17 00:00:00 2001 From: Ehsan Afzali Date: Thu, 20 May 2021 17:57:45 +0300 Subject: [PATCH 3/3] use new test format --- test/end2end_test.go | 65 ++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index aba13a54ae5..6affe6697ef 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2248,50 +2248,37 @@ func testPreloaderClientSend(t *testing.T, e env) { } } -// preparedMsgSendServer is a TestServiceServer whose -// StreamingOutputCall makes a SendMsg calls using PreparedMsg, -// sending prepared messaged with payload "0". -// TestPreloaderSenderSend verifies it is being sent correctly. -// -// All other TestServiceServer methods crash if called. -type preparedMsgSendServer struct { - testpb.TestServiceServer -} - -func (s preparedMsgSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { - preparedMsg := &grpc.PreparedMsg{} - err := preparedMsg.Encode(stream, &testpb.StreamingOutputCallResponse{ - Payload: &testpb.Payload{ - Body: []byte{'0'}, +func (s) TestPreloaderSenderSend(t *testing.T) { + ss := &stubserver.StubServer{ + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { + for i := 0; i < 10; i++ { + preparedMsg := &grpc.PreparedMsg{} + err := preparedMsg.Encode(stream, &testpb.StreamingOutputCallResponse{ + Payload: &testpb.Payload{ + Body: []byte{'0' + uint8(i)}, + }, + }) + if err != nil { + return err + } + stream.SendMsg(preparedMsg) + } + return nil }, - }) - if err != nil { - return err } - stream.SendMsg(preparedMsg) - return nil -} - -func (s) TestPreloaderSenderSend(t *testing.T) { - for _, e := range listTestEnv() { - testPreloaderSenderSend(t, e) + if err := ss.Start(nil); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) } -} - -func testPreloaderSenderSend(t *testing.T, e env) { - te := newTest(t, e) - te.startServer(preparedMsgSendServer{}) - defer te.tearDown() + defer ss.Stop() - cc := te.clientConn() - tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() - req := &testpb.StreamingOutputCallRequest{} - stream, err := tc.StreamingOutputCall(context.Background(), req) + stream, err := ss.Client.FullDuplexCall(ctx) if err != nil { - t.Errorf("%v.StreamingOutputCall(_) = _, %v, want ", tc, err) - return + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err) } + var ngot int var buf bytes.Buffer for { @@ -2308,10 +2295,10 @@ func testPreloaderSenderSend(t *testing.T, e env) { } buf.Write(reply.GetPayload().GetBody()) } - if want := 1; ngot != want { + if want := 10; ngot != want { t.Errorf("Got %d replies, want %d", ngot, want) } - if got, want := buf.String(), "0"; got != want { + if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { t.Errorf("Got replies %q; want %q", got, want) } }