Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds_resolver: fix flaky Test/XDSResolverDelayedOnCommitted #4393

Merged
merged 3 commits into from May 7, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 34 additions & 51 deletions xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -289,10 +289,8 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -317,10 +315,8 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -407,7 +403,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -475,7 +471,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -498,7 +494,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if _, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}

// "Finish the RPC"; this could cause a panic if the resolver doesn't
Expand Down Expand Up @@ -544,7 +540,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -577,7 +573,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -602,7 +598,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

// In the meantime, an empty ServiceConfig update should have been sent.
if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -621,10 +617,8 @@ func (s) TestXDSResolverWRR(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -651,7 +645,7 @@ func (s) TestXDSResolverWRR(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -684,10 +678,8 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -722,7 +714,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -789,10 +781,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -813,7 +803,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -860,6 +850,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
},
},
}, nil)
tcc.stateCh.Receive(ctx) // Ignore the first update.

xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -869,10 +861,9 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
},
}, nil)

tcc.stateCh.Receive(ctx) // Ignore the first update
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -910,7 +901,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -939,10 +930,8 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -971,7 +960,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -995,10 +984,8 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -1019,7 +1006,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
wantParsedConfig := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)("{}")
Expand All @@ -1043,10 +1030,8 @@ func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -1220,10 +1205,8 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -1265,7 +1248,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down