From 0439465fe2b4020767d9aab1bc3055e492c14089 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 7 May 2021 11:57:56 -0700 Subject: [PATCH] xds_resolver: fix flaky Test/XDSResolverDelayedOnCommitted (#4393) Before this change, if two xds client updates came too close together, the second one could replace the first one. The fix is to wait for the effects of the first update before sending the second update. I injected a synthetic delay into handling the updates from the channel to reproduce this flake 100%, and confirmed this change fixes it. As part of this change I also noticed that we're actually calling the context cancellation function twice via defers, and never the cancel function from the test setup, so I fixed that, too. --- xds/internal/resolver/xds_resolver_test.go | 85 +++++++++------------- 1 file changed, 34 insertions(+), 51 deletions(-) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 8ec29af9ebf..eca561f369c 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -289,10 +289,8 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -317,10 +315,8 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -407,7 +403,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { defer cancel() gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -475,7 +471,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -498,7 +494,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr) if _, err = tcc.stateCh.Receive(ctx); err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } // "Finish the RPC"; this could cause a panic if the resolver doesn't @@ -544,7 +540,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -577,7 +573,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr) if gotState, err = tcc.stateCh.Receive(ctx); err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -602,7 +598,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { // In the meantime, an empty ServiceConfig update should have been sent. if gotState, err = tcc.stateCh.Receive(ctx); err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -621,10 +617,8 @@ func (s) TestXDSResolverWRR(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -651,7 +645,7 @@ func (s) TestXDSResolverWRR(t *testing.T) { gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -684,10 +678,8 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -722,7 +714,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -789,10 +781,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -813,7 +803,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -860,6 +850,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { }, }, }, nil) + tcc.stateCh.Receive(ctx) // Ignore the first update. + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { @@ -869,10 +861,9 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { }, }, nil) - tcc.stateCh.Receive(ctx) // Ignore the first update gotState, err = tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -910,7 +901,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { }, nil) gotState, err = tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -939,10 +930,8 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -971,7 +960,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { }, nil) gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { @@ -995,10 +984,8 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -1019,7 +1006,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { defer cancel() gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) wantParsedConfig := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)("{}") @@ -1043,10 +1030,8 @@ func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -1220,10 +1205,8 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { xdsR, tcc, cancel := testSetup(t, setupOpts{ xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, }) - defer func() { - cancel() - xdsR.Close() - }() + defer xdsR.Close() + defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -1265,7 +1248,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { gotState, err := tcc.stateCh.Receive(ctx) if err != nil { - t.Fatalf("ClientConn.UpdateState returned error: %v", err) + t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil {