diff --git a/internal/testutils/xds/e2e/server.go b/internal/testutils/xds/e2e/server.go index efe68be299b5..38eb01adea8d 100644 --- a/internal/testutils/xds/e2e/server.go +++ b/internal/testutils/xds/e2e/server.go @@ -60,6 +60,14 @@ type ManagementServerOptions struct { // will be created and used. Listener net.Listener + // AllowResourceSubSet allows the management server to respond to requests + // before all configured resources are explicitly named in the request. The + // default behavior that we want is for the management server to wait for + // all configured resources to be requested before responding to any of + // them, since this is how we have run our tests historically, and should be + // set to true only for tests which explicitly require the other behavior. + AllowResourceSubset bool + // The callbacks defined below correspond to the state of the world (sotw) // version of the xDS API on the management server. @@ -97,8 +105,11 @@ type ManagementServerOptions struct { // logic. When the test is done, it should call the Stop() method to cleanup // resources allocated by the management server. func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) { - // Create a snapshot cache. - cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{}) + // Create a snapshot cache. The first parameter to NewSnapshotCache() + // controls whether the server should wait for all resources to be + // explicitly named in the request before responding to any of them. + wait := opts == nil || !opts.AllowResourceSubset + cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{}) logger.Infof("Created new snapshot cache...") var lis net.Listener diff --git a/xds/internal/xdsclient/client_test.go b/xds/internal/xdsclient/client_test.go index d496aa59adc3..9501a154c018 100644 --- a/xds/internal/xdsclient/client_test.go +++ b/xds/internal/xdsclient/client_test.go @@ -31,7 +31,6 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/load" "google.golang.org/grpc/xds/internal/xdsclient/pubsub" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" - "google.golang.org/protobuf/types/known/anypb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -148,49 +147,6 @@ func (c *testController) Close() { c.done.Fire() } -// TestWatchCallAnotherWatch covers the case where watch() is called inline by a -// callback. It makes sure it doesn't cause a deadlock. -func (s) TestWatchCallAnotherWatch(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Start a watch for some resource, so that the controller and update - // handlers are built for this authority. The test needs these to make an - // inline watch in a callback. - client, ctrlCh := testClientSetup(t, false) - newWatch(t, client, xdsresource.ClusterResource, "doesnot-matter") - controller, updateHandler := getControllerAndPubsub(ctx, t, client, ctrlCh, xdsresource.ClusterResource, "doesnot-matter") - - clusterUpdateCh := testutils.NewChannel() - firstTime := true - client.WatchCluster(testCDSName, func(update xdsresource.ClusterUpdate, err error) { - clusterUpdateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update, Err: err}) - // Calls another watch inline, to ensure there's deadlock. - client.WatchCluster("another-random-name", func(xdsresource.ClusterUpdate, error) {}) - - if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); firstTime && err != nil { - t.Fatalf("want new watch to start, got error %v", err) - } - firstTime = false - }) - if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); err != nil { - t.Fatalf("want new watch to start, got error %v", err) - } - - wantUpdate := xdsresource.ClusterUpdate{ClusterName: testEDSName} - updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate}}, xdsresource.UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil { - t.Fatal(err) - } - - // The second update needs to be different in the underlying resource proto - // for the watch callback to be invoked. - wantUpdate2 := xdsresource.ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}} - updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate2}}, xdsresource.UpdateMetadata{}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil { - t.Fatal(err) - } -} - func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdate, wantErr error) error { u, err := updateCh.Receive(ctx) if err != nil { diff --git a/xds/internal/xdsclient/e2e_test/misc_watchers_test.go b/xds/internal/xdsclient/e2e_test/misc_watchers_test.go new file mode 100644 index 000000000000..a22970ccdab3 --- /dev/null +++ b/xds/internal/xdsclient/e2e_test/misc_watchers_test.go @@ -0,0 +1,133 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e_test + +import ( + "context" + "testing" + + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" +) + +// TestWatchCallAnotherWatch tests the scenario where a watch is registered for +// a resource, and more watches are registered from the first watch's callback. +// The test verifies that this scenario does not lead to a deadlock. +func (s) TestWatchCallAnotherWatch(t *testing.T) { + overrideFedEnvVar(t) + + // Start an xDS management server and set the option to allow it to respond + // to requests which only specify a subset of the configured resources. + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Configure the management server to respond with route config resources. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{ + e2e.DefaultRouteConfig(rdsName, ldsName, cdsName), + e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsName), + }, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Start a watch for one route configuration resource. From the watch + // callback of the first resource, register two more watches (one for the + // same resource name, which would be satisfied from the cache, and another + // for a different resource name, which would be satisfied from the server). + updateCh1 := testutils.NewChannel() + updateCh2 := testutils.NewChannel() + updateCh3 := testutils.NewChannel() + var rdsCancel2, rdsCancel3 func() + rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + // Watch for the same resource name. + rdsCancel2 = client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + t.Cleanup(rdsCancel2) + // Watch for a different resource name. + rdsCancel3 = client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + rdsCancel3() + }) + t.Cleanup(rdsCancel3) + }) + // defer rdsCancel1() + t.Cleanup(rdsCancel1) + + // Verify the contents of the received update for the all watchers. + wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + } + wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsNameNewStyle}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + } + if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil { + t.Fatal(err) + } + if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil { + t.Fatal(err) + } + if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil { + t.Fatal(err) + } +}