Skip to content

Commit

Permalink
test/xds: wait for all ACKs before forcing stream restart (#5500)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Jul 14, 2022
1 parent a094a10 commit 4f47c8c
Showing 1 changed file with 51 additions and 23 deletions.
74 changes: 51 additions & 23 deletions test/xds/xds_client_ack_nack_test.go
Expand Up @@ -39,14 +39,15 @@ import (
// xDS flow on the client.
const wantResources = 4

// seenAllACKs returns true if we have seen two streams with acks for all the
// resources that we are interested in.
func seenAllACKs(acks map[int64]map[string]string) bool {
if len(acks) != 2 {
// seenAllACKs returns true if the provided ackVersions map contains valid acks
// for all the resources that we are interested in. If `wantNonEmpty` is true,
// only non-empty ack versions are considered valid.
func seenAllACKs(acksVersions map[string]string, wantNonEmpty bool) bool {
if len(acksVersions) != wantResources {
return false
}
for _, v := range acks {
if len(v) != wantResources {
for _, ack := range acksVersions {
if wantNonEmpty && ack == "" {
return false
}
}
Expand All @@ -65,8 +66,19 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
}
lis := testutils.NewRestartableListener(l)

streamClosed := grpcsync.NewEvent() // Event to notify stream closure.
acksReceived := grpcsync.NewEvent() // Event to notify receipt of acks for all resources.
// We depend on the fact that the management server assigns monotonically
// increasing stream IDs starting at 1.
const (
idBeforeRestart = 1
idAfterRestart = 2
)

// Events of importance in the test, in the order in which they are expected
// to happen.
acksReceivedBeforeRestart := grpcsync.NewEvent()
streamRestarted := grpcsync.NewEvent()
acksReceivedAfterRestart := grpcsync.NewEvent()

// Map from stream id to a map of resource type to resource version.
ackVersionsMap := make(map[int64]map[string]string)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
Expand All @@ -78,7 +90,7 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
// - Request contains no resource names. Such requests are usually
// seen when the xdsclient is shutting down and is no longer
// interested in the resources that it had subscribed to earlier.
if acksReceived.HasFired() || len(req.GetResourceNames()) == 0 {
if acksReceivedAfterRestart.HasFired() || len(req.GetResourceNames()) == 0 {
return nil
}
// Create a stream specific map to store ack versions if this is the
Expand All @@ -87,13 +99,25 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
ackVersionsMap[id] = make(map[string]string)
}
ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo()
if seenAllACKs(ackVersionsMap) {
acksReceived.Fire()
// Prior to stream restart, we are interested only in non-empty
// resource versions. The xdsclient first sends out requests with an
// empty version string. After receipt of requested resource, it
// sends out another request for the same resource, but this time
// with a non-empty version string, to serve as an ACK.
if seenAllACKs(ackVersionsMap[idBeforeRestart], true) {
acksReceivedBeforeRestart.Fire()
}
// After stream restart, we expect the xdsclient to send out
// requests with version string set to the previously ACKed
// versions. If it sends out requests with empty version string, it
// is a bug and we want this test to catch it.
if seenAllACKs(ackVersionsMap[idAfterRestart], false) {
acksReceivedAfterRestart.Fire()
}
return nil
},
OnStreamClosed: func(int64) {
streamClosed.Fire()
streamRestarted.Fire()
},
})
defer cleanup1()
Expand Down Expand Up @@ -127,30 +151,34 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// A successful RPC means that we have captured the ack versions for all
// resources in the OnStreamRequest callback. Nothing more needs to be done
// here before stream restart.
// A successful RPC means that the xdsclient received all requested
// resources. The ACKs from the xdsclient may get a little delayed. So, we
// need to wait for all ACKs to be received on the management server before
// restarting the stream.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for all resources to be ACKed prior to stream restart")
case <-acksReceivedBeforeRestart.Done():
}

// Stop the listener on the management server. This will cause the client to
// backoff and recreate the stream.
lis.Stop()

// Wait for the stream to be closed on the server.
<-streamClosed.Done()
<-streamRestarted.Done()

// Restart the listener on the management server to be able to accept
// reconnect attempts from the client.
lis.Restart()

// Wait for all the previously sent resources to be re-requested.
<-acksReceived.Done()
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for all resources to be ACKed post stream restart")
case <-acksReceivedAfterRestart.Done():
}

// We depend on the fact that the management server assigns monotonically
// increasing stream IDs starting at 1.
const (
idBeforeRestart = 1
idAfterRestart = 2
)
if diff := cmp.Diff(ackVersionsMap[idBeforeRestart], ackVersionsMap[idAfterRestart]); diff != "" {
t.Fatalf("unexpected diff in ack versions before and after stream restart (-want, +got):\n%s", diff)
}
Expand Down

0 comments on commit 4f47c8c

Please sign in to comment.