xdsclient: don't reset version info after stream restart #5422

Merged
merged 7 commits into from Jul 6, 2022

Contributor

@easwars easwars commented Jun 14, 2022

When resending resource names after an ADS stream failure, we were resetting the resource version information received on the previous stream. This has been our behavior from day 1, but it conflicts with what is specified in the spec. This change brings the Go implementation in line with the spec.

Summary of changes:

  • Fix the code in xdsClient to not reset version information upon ADS stream restart
    • Also, fix the code to not send empty version strings upon ADS stream restart. Instead, use the previously received version strings.
      • The actual code changes are limited to xdsclient/controller/controller.go and xdsclient/controller/transport/go. The remaining changes are test related.
  • Add a new e2e test to verify that version information is preserved across stream restarts.
  • Add a type to specify options to the ManagementServer type defined in xds/e2e package.
    • The options accept a set of callbacks to be invoked for stream level events.
    • It also accepts a net.Listener for tests which need to customize the listener.
  • Modify the utility functions StartManagementServer and SetupManagementServer to accept these options.
  • Move management server logging functions to a separate file.
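
A rough sketch of how an options type with stream-level callbacks and an optional listener could look. The names `ManagementServerOptions`, `OnStreamOpen`, `OnStreamRequest`, `OnStreamClosed`, and `fakeServer` are illustrative assumptions, not necessarily the names used in this PR, and the request type is a plain struct standing in for the real protobuf message so that the example has no external dependencies.

```go
package main

import (
	"fmt"
	"net"
)

// request is a simplified stand-in for a discovery request (assumption:
// the real type is a protobuf message).
type request struct {
	TypeURL string
	Version string
}

// ManagementServerOptions is a hypothetical options struct for a test
// management server.
type ManagementServerOptions struct {
	// Listener, if set, would be used instead of a default local listener.
	Listener net.Listener
	// Stream-level callbacks. Nil callbacks are skipped.
	OnStreamOpen    func(streamID int64)
	OnStreamRequest func(streamID int64, req request)
	OnStreamClosed  func(streamID int64)
}

// fakeServer only shows how the options are consumed. A real server would
// also create a default listener when opts.Listener is nil and serve on it.
type fakeServer struct {
	opts ManagementServerOptions
}

func newFakeServer(opts ManagementServerOptions) *fakeServer {
	return &fakeServer{opts: opts}
}

// handleRequest simulates the server receiving one request on a stream and
// invokes the request callback if one was configured.
func (s *fakeServer) handleRequest(id int64, req request) {
	if s.opts.OnStreamRequest != nil {
		s.opts.OnStreamRequest(id, req)
	}
}

func main() {
	var seen []request
	srv := newFakeServer(ManagementServerOptions{
		OnStreamRequest: func(_ int64, r request) { seen = append(seen, r) },
	})
	srv.handleRequest(1, request{TypeURL: "listener-type", Version: "1"})
	fmt.Println("requests observed:", len(seen))
}
```

Passing a struct rather than positional arguments lets tests that need no callbacks pass a zero value, while tests that need a custom listener or stream hooks set only the relevant fields.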

Fixes #5351

RELEASE NOTES:

  • xdsClient will not reset resource version information after ADS stream restart
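
The behavior described above can be sketched as follows. This is a minimal illustration and not the code from this PR: `adsState`, `discoveryRequest`, and their methods are hypothetical names. It assumes, per my reading of the xDS protocol, that the last received version is carried over to the new stream while the response nonce, which is scoped to a single stream, starts out empty.

```go
package main

import "fmt"

// discoveryRequest is a simplified stand-in for the ADS request message.
type discoveryRequest struct {
	TypeURL       string
	ResourceNames []string
	VersionInfo   string
	ResponseNonce string
}

// adsState is a hypothetical container for per-resource-type state.
type adsState struct {
	watched  map[string][]string // type URL -> watched resource names
	versions map[string]string   // type URL -> last ACKed version
	nonces   map[string]string   // type URL -> last nonce on the current stream
}

func newADSState() *adsState {
	return &adsState{
		watched:  make(map[string][]string),
		versions: make(map[string]string),
		nonces:   make(map[string]string),
	}
}

func (s *adsState) watch(typeURL, name string) {
	s.watched[typeURL] = append(s.watched[typeURL], name)
}

// ack records the version and nonce of a response that was accepted.
func (s *adsState) ack(typeURL, version, nonce string) {
	s.versions[typeURL] = version
	s.nonces[typeURL] = nonce
}

// onStreamRestart builds the requests to send on a new stream. Nonces are
// cleared because they belong to the old stream; versions are kept.
func (s *adsState) onStreamRestart() []discoveryRequest {
	s.nonces = make(map[string]string)
	var reqs []discoveryRequest
	for typeURL, names := range s.watched {
		reqs = append(reqs, discoveryRequest{
			TypeURL:       typeURL,
			ResourceNames: names,
			VersionInfo:   s.versions[typeURL], // previously ACKed, not ""
		})
	}
	return reqs
}

func main() {
	s := newADSState()
	s.watch("listener-type", "my-listener")
	s.ack("listener-type", "42", "nonce-1")
	for _, r := range s.onStreamRestart() {
		fmt.Printf("%s version=%q nonce=%q\n", r.TypeURL, r.VersionInfo, r.ResponseNonce)
	}
}
```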

@easwars easwars requested a review from dfawley June 14, 2022 21:17
@easwars easwars added this to the 1.48 Release milestone Jun 14, 2022
@easwars easwars force-pushed the management_server_callbacks branch from 7ebf9e4 to 65a605f Compare June 14, 2022 22:36
// corresponds to ACKs, and this is what we want to capture.
if len(request.GetResourceNames()) != 0 && request.GetVersionInfo() != "" {
	ackVersionsBeforeRestart[request.GetTypeUrl()] = request.GetVersionInfo()
	if len(ackVersionsBeforeRestart) == wantResources {
Member

Should there be an else here so it fails if we go over the expectation?

Another option perhaps: maybe push the resources to the channel instead and validate them back in the main test code?

Contributor Author

Pushed resources into a channel and changed to validate them from the main test goroutine.

Member

Discussed further test simplification ideas offline.

Contributor Author

Done. Thanks.

if ackVersionsAfterRestart[request.GetTypeUrl()] == "" {
	ackVersionsAfterRestart[request.GetTypeUrl()] = request.GetVersionInfo()
}
if len(ackVersionsAfterRestart) == wantResources {
Member

(Same here)

Contributor Author

Done.

Comment on lines 75 to 78
select {
case resourcesRequestedBeforeStreamClose <- struct{}{}:
default:
}
Member

Why not just push to the channel normally?

Contributor Author

Not required anymore.

Comment on lines 53 to 56
// Channels to notify that all expected resources have been requested by the
// xdsClient before and after stream restart.
resourcesRequestedBeforeStreamClose := make(chan struct{}, 1)
resourcesRequestedAfterStreamClose := make(chan struct{}, 1)
Member

Should these just be normal, unbuffered channels, that are closed when the condition happens? Or use a grpcsync.Event?

Contributor Author

Now, these channels are buffered to have enough space for 4 resources.
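
For comparison, the "close a channel when the condition happens" alternative raised above can be sketched with a small one-shot event. The `event` type below is a local stand-in written for this illustration; it is not the grpcsync type, and its method names are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a minimal one-shot signal built on a channel that is closed
// at most once.
type event struct {
	once sync.Once
	c    chan struct{}
}

func newEvent() *event { return &event{c: make(chan struct{})} }

// Fire closes the channel exactly once; later calls are no-ops, so callers
// do not need a non-blocking send or a buffer.
func (e *event) Fire() { e.once.Do(func() { close(e.c) }) }

// Done returns a channel that is closed once Fire has been called.
func (e *event) Done() <-chan struct{} { return e.c }

// HasFired reports whether Fire has been called.
func (e *event) HasFired() bool {
	select {
	case <-e.c:
		return true
	default:
		return false
	}
}

func main() {
	allRequested := newEvent()
	go func() {
		// Firing twice is safe, which matters when a callback may observe
		// the condition on more than one request.
		allRequested.Fire()
		allRequested.Fire()
	}()
	<-allRequested.Done()
	fmt.Println("all resources requested")
}
```

A closed channel signals "the condition happened" to any number of waiters, whereas a buffered channel carries individual values, which is the better fit when the test needs to inspect each request.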

@dfawley dfawley assigned easwars and unassigned dfawley Jun 17, 2022
@easwars easwars assigned dfawley and unassigned easwars Jun 22, 2022
@dfawley dfawley assigned easwars and unassigned dfawley Jun 28, 2022
@easwars easwars assigned dfawley and unassigned easwars Jun 29, 2022
@easwars easwars force-pushed the management_server_callbacks branch from d593272 to e6adc7b Compare June 29, 2022 23:10
Comment on lines 150 to 175
ackVersionsAfterRestart := make(map[string]string)
AcksAfterRestart:
for {
	select {
	case tuple := <-requestsCh:
		if tuple.id == idBeforeRestart {
			// Ignore any stray requests from the old stream.
			continue
		}
		if tuple.id != idAfterRestart {
			t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idAfterRestart)
		}
		// After stream closure, capture the first request for every resource.
		// This should not be set to an empty version string, but instead should
		// be set to the version last ACKed before stream closure.
		req := tuple.req
		if len(req.GetResourceNames()) != 0 {
			ackVersionsAfterRestart[req.GetTypeUrl()] = req.GetVersionInfo()
		}
		if len(ackVersionsAfterRestart) == wantResources {
			break AcksAfterRestart
		}
	case <-ctx.Done():
		t.Fatal("timeout when waiting for resources to be re-requested after stream restart")
	}
}
Member

Can we do this in the loop above? I.e. store the first wantResources requests for each stream ID. Could either be [<id int>]map[string]string or map[<id int>]map[string]string, or keep it as two different maps.

You could potentially go even further: get rid of the channel entirely, accumulate the maps directly in the callback instead, and close a channel / fire an event / cancel a context (can have the timeout on it) when the maps have been populated.

Contributor Author

Done. Moved the logic to populate the map into the callback. The test goroutine now waits for a signal from the callback to go ahead and verify the acks before and after stream restart.

The logic to extract and compare the acks from the populated map seems a little verbose. Not sure if I can do any better though. Thanks.
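
The approach described in this thread, accumulating the maps in the callback and signaling the test goroutine once they are populated, might look roughly like the sketch below. `ackRecorder` and its fields are hypothetical names, and the recording policy is simplified to "last version seen per stream and type"; the actual test may choose which request to record differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ackRecorder accumulates versions per stream ID and resource type, and
// closes done once every expected stream has every expected type.
type ackRecorder struct {
	mu      sync.Mutex
	acks    map[int64]map[string]string // stream ID -> type URL -> version
	streams int                         // number of streams expected
	want    int                         // resource types expected per stream
	fired   bool
	done    chan struct{}
}

func newAckRecorder(streams, want int) *ackRecorder {
	return &ackRecorder{
		acks:    make(map[int64]map[string]string),
		streams: streams,
		want:    want,
		done:    make(chan struct{}),
	}
}

// onRequest is meant to be invoked from the stream-request callback.
func (r *ackRecorder) onRequest(id int64, typeURL, version string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.acks[id] == nil {
		r.acks[id] = make(map[string]string)
	}
	r.acks[id][typeURL] = version
	if r.fired || len(r.acks) < r.streams {
		return
	}
	for _, m := range r.acks {
		if len(m) < r.want {
			return
		}
	}
	r.fired = true
	close(r.done)
}

// version returns the recorded version, or "" if none was recorded.
func (r *ackRecorder) version(id int64, typeURL string) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.acks[id][typeURL]
}

func main() {
	rec := newAckRecorder(2, 2)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go func() {
		for _, id := range []int64{1, 2} {
			rec.onRequest(id, "listener-type", "1")
			rec.onRequest(id, "route-type", "1")
		}
	}()
	select {
	case <-rec.done:
		fmt.Println("maps populated; versions can now be compared")
	case <-ctx.Done():
		fmt.Println("timeout waiting for requests")
	}
}
```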

@dfawley dfawley assigned easwars and unassigned dfawley Jun 29, 2022
@easwars easwars assigned dfawley and unassigned easwars Jun 30, 2022
acksBeforeRestart := ackVersionsMap[idBeforeRestart]
acksAfterRestart := ackVersionsMap[idAfterRestart]
if !cmp.Equal(acksBeforeRestart, acksAfterRestart) {
	t.Fatalf("ACKs before restart: %v and ACKs after restart: %v don't match", acksBeforeRestart, acksAfterRestart)
Member

Use cmp.Diff instead of cmp.Equal and display that output instead?

Contributor Author

Done.

Comment on lines 149 to 155
var idBeforeRestart, idAfterRestart int64
for id := range ackVersionsMap {
	if id > idAfterRestart {
		idBeforeRestart = idAfterRestart
		idAfterRestart = id
	} else {
		idBeforeRestart = id
Member

I don't understand why this needs to be so complicated now. Why can't we use 1 and 2 directly anymore as before?

Contributor Author

Since we moved to having a map indexed by the stream id, I thought this should be the way to go. Over engineering .. sigh.

Done now. Thanks.

Member

If Go had better generics support already, I wouldn't mind it so much as:

idBeforeRestart := min(keys(ackVersionsMap))
idAfterRestart := max(keys(ackVersionsMap))
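
The two lines above read as pseudocode. With type parameters (Go 1.18 or later), one way to get the same effect is a small helper like the one below. `minMaxKey` and the `integer` constraint are hypothetical names introduced for this illustration only.

```go
package main

import "fmt"

// integer is a local constraint covering the key types of interest here.
type integer interface {
	~int | ~int32 | ~int64
}

// minMaxKey returns the smallest and largest key of m. ok is false when
// the map is empty, in which case lo and hi are zero values.
func minMaxKey[K integer, V any](m map[K]V) (lo, hi K, ok bool) {
	for k := range m {
		if !ok {
			lo, hi, ok = k, k, true
			continue
		}
		if k < lo {
			lo = k
		}
		if k > hi {
			hi = k
		}
	}
	return lo, hi, ok
}

func main() {
	ackVersionsMap := map[int64]map[string]string{
		1: {"listener-type": "1"},
		2: {"listener-type": "1"},
	}
	idBeforeRestart, idAfterRestart, ok := minMaxKey(ackVersionsMap)
	fmt.Println(idBeforeRestart, idAfterRestart, ok)
}
```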

@dfawley dfawley assigned easwars and unassigned dfawley Jun 30, 2022
@easwars easwars assigned dfawley and unassigned easwars Jun 30, 2022
@dfawley dfawley assigned easwars and unassigned dfawley Jul 1, 2022
@dfawley dfawley modified the milestones: 1.48 Release, 1.49 Release Jul 1, 2022
@easwars easwars merged commit a6dcb71 into grpc:master Jul 6, 2022
@easwars easwars deleted the management_server_callbacks branch July 15, 2022 17:24
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jan 14, 2023
Development

Successfully merging this pull request may close these issues.

xds: use previously acked version when requesting resources after stream failure
2 participants