From 6779cf403688a6a05159a624366513422d4bc196 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 4 Nov 2022 10:59:28 -0700 Subject: [PATCH] xdsclient: improve authority watchers test (#5700) --- xds/internal/xdsclient/authority_test.go | 360 ------------------ .../xdsclient/e2e_test/authority_test.go | 315 +++++++++++++++ xds/internal/xdsclient/watchers_test.go | 7 + 3 files changed, 322 insertions(+), 360 deletions(-) delete mode 100644 xds/internal/xdsclient/authority_test.go create mode 100644 xds/internal/xdsclient/e2e_test/authority_test.go diff --git a/xds/internal/xdsclient/authority_test.go b/xds/internal/xdsclient/authority_test.go deleted file mode 100644 index b8704a9c0c7a..000000000000 --- a/xds/internal/xdsclient/authority_test.go +++ /dev/null @@ -1,360 +0,0 @@ -/* - * - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xdsclient - -import ( - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal/envconfig" - "google.golang.org/grpc/internal/testutils" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" - "google.golang.org/protobuf/testing/protocmp" -) - -var ( - serverConfigs = []*bootstrap.ServerConfig{ - { - ServerURI: testXDSServer + "0", - Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), - CredsType: "creds-0", - TransportAPI: version.TransportV2, - NodeProto: xdstestutils.EmptyNodeProtoV2, - }, - { - ServerURI: testXDSServer + "1", - Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), - CredsType: "creds-1", - TransportAPI: version.TransportV3, - NodeProto: xdstestutils.EmptyNodeProtoV3, - }, - { - ServerURI: testXDSServer + "2", - Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), - CredsType: "creds-2", - TransportAPI: version.TransportV2, - NodeProto: xdstestutils.EmptyNodeProtoV2, - }, - } - - serverConfigCmpOptions = cmp.Options{ - cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds"), - protocmp.Transform(), - } -) - -func overrideFedEnvVar(t *testing.T) { - oldFed := envconfig.XDSFederation - envconfig.XDSFederation = true - t.Cleanup(func() { envconfig.XDSFederation = oldFed }) -} - -// watchAndFetchNewController starts a CDS watch on the client for the given -// resourceName, and tries to receive a new controller from the ctrlCh. -// -// It returns false if there's no controller in the ctrlCh. -func watchAndFetchNewController(t *testing.T, client *clientImpl, resourceName string, ctrlCh *testutils.Channel) (*testController, bool, func()) { - updateCh := testutils.NewChannel() - cancelWatch := client.WatchCluster(resourceName, func(update xdsresource.ClusterUpdate, err error) { - updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update, Err: err}) - }) - - // Clear the item in the watch channel, otherwise the next watch will block. - authority := xdsresource.ParseName(resourceName).Authority - var config *bootstrap.ServerConfig - if authority == "" { - config = client.config.XDSServer - } else { - authConfig, ok := client.config.Authorities[authority] - if !ok { - t.Fatalf("failed to find authority %q", authority) - } - config = authConfig.XDSServer - } - a := client.authorities[config.String()] - if a == nil { - t.Fatalf("authority for %q is not created", authority) - } - ctrlTemp := a.controller.(*testController) - // Clear the channel so the next watch on this controller can proceed. - ctrlTemp.addWatches[xdsresource.ClusterResource].ReceiveOrFail() - - cancelWatchRet := func() { - cancelWatch() - ctrlTemp.removeWatches[xdsresource.ClusterResource].ReceiveOrFail() - } - - // Try to receive a new controller. - c, ok := ctrlCh.ReceiveOrFail() - if !ok { - return nil, false, cancelWatchRet - } - ctrl := c.(*testController) - return ctrl, true, cancelWatchRet -} - -// TestAuthorityDefaultAuthority covers that a watch for an old style resource -// name (one without authority) builds a controller using the top level server -// config. -func (s) TestAuthorityDefaultAuthority(t *testing.T) { - overrideFedEnvVar(t) - ctrlCh := overrideNewController(t) - - client, err := newWithConfig(&bootstrap.Config{ - XDSServer: serverConfigs[0], - Authorities: map[string]*bootstrap.Authority{testAuthority: {XDSServer: serverConfigs[1]}}, - }, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - t.Cleanup(client.Close) - - ctrl, ok, _ := watchAndFetchNewController(t, client, testCDSName, ctrlCh) - if !ok { - t.Fatalf("want a new controller to be built, got none") - } - // Want the default server config. - wantConfig := serverConfigs[0] - if diff := cmp.Diff(ctrl.config, wantConfig, serverConfigCmpOptions); diff != "" { - t.Fatalf("controller is built with unexpected config, diff (-got +want): %v", diff) - } -} - -// TestAuthorityNoneDefaultAuthority covers that a watch with a new style -// resource name creates a controller with the corresponding server config. -func (s) TestAuthorityNoneDefaultAuthority(t *testing.T) { - overrideFedEnvVar(t) - ctrlCh := overrideNewController(t) - - client, err := newWithConfig(&bootstrap.Config{ - XDSServer: serverConfigs[0], - Authorities: map[string]*bootstrap.Authority{testAuthority: {XDSServer: serverConfigs[1]}}, - }, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - t.Cleanup(client.Close) - - resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil) - ctrl, ok, _ := watchAndFetchNewController(t, client, resourceName, ctrlCh) - if !ok { - t.Fatalf("want a new controller to be built, got none") - } - // Want the server config for this authority. - wantConfig := serverConfigs[1] - if diff := cmp.Diff(ctrl.config, wantConfig, serverConfigCmpOptions); diff != "" { - t.Fatalf("controller is built with unexpected config, diff (-got +want): %v", diff) - } -} - -// TestAuthorityShare covers that -// - watch with the same authority name doesn't create new authority -// - watch with different authority name but same authority config doesn't -// create new authority -func (s) TestAuthorityShare(t *testing.T) { - overrideFedEnvVar(t) - ctrlCh := overrideNewController(t) - - client, err := newWithConfig(&bootstrap.Config{ - XDSServer: serverConfigs[0], - Authorities: map[string]*bootstrap.Authority{ - testAuthority: {XDSServer: serverConfigs[1]}, - testAuthority2: {XDSServer: serverConfigs[1]}, // Another authority name, but with the same config. - }, - }, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - t.Cleanup(client.Close) - - resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil) - ctrl1, ok1, _ := watchAndFetchNewController(t, client, resourceName, ctrlCh) - if !ok1 { - t.Fatalf("want a new controller to be built, got none") - } - // Want the server config for this authority. - wantConfig := serverConfigs[1] - if diff := cmp.Diff(ctrl1.config, wantConfig, serverConfigCmpOptions); diff != "" { - t.Fatalf("controller is built with unexpected config, diff (-got +want): %v", diff) - } - - // Call the watch with the same authority name. This shouldn't create a new - // controller. - resourceNameSameAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil) - ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameSameAuthority, ctrlCh) - if ok2 { - t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config) - } - - // Call the watch with a different authority name, but the same server - // config. This shouldn't create a new controller. - resourceNameSameConfig := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority2, testCDSName+"1", nil) - if ctrl, ok, _ := watchAndFetchNewController(t, client, resourceNameSameConfig, ctrlCh); ok { - t.Fatalf("an unexpected controller is built with config: %v", ctrl.config) - } -} - -// TestAuthorityIdle covers that -// - authorities are put in a timeout cache when the last watch is canceled -// - idle authorities are not immediately closed. They will be closed after a -// timeout. -func (s) TestAuthorityIdleTimeout(t *testing.T) { - overrideFedEnvVar(t) - ctrlCh := overrideNewController(t) - - const idleTimeout = 50 * time.Millisecond - - client, err := newWithConfig(&bootstrap.Config{ - XDSServer: serverConfigs[0], - Authorities: map[string]*bootstrap.Authority{ - testAuthority: {XDSServer: serverConfigs[1]}, - }, - }, defaultWatchExpiryTimeout, idleTimeout) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - t.Cleanup(client.Close) - - resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil) - ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh) - if !ok1 { - t.Fatalf("want a new controller to be built, got none") - } - - var cancelWatch2 func() - // Call the watch with the same authority name. This shouldn't create a new - // controller. - resourceNameSameAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil) - ctrl2, ok2, cancelWatch2 := watchAndFetchNewController(t, client, resourceNameSameAuthority, ctrlCh) - if ok2 { - t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config) - } - - cancelWatch1() - if ctrl1.done.HasFired() { - t.Fatalf("controller is closed immediately when the watch is canceled, wanted to be put in the idle cache") - } - - // Cancel the second watch, should put controller in the idle cache. - cancelWatch2() - if ctrl1.done.HasFired() { - t.Fatalf("controller is closed when the second watch is closed") - } - - time.Sleep(idleTimeout * 2) - if !ctrl1.done.HasFired() { - t.Fatalf("controller is not closed after idle timeout") - } -} - -// TestAuthorityClientClose covers that the authorities in use and in idle cache -// are all closed when the client is closed. -func (s) TestAuthorityClientClose(t *testing.T) { - overrideFedEnvVar(t) - ctrlCh := overrideNewController(t) - - client, err := newWithConfig(&bootstrap.Config{ - XDSServer: serverConfigs[0], - Authorities: map[string]*bootstrap.Authority{ - testAuthority: {XDSServer: serverConfigs[1]}, - }, - }, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - t.Cleanup(client.Close) - - resourceName := testCDSName - ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh) - if !ok1 { - t.Fatalf("want a new controller to be built, got none") - } - - resourceNameWithAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil) - ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameWithAuthority, ctrlCh) - if !ok2 { - t.Fatalf("want a new controller to be built, got none") - } - - cancelWatch1() - if ctrl1.done.HasFired() { - t.Fatalf("controller is closed immediately when the watch is canceled, wanted to be put in the idle cache") - } - - // Close the client while watch2 is not canceled. ctrl1 is in the idle - // cache, ctrl2 is in use. Both should be closed. - client.Close() - - if !ctrl1.done.HasFired() { - t.Fatalf("controller in idle cache is not closed after client is closed") - } - if !ctrl2.done.HasFired() { - t.Fatalf("controller in use is not closed after client is closed") - } -} - -// TestAuthorityRevive covers that the authorities in the idle cache is revived -// when a new watch is started on this authority. -func (s) TestAuthorityRevive(t *testing.T) { - overrideFedEnvVar(t) - ctrlCh := overrideNewController(t) - - const idleTimeout = 50 * time.Millisecond - - client, err := newWithConfig(&bootstrap.Config{ - XDSServer: serverConfigs[0], - Authorities: map[string]*bootstrap.Authority{ - testAuthority: {XDSServer: serverConfigs[1]}, - }, - }, defaultWatchExpiryTimeout, idleTimeout) - if err != nil { - t.Fatalf("failed to create client: %v", err) - } - t.Cleanup(client.Close) - - // Start a watch on the authority, and cancel it. This puts the authority in - // the idle cache. - resourceName := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName, nil) - ctrl1, ok1, cancelWatch1 := watchAndFetchNewController(t, client, resourceName, ctrlCh) - if !ok1 { - t.Fatalf("want a new controller to be built, got none") - } - cancelWatch1() - - // Start another watch on this authority, it should retrieve the authority - // from the cache, instead of creating a new one. - resourceNameWithAuthority := xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority, testCDSName+"1", nil) - ctrl2, ok2, _ := watchAndFetchNewController(t, client, resourceNameWithAuthority, ctrlCh) - if ok2 { - t.Fatalf("an unexpected controller is built with config: %v", ctrl2.config) - } - - // Wait for double the idle timeout, the controller shouldn't be closed, - // since it was revived. - time.Sleep(idleTimeout * 2) - if ctrl1.done.HasFired() { - t.Fatalf("controller that was revived is closed after timeout, want not closed") - } -} diff --git a/xds/internal/xdsclient/e2e_test/authority_test.go b/xds/internal/xdsclient/e2e_test/authority_test.go new file mode 100644 index 000000000000..93b63636dc68 --- /dev/null +++ b/xds/internal/xdsclient/e2e_test/authority_test.go @@ -0,0 +1,315 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + xdstestutils "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" +) + +const ( + testAuthority1 = "test-authority1" + testAuthority2 = "test-authority2" + testAuthority3 = "test-authority3" +) + +var ( + // These two resources use `testAuthority1`, which contains an empty server + // config in the bootstrap file, and therefore will use the default + // management server. + authorityTestResourceName11 = xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority1, cdsName+"1", nil) + authorityTestResourceName12 = xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority1, cdsName+"2", nil) + // This resource uses `testAuthority2`, which contains an empty server + // config in the bootstrap file, and therefore will use the default + // management server. + authorityTestResourceName2 = xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority2, cdsName+"3", nil) + // This resource uses `testAuthority3`, which contains a non-empty server + // config in the bootstrap file, and therefore will use the non-default + // management server. + authorityTestResourceName3 = xdstestutils.BuildResourceName(xdsresource.ClusterResource, testAuthority3, cdsName+"3", nil) +) + +// setupForAuthorityTests spins up two management servers, one to act as the +// default and the other to act as the non-default. It also generates a +// bootstrap configuration with three authorities (the first two pointing to the +// default and the third one pointing to the non-default). +// +// Returns two listeners used by the default and non-default management servers +// respectively, and the xDS client. +func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.Duration) (*testutils.ListenerWrapper, *testutils.ListenerWrapper, xdsclient.XDSClient) { + overrideFedEnvVar(t) + + // Create listener wrappers which notify on to a channel whenever a new + // connection is accepted. We use this to track the number of transports + // used by the xDS client. + lisDefault := testutils.NewListenerWrapper(t, nil) + lisNonDefault := testutils.NewListenerWrapper(t, nil) + + // Start a management server to act as the default authority. + defaultAuthorityServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{Listener: lisDefault}) + if err != nil { + t.Fatalf("Failed to spin up the xDS management server: %v", err) + } + t.Cleanup(func() { defaultAuthorityServer.Stop() }) + + // Start a management server to act as the non-default authority. + nonDefaultAuthorityServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{Listener: lisNonDefault}) + if err != nil { + t.Fatalf("Failed to spin up the xDS management server: %v", err) + } + t.Cleanup(func() { nonDefaultAuthorityServer.Stop() }) + + // Create a bootstrap configuration with two non-default authorities which + // have empty server configs, and therefore end up using the default server + // config, which points to the above management server. + nodeID := uuid.New().String() + client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ + XDSServer: &bootstrap.ServerConfig{ + ServerURI: defaultAuthorityServer.Address, + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + TransportAPI: version.TransportV3, + NodeProto: &v3corepb.Node{Id: nodeID}, + }, + Authorities: map[string]*bootstrap.Authority{ + testAuthority1: {}, + testAuthority2: {}, + testAuthority3: { + XDSServer: &bootstrap.ServerConfig{ + ServerURI: nonDefaultAuthorityServer.Address, + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + TransportAPI: version.TransportV3, + NodeProto: &v3corepb.Node{Id: nodeID}, + }, + }, + }, + }, defaultTestWatchExpiryTimeout, idleTimeout) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + t.Cleanup(func() { client.Close() }) + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + e2e.DefaultCluster(authorityTestResourceName11, edsName, e2e.SecurityLevelNone), + e2e.DefaultCluster(authorityTestResourceName12, edsName, e2e.SecurityLevelNone), + e2e.DefaultCluster(authorityTestResourceName2, edsName, e2e.SecurityLevelNone), + e2e.DefaultCluster(authorityTestResourceName3, edsName, e2e.SecurityLevelNone), + }, + SkipValidation: true, + } + if err := defaultAuthorityServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + return lisDefault, lisNonDefault, client +} + +// TestAuthorityShare tests the authority sharing logic. The test verifies the +// following scenarios: +// - A watch for a resource name with an authority matching an existing watch +// should not result in a new transport being created. +// - A watch for a resource name with different authority name but same +// authority config as an existing watch should not result in a new transport +// being created. +func (s) TestAuthorityShare(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + lis, _, client := setupForAuthorityTests(ctx, t, time.Duration(0)) + + // Verify that no connection is established to the management server at this + // point. A transport is created only when a resource (which belongs to that + // authority) is requested. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Unexpected new transport created to management server") + } + + // Request the first resource. Verify that a new transport is created. + cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + defer cdsCancel1() + if _, err := lis.NewConnCh.Receive(ctx); err != nil { + t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) + } + + // Request the second resource. Verify that no new transport is created. + cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {}) + defer cdsCancel2() + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Unexpected new transport created to management server") + } + + // Request the third resource. Verify that no new transport is created. + cdsCancel3 := client.WatchCluster(authorityTestResourceName2, func(u xdsresource.ClusterUpdate, err error) {}) + defer cdsCancel3() + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Unexpected new transport created to management server") + } +} + +// TestAuthorityIdle test the authority idle timeout logic. The test verifies +// that the xDS client does not close authorities immediately after the last +// watch is canceled, but waits for the configured idle timeout to expire before +// closing them. +func (s) TestAuthorityIdleTimeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + lis, _, client := setupForAuthorityTests(ctx, t, defaultTestIdleAuthorityTimeout) + + // Request the first resource. Verify that a new transport is created. + cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + val, err := lis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) + } + conn := val.(*testutils.ConnWrapper) + + // Request the second resource. Verify that no new transport is created. + cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {}) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Unexpected new transport created to management server") + } + + // Cancel both watches, and verify that the connection to the management + // server is not closed immediately. + cdsCancel1() + cdsCancel2() + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Connection to management server closed unexpectedly") + } + + // Wait for the authority idle timeout to fire. + time.Sleep(2 * defaultTestIdleAuthorityTimeout) + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := conn.CloseCh.Receive(sCtx); err != nil { + t.Fatal("Connection to management server not closed after idle timeout expiry") + } +} + +// TestAuthorityClientClose verifies that authorities in use and in the idle +// cache are all closed when the client is closed. +func (s) TestAuthorityClientClose(t *testing.T) { + // Set the authority idle timeout to twice the defaultTestTimeout. This will + // ensure that idle authorities stay in the cache for the duration of this + // test, until explicitly closed. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + lisDefault, lisNonDefault, client := setupForAuthorityTests(ctx, t, time.Duration(2*defaultTestTimeout)) + + // Request the first resource. Verify that a new transport is created to the + // default management server. + cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + val, err := lisDefault.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) + } + connDefault := val.(*testutils.ConnWrapper) + + // Request another resource which is served by the non-default authority. + // Verify that a new transport is created to the non-default management + // server. + client.WatchCluster(authorityTestResourceName3, func(u xdsresource.ClusterUpdate, err error) {}) + val, err = lisNonDefault.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) + } + connNonDefault := val.(*testutils.ConnWrapper) + + // Cancel the first watch. This should move the default authority to the + // idle cache, but the connection should not be closed yet, because the idle + // timeout would not have fired. + cdsCancel1() + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := connDefault.CloseCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Connection to management server closed unexpectedly") + } + + // Closing the xDS client should close the connection to both management + // servers, even though we have an open watch to one of them. + client.Close() + if _, err := connDefault.CloseCh.Receive(ctx); err != nil { + t.Fatal("Connection to management server not closed after client close") + } + if _, err := connNonDefault.CloseCh.Receive(ctx); err != nil { + t.Fatal("Connection to management server not closed after client close") + } +} + +// TestAuthorityRevive verifies that an authority in the idle cache is revived +// when a new watch is started on this authority. +func (s) TestAuthorityRevive(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + lis, _, client := setupForAuthorityTests(ctx, t, defaultTestIdleAuthorityTimeout) + + // Request the first resource. Verify that a new transport is created. + cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + val, err := lis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) + } + conn := val.(*testutils.ConnWrapper) + + // Cancel the above watch. This should move the authority to the idle cache. + cdsCancel1() + + // Request the second resource. Verify that no new transport is created. + // This should move the authority out of the idle cache. + cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {}) + defer cdsCancel2() + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Unexpected new transport created to management server") + } + + // Wait for double the idle timeout, and the connection to the management + // server should not be closed, since it was revived from the idle cache. + time.Sleep(2 * defaultTestIdleAuthorityTimeout) + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Connection to management server closed unexpectedly") + } +} diff --git a/xds/internal/xdsclient/watchers_test.go b/xds/internal/xdsclient/watchers_test.go index 5e2cdb123eff..87789d11ebc9 100644 --- a/xds/internal/xdsclient/watchers_test.go +++ b/xds/internal/xdsclient/watchers_test.go @@ -22,6 +22,7 @@ import ( "fmt" "testing" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/testutils" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" @@ -30,6 +31,12 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +func overrideFedEnvVar(t *testing.T) { + oldFed := envconfig.XDSFederation + envconfig.XDSFederation = true + t.Cleanup(func() { envconfig.XDSFederation = oldFed }) +} + // testClientSetup sets up the client and controller for the test. It returns a // newly created client, and a channel where new controllers will be sent to. func testClientSetup(t *testing.T, overrideWatchExpiryTimeout bool) (*clientImpl, *testutils.Channel) {