Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: don't reset version info after stream restart #5422

Merged
merged 7 commits into from Jul 6, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 55 additions & 72 deletions test/xds/xds_client_ack_nack_test.go
Expand Up @@ -35,18 +35,22 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
)

const (
// We are interested in LDS, RDS, CDS and EDS resources as part of the regular
// xDS flow on the client.
wantResources = 4
// Management server assigns monotonically increasing stream IDs starting at 1.
idBeforeRestart = 1
idAfterRestart = 2
)

type streamRequestTuple struct {
id int64 // Assigned by the management server.
req *v3discoverypb.DiscoveryRequest // As received by the managerment server.
// We are interested in LDS, RDS, CDS and EDS resources as part of the regular
// xDS flow on the client.
const wantResources = 4

// seenAllACKs returns true if we have seen two streams with acks for all the
// resources that we are interested in.
func seenAllACKs(acks map[int64]map[string]string) bool {
if len(acks) != 2 {
return false
}
for _, v := range acks {
if len(v) != wantResources {
return false
}
}
return true
}

// TestClientResourceVersionAfterStreamRestart tests the scenario where the
Expand All @@ -61,18 +65,31 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
}
lis := testutils.NewRestartableListener(l)

// Event to notify stream closure.
streamClosed := grpcsync.NewEvent()
// ADS requests received by the management server are pushed on to this
// channel from the below registered callbacks. A big buffer is configured
// on the channel to accommodate for the fact that at least two requests are
// required for every resource (the initial one and the ACK), and because it
// allows us to have simpler code.
requestsCh := make(chan streamRequestTuple, 50)
streamClosed := grpcsync.NewEvent() // Event to notify stream closure.
acksReceived := grpcsync.NewEvent() // Event to notify receipt of acks for all resources.
// Map from stream id to a map of resource type to resource version.
ackVersionsMap := make(map[int64]map[string]string)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
requestsCh <- streamRequestTuple{id: id, req: req}
// Return early under the following circumstances:
// - Received all the requests we wanted to see. This is to avoid
// any stray requests leading to test flakes.
// - Request contains no resource names. Such requests are usually
// seen when the xdsclient is shutting down and is no longer
// interested in the resources that it had subscribed to earlier.
if acksReceived.HasFired() || len(req.GetResourceNames()) == 0 {
return nil
}
// Create a stream specific map to store ack versions if this is the
// first time we are seeing this stream id.
if ackVersionsMap[id] == nil {
ackVersionsMap[id] = make(map[string]string)
}
ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo()
if seenAllACKs(ackVersionsMap) {
acksReceived.Fire()
}
return nil
},
OnStreamClosed: func(int64) {
Expand Down Expand Up @@ -110,30 +127,9 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Wait for all the resources to be ACKed.
ackVersionsBeforeRestart := make(map[string]string)
AcksBeforeRestart:
for {
select {
case tuple := <-requestsCh:
if tuple.id != idBeforeRestart {
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idBeforeRestart)
}
// The client first requests for a resource with version set to empty
// string. After receipt of the response, it sends another request for
// the same resource, this time with a non-empty version string. This
// corresponds to ACKs, and this is what we want to capture.
req := tuple.req
if len(req.GetResourceNames()) != 0 && req.GetVersionInfo() != "" {
ackVersionsBeforeRestart[req.GetTypeUrl()] = req.GetVersionInfo()
}
if len(ackVersionsBeforeRestart) == wantResources {
break AcksBeforeRestart
}
case <-ctx.Done():
t.Fatal("timeout when waiting for resources to be requested before stream restart")
}
}
// A successful RPC means that we have captured the ack versions for all
// resources in the OnStreamRequest callback. Nothing more needs to be done
// here before stream restart.

// Stop the listener on the management server. This will cause the client to
// backoff and recreate the stream.
Expand All @@ -147,35 +143,22 @@ AcksBeforeRestart:
lis.Restart()

// Wait for all the previously sent resources to be re-requested.
ackVersionsAfterRestart := make(map[string]string)
AcksAfterRestart:
for {
select {
case tuple := <-requestsCh:
if tuple.id == idBeforeRestart {
// Ignore any stray requests from the old stream.
continue
}
if tuple.id != idAfterRestart {
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idAfterRestart)
}
// After stream closure, capture the first request for every resource.
// This should not be set to an empty version string, but instead should
// be set to the version last ACKed before stream closure.
req := tuple.req
if len(req.GetResourceNames()) != 0 {
ackVersionsAfterRestart[req.GetTypeUrl()] = req.GetVersionInfo()
}
if len(ackVersionsAfterRestart) == wantResources {
break AcksAfterRestart
}
case <-ctx.Done():
t.Fatal("timeout when waiting for resources to be re-requested after stream restart")
<-acksReceived.Done()

// Retrieve and compare ACKs from before and after stream restart.
var idBeforeRestart, idAfterRestart int64
for id := range ackVersionsMap {
if id > idAfterRestart {
idBeforeRestart = idAfterRestart
idAfterRestart = id
} else {
idBeforeRestart = id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this needs to be so complicated now. Why can't we use 1 and 2 directly anymore as before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we moved to having a map indexed by the stream id, I thought this should be the way to go. Over engineering .. sigh.

Done now. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Go had better generics support already, I wouldn't mind it so much as:

idBeforeRestart := min(keys(ackVersionsMap))
idAfterRestart := max(keys(ackVersionsMap))

}
}

if !cmp.Equal(ackVersionsBeforeRestart, ackVersionsAfterRestart) {
t.Fatalf("ackVersionsBeforeRestart: %v and ackVersionsAfterRestart: %v don't match", ackVersionsBeforeRestart, ackVersionsAfterRestart)
acksBeforeRestart := ackVersionsMap[idBeforeRestart]
acksAfterRestart := ackVersionsMap[idAfterRestart]
if !cmp.Equal(acksBeforeRestart, acksAfterRestart) {
t.Fatalf("ACKs before restart: %v and ACKs after restart: %v don't match", acksBeforeRestart, acksAfterRestart)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use cmp.Diff instead of cmp.Equal and display that output instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
Expand Down