Skip to content

Commit

Permalink
xdsclient: switch xdsclient watch deadlock test to e2e style (#5697)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Nov 4, 2022
1 parent 32f969e commit 7f23df0
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 46 deletions.
15 changes: 13 additions & 2 deletions internal/testutils/xds/e2e/server.go
Expand Up @@ -60,6 +60,14 @@ type ManagementServerOptions struct {
// will be created and used.
Listener net.Listener

// AllowResourceSubSet allows the management server to respond to requests
// before all configured resources are explicitly named in the request. The
// default behavior that we want is for the management server to wait for
// all configured resources to be requested before responding to any of
// them, since this is how we have run our tests historically, and should be
// set to true only for tests which explicitly require the other behavior.
AllowResourceSubset bool

// The callbacks defined below correspond to the state of the world (sotw)
// version of the xDS API on the management server.

Expand Down Expand Up @@ -97,8 +105,11 @@ type ManagementServerOptions struct {
// logic. When the test is done, it should call the Stop() method to cleanup
// resources allocated by the management server.
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
// Create a snapshot cache.
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
// Create a snapshot cache. The first parameter to NewSnapshotCache()
// controls whether the server should wait for all resources to be
// explicitly named in the request before responding to any of them.
wait := opts == nil || !opts.AllowResourceSubset
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")

var lis net.Listener
Expand Down
44 changes: 0 additions & 44 deletions xds/internal/xdsclient/client_test.go
Expand Up @@ -31,7 +31,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -148,49 +147,6 @@ func (c *testController) Close() {
c.done.Fire()
}

// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
// callback. It makes sure it doesn't cause a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start a watch for some resource, so that the controller and update
// handlers are built for this authority. The test needs these to make an
// inline watch in a callback.
client, ctrlCh := testClientSetup(t, false)
newWatch(t, client, xdsresource.ClusterResource, "doesnot-matter")
controller, updateHandler := getControllerAndPubsub(ctx, t, client, ctrlCh, xdsresource.ClusterResource, "doesnot-matter")

clusterUpdateCh := testutils.NewChannel()
firstTime := true
client.WatchCluster(testCDSName, func(update xdsresource.ClusterUpdate, err error) {
clusterUpdateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update, Err: err})
// Calls another watch inline, to ensure there's deadlock.
client.WatchCluster("another-random-name", func(xdsresource.ClusterUpdate, error) {})

if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := xdsresource.ClusterUpdate{ClusterName: testEDSName}
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate}}, xdsresource.UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil {
t.Fatal(err)
}

// The second update needs to be different in the underlying resource proto
// for the watch callback to be invoked.
wantUpdate2 := xdsresource.ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}}
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate2}}, xdsresource.UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil {
t.Fatal(err)
}
}

func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
Expand Down
133 changes: 133 additions & 0 deletions xds/internal/xdsclient/e2e_test/misc_watchers_test.go
@@ -0,0 +1,133 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package e2e_test

import (
"context"
"testing"

"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

// TestWatchCallAnotherWatch tests the scenario where a watch is registered for
// a resource, and more watches are registered from the first watch's callback.
// The test verifies that this scenario does not lead to a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
overrideFedEnvVar(t)

// Start an xDS management server and set the option to allow it to respond
// to requests which only specify a subset of the configured resources.
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

// Configure the management server to respond with route config resources.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{
e2e.DefaultRouteConfig(rdsName, ldsName, cdsName),
e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsName),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Start a watch for one route configuration resource. From the watch
// callback of the first resource, register two more watches (one for the
// same resource name, which would be satisfied from the cache, and another
// for a different resource name, which would be satisfied from the server).
updateCh1 := testutils.NewChannel()
updateCh2 := testutils.NewChannel()
updateCh3 := testutils.NewChannel()
var rdsCancel2, rdsCancel3 func()
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
// Watch for the same resource name.
rdsCancel2 = client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
t.Cleanup(rdsCancel2)
// Watch for a different resource name.
rdsCancel3 = client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
rdsCancel3()
})
t.Cleanup(rdsCancel3)
})
// defer rdsCancel1()
t.Cleanup(rdsCancel1)

// Verify the contents of the received update for the all watchers.
wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
Routes: []*xdsresource.Route{
{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
},
},
},
},
},
}
wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
Routes: []*xdsresource.Route{
{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
},
},
},
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil {
t.Fatal(err)
}
}

0 comments on commit 7f23df0

Please sign in to comment.