Skip to content

Commit

Permalink
xdsclient: improve LDS watchers test (#5691)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Oct 11, 2022
1 parent 7b817b4 commit e81d0a2
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 43 deletions.
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/client_new.go
Expand Up @@ -75,8 +75,8 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i
// Testing Only
//
// This function should ONLY be used for testing purposes.
func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout time.Duration) (XDSClient, error) {
cl, err := newWithConfig(config, watchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout, authorityIdleTimeout time.Duration) (XDSClient, error) {
cl, err := newWithConfig(config, watchExpiryTimeout, authorityIdleTimeout)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/xdsclient/dump_test.go
Expand Up @@ -78,7 +78,7 @@ func (s) TestLDSConfigDump(t *testing.T) {
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
NodeProto: xdstestutils.EmptyNodeProtoV2,
},
}, defaultTestWatchExpiryTimeout)
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (s) TestRDSConfigDump(t *testing.T) {
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
NodeProto: xdstestutils.EmptyNodeProtoV2,
},
}, defaultTestWatchExpiryTimeout)
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
Expand Down Expand Up @@ -310,7 +310,7 @@ func (s) TestCDSConfigDump(t *testing.T) {
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
NodeProto: xdstestutils.EmptyNodeProtoV2,
},
}, defaultTestWatchExpiryTimeout)
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func (s) TestEDSConfigDump(t *testing.T) {
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
NodeProto: xdstestutils.EmptyNodeProtoV2,
},
}, defaultTestWatchExpiryTimeout)
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
Expand Down
187 changes: 151 additions & 36 deletions xds/internal/xdsclient/e2e_test/lds_watchers_test.go
Expand Up @@ -25,25 +25,30 @@ import (
"testing"
"time"

"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"

_ "google.golang.org/grpc/xds" // To ensure internal.NewXDSResolverWithConfigForTesting is set.
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter.
_ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register the v3 xDS API client.
_ "google.golang.org/grpc/xds" // To ensure internal.NewXDSResolverWithConfigForTesting is set.
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter.
)

func overrideFedEnvVar(t *testing.T) {
Expand All @@ -61,14 +66,15 @@ func Test(t *testing.T) {
}

const (
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
defaultTestIdleAuthorityTimeout = 50 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.

ldsName = "xdsclient-test-lds-resource"
rdsName = "xdsclient-test-rds-resource"
ldsNameNewStyle = "xdstp:///envoy.config.listener.v3.Listener/xdsclient-test-lds-resource"
rdsNameNewStyle = "xdstp:///envoy.config.listener.v3.Listener/xdsclient-test-rds-resource"
rdsNameNewStyle = "xdstp:///envoy.config.route.v3.RouteConfiguration/xdsclient-test-rds-resource"
)

// badListenerResource returns a listener resource for the given name which does
Expand All @@ -93,7 +99,7 @@ func badListenerResource(name string) *v3listenerpb.Listener {

// xdsClient is expected to produce an error containing this string when an
// update is received containing a listener created using `badListenerResource`.
const wantNACKErr = "no RouteSpecifier"
const wantListenerNACKErr = "no RouteSpecifier"

// verifyNoListenerUpdate verifies that no listener update is received on the
// provided update channel, and returns an error if an update is received.
Expand Down Expand Up @@ -148,17 +154,6 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
//
// The test is run for old and new style names.
func (s) TestLDSWatch(t *testing.T) {
overrideFedEnvVar(t)
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil)
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

tests := []struct {
desc string
resourceName string
Expand Down Expand Up @@ -197,6 +192,17 @@ func (s) TestLDSWatch(t *testing.T) {

for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
overrideFedEnvVar(t)
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil)
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
Expand Down Expand Up @@ -268,17 +274,6 @@ func (s) TestLDSWatch(t *testing.T) {
//
// The test is run for old and new style names.
func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
overrideFedEnvVar(t)
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil)
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

tests := []struct {
desc string
resourceName string
Expand Down Expand Up @@ -327,6 +322,17 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {

for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
overrideFedEnvVar(t)
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil)
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

// Register two watches for the same listener resource and have the
// callbacks push the received updates on to a channel.
updateCh1 := testutils.NewChannel()
Expand Down Expand Up @@ -562,6 +568,115 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
}
}

// TestLDSWatch_ExpiryTimerFiresBeforeResponse tests the case where the client
// does not receive an LDS response for the request that it sends. The test
// verifies that the watch callback is invoked with an error once the
// watchExpiryTimer fires.
func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// No need to spin up a management server since we don't want the client to
// receive a response for the watch being registered by the test.

// Create an xDS client talking to a non-existent management server.
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy management server address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
TransportAPI: version.TransportV3,
NodeProto: &v3corepb.Node{},
},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()

// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
updateCh := testutils.NewChannel()
ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
defer ldsCancel()

// Wait for the watch expiry timer to fire.
<-time.After(defaultTestWatchExpiryTimeout)

// Verify that an empty update with the expected error is received.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := fmt.Errorf("watch for resource %q of type Listener timed out", ldsName)
if err := verifyListenerUpdate(ctx, updateCh, xdsresource.ListenerUpdateErrTuple{Err: wantErr}); err != nil {
t.Fatal(err)
}
}

// TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior tests the case where the
// client receives a valid LDS response for the request that it sends. The test
// verifies that the behavior associated with the expiry timer (i.e, callback
// invocation with error) does not take place.
func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
overrideFedEnvVar(t)
mgmtServer, err := e2e.StartManagementServer(nil)
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
TransportAPI: version.TransportV3,
NodeProto: &v3corepb.Node{Id: nodeID},
},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()

// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
updateCh := testutils.NewChannel()
ldsCancel := client.WatchListener(ldsName, func(u xdsresource.ListenerUpdate, err error) {
updateCh.Send(xdsresource.ListenerUpdateErrTuple{Update: u, Err: err})
})
defer ldsCancel()

// Configure the management server to return a single listener
// resource, corresponding to the one we registered a watch for.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify the contents of the received update.
wantUpdate := xdsresource.ListenerUpdateErrTuple{
Update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, updateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Wait for the watch expiry timer to fire, and verify that the callback is
// not invoked.
<-time.After(defaultTestWatchExpiryTimeout)
if err := verifyNoListenerUpdate(ctx, updateCh); err != nil {
t.Fatal(err)
}
}

// TestLDSWatch_ResourceRemoved covers the cases where a resource being watched
// is removed from the management server. The test verifies the following
// scenarios:
Expand Down Expand Up @@ -720,8 +835,8 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(xdsresource.ListenerUpdateErrTuple).Err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantNACKErr)
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}
}

Expand Down Expand Up @@ -760,7 +875,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
})
defer ldsCancel2()

// Configure the management with server two listener resources. One of these
// Configure the management server with two listener resources. One of these
// is a bad resource causing the update to be NACKed.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Expand All @@ -781,8 +896,8 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(xdsresource.ListenerUpdateErrTuple).Err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantNACKErr)
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}

// Verify that the watcher watching the good resource receives a good
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/loadreport_test.go
Expand Up @@ -58,7 +58,7 @@ func (s) TestLRSClient(t *testing.T) {
TransportAPI: version.TransportV2,
NodeProto: &v2corepb.Node{},
},
}, defaultClientWatchExpiryTimeout)
}, defaultClientWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
Expand Down

0 comments on commit e81d0a2

Please sign in to comment.