Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds_resolver: fix flaky Test/XDSResolverDelayedOnCommitted #4393

Merged
merged 3 commits into from May 7, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 34 additions & 51 deletions xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -289,10 +289,8 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -317,10 +315,8 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -407,7 +403,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -475,7 +471,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -498,7 +494,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if _, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}

// "Finish the RPC"; this could cause a panic if the resolver doesn't
Expand Down Expand Up @@ -544,7 +540,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -577,7 +573,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)

if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -602,7 +598,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {

// In the meantime, an empty ServiceConfig update should have been sent.
if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -621,10 +617,8 @@ func (s) TestXDSResolverWRR(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -651,7 +645,7 @@ func (s) TestXDSResolverWRR(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -684,10 +678,8 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -722,7 +714,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -789,10 +781,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -813,7 +803,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -860,6 +850,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
},
},
}, nil)
tcc.stateCh.Receive(ctx) // Ignore the first update
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move comment to above line of code (Easwar mentioned he doesn't like it beside.) Add period to comment. Why do we need to invoke the callback twice? None of the "handling" logic (inline instead of explicit functions like in cds) in xds resolver run() leads me to believe we need to invoke it twice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move comment to above line of code (Easwar mentioned he doesn't like it beside.)

This is pretty common practice in my experience. @easwars - is there some justification behind what otherwise sounds like personal preference? (FWIW I urge everyone to push back when someone - including me - suggests something in a review that sounds subjective; there may be objective benefits you aren't seeing, but not necessarily. And if there are objective benefits but you are not understanding them, then everyone loses.)

Add period to comment.

I'll give you this one. :) The rule I've always heard and followed is: comments that are not sentences are fine (count int // the number of things), but then you don't capitalize the first letter or end with a period. Putting a period at the end of a sentence fragment is not proper. This one turns out to (technically) be a legal complete sentence due to the implied "you" as a command, though I didn't think of it that way initially.

Why do we need to invoke the callback twice?

In first update after any cluster removal, the previous config selector is still live and references the cluster, meaning it needs to remain live in the system. With the second update, any otherwise-unreferenced clusters may be removed. In this case, the initially-selected cluster is still live, since we haven't "finished" the "RPC" yet, so it should not be removed from the list of live clusters; this test confirms that.

There is a comment: "Perform TWO updates to ensure the old config selector does not hold a reference to test-cluster-1" - LMK if this isn't clear enough given this explanation of how config selectors & cluster removal works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Easwar is the community contributor about go style so I took his word as gospel, personally I don't care: this comment didn't get put on top on my aggregate clusters pr: https://github.com/grpc/grpc-go/pull/4332/files#diff-0688175c92dab05730c4e7d801fc6fdcca663588fdaa63286f1323cd5bc86da9R156. Fair enough regarding your explanation. That question mainly arose out of the "newness" of my exposure to this section, I was pretty sure it was right but wanted to make sure. Perhaps you could add your explanation to me to the comment in the test, but I'll approve it and let the TL decide.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you could add your explanation to me to the comment in the test

I feel like the comment that is already there is sufficient given an understanding of how the config selector / cluster stuff works at a semi-high level. This is actually discussed in the design of the feature -- which I know you didn't read, and I wouldn't expect you to for one small PR. That kind of understanding should be something test writers can assume anyone reading the test already has.

If you think something is not clear given "reasonable" background (here or in a future PR), then suggesting exactly how to add a comment or change an existing one can be helpful.

Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move comment to above line of code (Easwar mentioned he doesn't like it beside.)

Wasn't able to find where I said I don't like comments which are beside the line of code. I might have asked you to terminate the comment with a punctuation.

Easwar is the community contributor about go style so I took his word as gospel

Lol ... you don't have to. You could have a slightly different interpretation of the style guide and I'm also allowed to be wrong from time to time :)


xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -869,10 +861,9 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
},
}, nil)

tcc.stateCh.Receive(ctx) // Ignore the first update
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -910,7 +901,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down Expand Up @@ -939,10 +930,8 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -971,7 +960,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand All @@ -995,10 +984,8 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -1019,7 +1006,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
wantParsedConfig := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)("{}")
Expand All @@ -1043,10 +1030,8 @@ func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -1220,10 +1205,8 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand Down Expand Up @@ -1265,7 +1248,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
Expand Down