Skip to content

Commit

Permalink
simplify management server callback functionality in test
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Jun 29, 2022
1 parent bee2f6f commit d593272
Showing 1 changed file with 66 additions and 62 deletions.
128 changes: 66 additions & 62 deletions test/xds/xds_client_ack_nack_test.go
Expand Up @@ -20,7 +20,6 @@ package xds_test

import (
"context"
"errors"
"fmt"
"testing"

Expand All @@ -36,31 +35,18 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
)

// We are interested in LDS, RDS, CDS and EDS resources as part of the regular
// xDS flow on the client.
const wantResources = 4

// resourceVersionTuple holds the resource's type URL and version string.
type resourceVersionTuple struct {
typeURL string
version string
}
const (
// We are interested in LDS, RDS, CDS and EDS resources as part of the regular
// xDS flow on the client.
wantResources = 4
// Management server assigns monotonically increasing stream IDs starting at 1.
idBeforeRestart = 1
idAfterRestart = 2
)

// readResourceVersions builds a map from resource's type URL to the version
// string, by reading resourceVersionTuples pushed on to the provided channel.
func readResourceVersions(ctx context.Context, ch chan resourceVersionTuple) (map[string]string, error) {
versionsMap := make(map[string]string)
for {
select {
case tuple := <-ch:
versionsMap[tuple.typeURL] = tuple.version
if len(versionsMap) == wantResources {
return versionsMap, nil
}
case <-ctx.Done():
return nil, errors.New("timeout when waiting for all resources to be re-requested after stream restart")
}
}
type streamRequestTuple struct {
id int64 // Assigned by the management server.
req *v3discoverypb.DiscoveryRequest // As received by the managerment server.
}

// TestClientResourceVersionAfterStreamRestart tests the scenario where the
Expand All @@ -75,41 +61,18 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
}
lis := testutils.NewRestartableListener(l)

// Channels to push resource versions requested by the xdsClient before and
// after stream restart.
resourcesRequestedBeforeStreamClose := make(chan resourceVersionTuple, wantResources)
resourcesRequestedAfterStreamClose := make(chan resourceVersionTuple, wantResources)
// Event to notify stream closure.
streamClosed := grpcsync.NewEvent()

// ADS requests received by the management server are pushed on to this
// channel from the below registered callbacks. A big buffer is configured
// on the channel to accommodate for the fact that at least two requests are
// required for every resource (the initial one and the ACK), and because it
// allows us to have simpler code.
requestsCh := make(chan streamRequestTuple, 50)
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(id int64, request *v3discoverypb.DiscoveryRequest) error {
// Populate the versions in the appropriate map based on whether the
// stream has closed.
if !streamClosed.HasFired() {
// Prior to stream closure, record only non-empty version numbers. The
// client first requests for a resource with version set to empty
// string. After receipt of the response, it sends another request for
// the same resource, this time with a non-empty version string. This
// corresponds to ACKs, and this is what we want to capture.
if len(request.GetResourceNames()) != 0 && request.GetVersionInfo() != "" {
resourcesRequestedBeforeStreamClose <- resourceVersionTuple{
typeURL: request.GetTypeUrl(),
version: request.GetVersionInfo(),
}
}
return nil
}
// After stream closure, capture the first request for every resource.
// This should not be set to an empty version string, but instead should
// be set to the version last ACKed before stream closure.
if len(request.GetResourceNames()) != 0 {
resourcesRequestedAfterStreamClose <- resourceVersionTuple{
typeURL: request.GetTypeUrl(),
version: request.GetVersionInfo(),
}
}
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
requestsCh <- streamRequestTuple{id: id, req: req}
return nil
},
OnStreamClosed: func(int64) {
Expand Down Expand Up @@ -148,9 +111,28 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
}

// Wait for all the resources to be ACKed.
ackVersionsBeforeRestart, err := readResourceVersions(ctx, resourcesRequestedBeforeStreamClose)
if err != nil {
t.Fatal(err)
ackVersionsBeforeRestart := make(map[string]string)
AcksBeforeRestart:
for {
select {
case tuple := <-requestsCh:
if tuple.id != idBeforeRestart {
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idBeforeRestart)
}
// The client first requests for a resource with version set to empty
// string. After receipt of the response, it sends another request for
// the same resource, this time with a non-empty version string. This
// corresponds to ACKs, and this is what we want to capture.
req := tuple.req
if len(req.GetResourceNames()) != 0 && req.GetVersionInfo() != "" {
ackVersionsBeforeRestart[req.GetTypeUrl()] = req.GetVersionInfo()
}
if len(ackVersionsBeforeRestart) == wantResources {
break AcksBeforeRestart
}
case <-ctx.Done():
t.Fatal("timeout when waiting for resources to be requested before stream restart")
}
}

// Stop the listener on the management server. This will cause the client to
Expand All @@ -165,9 +147,31 @@ func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
lis.Restart()

// Wait for all the previously sent resources to be re-requested.
ackVersionsAfterRestart, err := readResourceVersions(ctx, resourcesRequestedAfterStreamClose)
if err != nil {
t.Fatal(err)
ackVersionsAfterRestart := make(map[string]string)
AcksAfterRestart:
for {
select {
case tuple := <-requestsCh:
if tuple.id == idBeforeRestart {
// Ignore any stray requests from the old stream.
continue
}
if tuple.id != idAfterRestart {
t.Fatalf("Received request with stream ID %d, expecting %d", tuple.id, idAfterRestart)
}
// After stream closure, capture the first request for every resource.
// This should not be set to an empty version string, but instead should
// be set to the version last ACKed before stream closure.
req := tuple.req
if len(req.GetResourceNames()) != 0 {
ackVersionsAfterRestart[req.GetTypeUrl()] = req.GetVersionInfo()
}
if len(ackVersionsAfterRestart) == wantResources {
break AcksAfterRestart
}
case <-ctx.Done():
t.Fatal("timeout when waiting for resources to be re-requested after stream restart")
}
}

if !cmp.Equal(ackVersionsBeforeRestart, ackVersionsAfterRestart) {
Expand Down

0 comments on commit d593272

Please sign in to comment.