Skip to content

Commit

Permalink
xds_resolver: fix flaky Test/XDSResolverDelayedOnCommitted (#4393)
Browse files Browse the repository at this point in the history
Before this change, if two xds client updates came too close together, the second one could replace the first one.  The fix is to wait for the effects of the first update before sending the second update.  I injected a synthetic delay into handling the updates from the channel to reproduce this flake 100%, and confirmed this change fixes it.

As part of this change I also noticed that we're actually calling the context cancellation function twice via defers, and never the cancel function from the test setup, so I fixed that, too.
  • Loading branch information
dfawley committed May 7, 2021
1 parent 0ab423a commit 0439465
Showing 1 changed file with 34 additions and 51 deletions.
85 changes: 34 additions & 51 deletions xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -289,10 +289,8 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -317,10 +315,8 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -407,7 +403,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -475,7 +471,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -498,7 +494,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if _, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}

// "Finish the RPC"; this could cause a panic if the resolver doesn't
Expand Down Expand Up @@ -544,7 +540,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -577,7 +573,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -602,7 +598,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

// In the meantime, an empty ServiceConfig update should have been sent.
if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -621,10 +617,8 @@ func (s) TestXDSResolverWRR(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -651,7 +645,7 @@ func (s) TestXDSResolverWRR(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -684,10 +678,8 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -722,7 +714,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -789,10 +781,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -813,7 +803,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -860,6 +850,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
},
},
}, nil)
tcc.stateCh.Receive(ctx) // Ignore the first update.

xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -869,10 +861,9 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
},
}, nil)

tcc.stateCh.Receive(ctx) // Ignore the first update
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -910,7 +901,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -939,10 +930,8 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -971,7 +960,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -995,10 +984,8 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -1019,7 +1006,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
wantParsedConfig := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)("{}")
Expand All @@ -1043,10 +1030,8 @@ func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -1220,10 +1205,8 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -1265,7 +1248,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down

0 comments on commit 0439465

Please sign in to comment.