xdsclient: switch xdsclient watch deadlock test to e2e style #5697

Merged
merged 4 commits on Nov 4, 2022
15 changes: 13 additions & 2 deletions internal/testutils/xds/e2e/server.go
@@ -60,6 +60,14 @@ type ManagementServerOptions struct {
// will be created and used.
Listener net.Listener

// AllowResourceSubset allows the management server to respond to requests
// that name only a subset of the configured resources. By default, the
// management server waits for all configured resources to be requested
// before responding to any of them, since that is how our tests have
// historically run. Set this field to true only in tests which explicitly
// require the other behavior.
AllowResourceSubset bool

// The callbacks defined below correspond to the state of the world (sotw)
// version of the xDS API on the management server.

@@ -97,8 +105,11 @@ type ManagementServerOptions struct {
// logic. When the test is done, it should call the Stop() method to cleanup
// resources allocated by the management server.
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
// Create a snapshot cache.
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
// Create a snapshot cache. The first parameter to NewSnapshotCache()
// controls whether the server should wait for all resources to be
// explicitly named in the request before responding to any of them.
wait := opts == nil || !opts.AllowResourceSubset
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")

var lis net.Listener
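For reference, a test that needs the management server to respond to partial resource requests can opt in through the new field; a minimal sketch, assuming defaults for every other ManagementServerOptions field:

// Minimal sketch of opting into the new behavior. Only the new field is
// set; per the Listener field's docs, a listener is created automatically.
mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{
    AllowResourceSubset: true,
})
if err != nil {
    t.Fatalf("Failed to start management server: %v", err)
}
defer mgmtServer.Stop()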
44 changes: 0 additions & 44 deletions xds/internal/xdsclient/client_test.go
@@ -31,7 +31,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -148,49 +147,6 @@ func (c *testController) Close() {
c.done.Fire()
}

// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
// callback. It makes sure it doesn't cause a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start a watch for some resource, so that the controller and update
// handlers are built for this authority. The test needs these to make an
// inline watch in a callback.
client, ctrlCh := testClientSetup(t, false)
newWatch(t, client, xdsresource.ClusterResource, "doesnot-matter")
controller, updateHandler := getControllerAndPubsub(ctx, t, client, ctrlCh, xdsresource.ClusterResource, "doesnot-matter")

clusterUpdateCh := testutils.NewChannel()
firstTime := true
client.WatchCluster(testCDSName, func(update xdsresource.ClusterUpdate, err error) {
clusterUpdateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update, Err: err})
// Calls another watch inline, to ensure this does not cause a deadlock.
client.WatchCluster("another-random-name", func(xdsresource.ClusterUpdate, error) {})

if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := xdsresource.ClusterUpdate{ClusterName: testEDSName}
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate}}, xdsresource.UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil {
t.Fatal(err)
}

// The second update needs to be different in the underlying resource proto
// for the watch callback to be invoked.
wantUpdate2 := xdsresource.ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}}
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate2}}, xdsresource.UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil {
t.Fatal(err)
}
}
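The deadlock this (now removed) test guarded against arises when a client invokes watch callbacks while holding the same lock that registering a watch must acquire. A hypothetical, self-contained sketch of the anti-pattern (not the actual xdsclient implementation):

package main

import "sync"

// notifier is a hypothetical type illustrating the hazard: Watch() takes a
// lock that notify() holds while running callbacks, so a Watch() call made
// from inside a callback blocks forever.
type notifier struct {
    mu        sync.Mutex
    callbacks []func()
}

func (n *notifier) Watch(cb func()) {
    n.mu.Lock() // blocks forever if notify() is running cb right now
    defer n.mu.Unlock()
    n.callbacks = append(n.callbacks, cb)
}

func (n *notifier) notify() {
    n.mu.Lock()
    defer n.mu.Unlock()
    for _, cb := range n.callbacks {
        cb() // a callback that calls Watch() re-enters n.mu
    }
}

func main() {
    n := &notifier{}
    n.Watch(func() {
        n.Watch(func() {}) // never returns: notify() still holds n.mu
    })
    n.notify() // deadlocks here; Go's runtime reports all goroutines asleep
}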

func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
133 changes: 133 additions & 0 deletions xds/internal/xdsclient/e2e_test/misc_watchers_test.go
@@ -0,0 +1,133 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package e2e_test

import (
"context"
"testing"

"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

// TestWatchCallAnotherWatch tests the scenario where a watch is registered for
// a resource, and more watches are registered from the first watch's callback.
// The test verifies that this scenario does not lead to a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
overrideFedEnvVar(t)

// Start an xDS management server and set the option to allow it to respond
// to requests which only specify a subset of the configured resources.
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

// Configure the management server to respond with route config resources.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{
e2e.DefaultRouteConfig(rdsName, ldsName, cdsName),
e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsName),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Start a watch for one route configuration resource. From the watch
// callback of the first resource, register two more watches (one for the
// same resource name, which would be satisfied from the cache, and another
// for a different resource name, which would be satisfied from the server).
updateCh1 := testutils.NewChannel()
updateCh2 := testutils.NewChannel()
updateCh3 := testutils.NewChannel()
var rdsCancel2, rdsCancel3 func()
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
// Watch for the same resource name.
rdsCancel2 = client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
t.Cleanup(rdsCancel2)
// Watch for a different resource name.
rdsCancel3 = client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
rdsCancel3()
})
t.Cleanup(rdsCancel3)
})
t.Cleanup(rdsCancel1)

// Verify the contents of the received updates for all three watchers.
wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
Routes: []*xdsresource.Route{
{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
},
},
},
},
},
}
wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
Routes: []*xdsresource.Route{
{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
},
},
},
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil {
t.Fatal(err)
}
}
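The helpers verifyRouteConfigUpdate and newStringP are defined elsewhere in this test package and are not part of this diff. A rough sketch of what they plausibly look like, assuming go-cmp is used for the comparison and that the Raw field of the received update should be ignored:

// Rough sketch only; the real helpers live elsewhere in this package and
// their exact signatures and comparison options are assumptions. Requires
// "fmt", "github.com/google/go-cmp/cmp" and "github.com/google/go-cmp/cmp/cmpopts".
func newStringP(s string) *string { return &s }

func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.RouteConfigUpdateErrTuple) error {
    u, err := updateCh.Receive(ctx)
    if err != nil {
        return fmt.Errorf("timeout waiting for a route configuration update: %v", err)
    }
    got := u.(xdsresource.RouteConfigUpdateErrTuple)
    if (got.Err != nil) != (wantUpdate.Err != nil) {
        return fmt.Errorf("received update error %v, want %v", got.Err, wantUpdate.Err)
    }
    // Ignore the raw serialized proto, which the want update does not populate.
    opts := cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw")
    if diff := cmp.Diff(wantUpdate.Update, got.Update, opts); diff != "" {
        return fmt.Errorf("unexpected route config update, diff (-want, +got):\n%s", diff)
    }
    return nil
}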