Skip to content

Commit

Permalink
make test use stubserver
Browse files Browse the repository at this point in the history
  • Loading branch information
apolcyn committed Apr 14, 2021
1 parent 052d632 commit eef6755
Showing 1 changed file with 35 additions and 58 deletions.
93 changes: 35 additions & 58 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,99 +1383,76 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
}

func (s) TestDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
// TODO(apolcyn): the server doesn't terminate the connection due
// to max connection age under the handler-tls configuration, is
// this WAI?
continue
}
testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t, e)
rpcDoneOnClient := make(chan struct{})
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
},
}
}

func testDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.customServerOptions = []grpc.ServerOption{
sopts := []grpc.ServerOption{
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: time.Millisecond * 100,
MaxConnectionAgeGrace: time.Millisecond,
}),
}
rpcDoneOnClient := make(chan struct{})
te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
if err := ss.Start(sopts); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
te.startServer(&testServer{security: e.security})
defer te.tearDown()
defer ss.Stop()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := tc.FullDuplexCall(ctx)
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", tc, err)
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
}
expectedErrorMessageSubstring := "received prior goaway: code: NO_ERROR"
const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR"
_, err = stream.Recv()
close(rpcDoneOnClient)
if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring)
t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring)
}
}

func (s) TestDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) {
// first, lower the min keepalive time so that we can generate
// server-side ping abuse logic quickly
// set the min keepalive time very low so that this test can take
// a reasonable amount of time
prev := internal.KeepaliveMinPingTime
internal.KeepaliveMinPingTime = time.Millisecond
defer func() { internal.KeepaliveMinPingTime = prev }()
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
// TODO(apolcyn): the server doesn't terminate the connection due
// to aggressive keepalives under the handler-tls configuration, is
// this WAI?
continue
}
testDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t, e)
}
}

func testDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.customDialOptions = []grpc.DialOption{
rpcDoneOnClient := make(chan struct{})
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
},
}
sopts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Second * 1000, /* arbitrary, large value */
}),
}
dopts := []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Millisecond, /* should trigger "too many pings" error quickly */
Timeout: time.Second * 1000, /* arbitrarym, arge value */
PermitWithoutStream: false,
}),
}
te.customServerOptions = []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Second * 1000, /* arbitrary, large value */
}),
}
rpcDoneOnClient := make(chan struct{})
te.streamServerInt = func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
if err := ss.Start(sopts, dopts...); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
te.startServer(&testServer{security: e.security})
defer te.tearDown()
defer ss.Stop()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := tc.FullDuplexCall(ctx)
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", tc, err)
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
}
expectedErrorMessageSubstring := "received prior goaway: code: ENHANCE_YOUR_CALM, debug data: too_many_pings"
const expectedErrorMessageSubstring = "received prior goaway: code: ENHANCE_YOUR_CALM, debug data: too_many_pings"
_, err = stream.Recv()
close(rpcDoneOnClient)
if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
Expand Down

0 comments on commit eef6755

Please sign in to comment.