Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

xdsclient: don't reset version info after stream restart #5422

Merged
merged 7 commits into from Jul 6, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 66 additions & 62 deletions test/xds/xds_client_ack_nack_test.go
Expand Up @@ -20,7 +20,6 @@ package xds_test

import (
"context"
"errors"
"fmt"
"testing"

Expand All @@ -36,31 +35,18 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
)

// wantResources is the number of distinct resource types whose versions we
// track. We are interested in LDS, RDS, CDS and EDS resources as part of the
// regular xDS flow on the client.
const wantResources = 4

// resourceVersionTuple holds the resource's type URL and version string.
type resourceVersionTuple struct {
	typeURL string // Type URL identifying the resource type (presumably LDS/RDS/CDS/EDS — confirm against callers).
	version string // Version string reported in the client's discovery request.
}
const (
	// wantResources is the number of distinct resource types whose requests we
	// wait for. We are interested in LDS, RDS, CDS and EDS resources as part
	// of the regular xDS flow on the client.
	wantResources = 4
	// idBeforeRestart and idAfterRestart identify the ADS streams before and
	// after the stream restart. The management server assigns monotonically
	// increasing stream IDs starting at 1.
	idBeforeRestart = 1
	idAfterRestart  = 2
)

// readResourceVersions builds a map from resource's type URL to the version
// string, by reading resourceVersionTuples pushed on to the provided channel.
// It returns once versions for wantResources distinct type URLs have been
// collected, or a non-nil error if ctx expires first.
func readResourceVersions(ctx context.Context, ch chan resourceVersionTuple) (map[string]string, error) {
	versionsMap := make(map[string]string)
	for {
		select {
		case tuple := <-ch:
			// A later tuple for the same type URL overwrites an earlier one,
			// so the map ends up holding the most recently seen version per
			// resource type.
			versionsMap[tuple.typeURL] = tuple.version
			if len(versionsMap) == wantResources {
				return versionsMap, nil
			}
		case <-ctx.Done():
			return nil, errors.New("timeout when waiting for all resources to be re-requested after stream restart")
		}
	}
// streamRequestTuple couples a discovery request with the ID of the ADS
// stream on which the management server received it.
type streamRequestTuple struct {
	id  int64                            // Assigned by the management server.
	req *v3discoverypb.DiscoveryRequest // As received by the management server.
}

// TestClientResourceVersionAfterStreamRestart tests the scenario where the
Expand All @@ -75,41 +61,18 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
}
lis := testutils.NewRestartableListener(l)

// Channels to push resource versions requested by the xdsClient before and
// after stream restart.
resourcesRequestedBeforeStreamClose := make(chan resourceVersionTuple, wantResources)
resourcesRequestedAfterStreamClose := make(chan resourceVersionTuple, wantResources)
// Event to notify stream closure.
streamClosed := grpcsync.NewEvent()

// ADS requests received by the management server are pushed on to this
// channel from the below registered callbacks. A big buffer is configured
// on the channel to accommodate for the fact that at least two requests are
// required for every resource (the initial one and the ACK), and because it
// allows us to have simpler code.
requestsCh := make(chan streamRequestTuple, 50)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(id int64, request *v3discoverypb.DiscoveryRequest) error {
// Populate the versions in the appropriate map based on whether the
// stream has closed.
if !streamClosed.HasFired() {
// Prior to stream closure, record only non-empty version numbers. The
// client first requests for a resource with version set to empty
// string. After receipt of the response, it sends another request for
// the same resource, this time with a non-empty version string. This
// corresponds to ACKs, and this is what we want to capture.
if len(request.GetResourceNames()) != 0 && request.GetVersionInfo() != "" {
resourcesRequestedBeforeStreamClose <- resourceVersionTuple{
typeURL: request.GetTypeUrl(),
version: request.GetVersionInfo(),
}
}
return nil
}
// After stream closure, capture the first request for every resource.
// This should not be set to an empty version string, but instead should
// be set to the version last ACKed before stream closure.
if len(request.GetResourceNames()) != 0 {
resourcesRequestedAfterStreamClose <- resourceVersionTuple{
typeURL: request.GetTypeUrl(),
version: request.GetVersionInfo(),
}
}
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
requestsCh <- streamRequestTuple{id: id, req: req}
return nil
},
OnStreamClosed: func(int64) {
Expand Down Expand Up @@ -148,9 +111,28 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
}

// Wait for all the resources to be ACKed.
ackVersionsBeforeRestart, err := readResourceVersions(ctx, resourcesRequestedBeforeStreamClose)
if err != nil {
t.Fatal(err)
ackVersionsBeforeRestart := make(map[string]string)
AcksBeforeRestart:
for {
select {
case tuple := <-requestsCh:
if tuple.id != idBeforeRestart {
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idBeforeRestart)
}
// The client first requests for a resource with version set to empty
// string. After receipt of the response, it sends another request for
// the same resource, this time with a non-empty version string. This
// corresponds to ACKs, and this is what we want to capture.
req := tuple.req
if len(req.GetResourceNames()) != 0 && req.GetVersionInfo() != "" {
ackVersionsBeforeRestart[req.GetTypeUrl()] = req.GetVersionInfo()
}
if len(ackVersionsBeforeRestart) == wantResources {
break AcksBeforeRestart
}
case <-ctx.Done():
t.Fatal("timeout when waiting for resources to be requested before stream restart")
}
}

// Stop the listener on the management server. This will cause the client to
Expand All @@ -165,9 +147,31 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
lis.Restart()

// Wait for all the previously sent resources to be re-requested.
ackVersionsAfterRestart, err := readResourceVersions(ctx, resourcesRequestedAfterStreamClose)
if err != nil {
t.Fatal(err)
ackVersionsAfterRestart := make(map[string]string)
AcksAfterRestart:
for {
select {
case tuple := <-requestsCh:
if tuple.id == idBeforeRestart {
// Ignore any stray requests from the old stream.
continue
}
if tuple.id != idAfterRestart {
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idAfterRestart)
}
// After stream closure, capture the first request for every resource.
// This should not be set to an empty version string, but instead should
// be set to the version last ACKed before stream closure.
req := tuple.req
if len(req.GetResourceNames()) != 0 {
ackVersionsAfterRestart[req.GetTypeUrl()] = req.GetVersionInfo()
}
if len(ackVersionsAfterRestart) == wantResources {
break AcksAfterRestart
}
case <-ctx.Done():
t.Fatal("timeout when waiting for resources to be re-requested after stream restart")
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this in the loop above? I.e. store the first wantResources requests for each stream ID. Could either be [<id int>]map[string]string or map[<id int>]map[string]string, or keep it as two different maps.

You could potentially go even further: get rid of the channel entirely, accumulate the maps directly in the callback instead, and close a channel / fire an event / cancel a context (can have the timeout on it) when the maps have been populated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Moved the logic to populate the map into the callback. The test goroutine now waits for a signal from the callback to go ahead and verify the acks before and after stream restart.

The logic to extract and compare the acks from the populated map seems a little verbose. Not sure if I can do any better though. Thanks.


if !cmp.Equal(ackVersionsBeforeRestart, ackVersionsAfterRestart) {
Expand Down