diff --git a/internal/testutils/xds/e2e/logging.go b/internal/testutils/xds/e2e/logging.go new file mode 100644 index 00000000000..f524c451b00 --- /dev/null +++ b/internal/testutils/xds/e2e/logging.go @@ -0,0 +1,48 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" +) + +var logger = grpclog.Component("xds-e2e") + +// serverLogger implements the Logger interface defined at +// envoyproxy/go-control-plane/pkg/log. This is passed to the Snapshot cache. +type serverLogger struct{} + +func (l serverLogger) Debugf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + logger.InfoDepth(1, msg) +} +func (l serverLogger) Infof(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + logger.InfoDepth(1, msg) +} +func (l serverLogger) Warnf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + logger.WarningDepth(1, msg) +} +func (l serverLogger) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + logger.ErrorDepth(1, msg) +} diff --git a/internal/testutils/xds/e2e/server.go b/internal/testutils/xds/e2e/server.go index e611c56c673..efe68be299b 100644 --- a/internal/testutils/xds/e2e/server.go +++ b/internal/testutils/xds/e2e/server.go @@ -26,43 +26,19 @@ import ( "reflect" "strconv" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" - v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" v3resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "google.golang.org/grpc" ) -var logger = grpclog.Component("xds-e2e") - -// serverLogger implements the Logger interface defined at -// envoyproxy/go-control-plane/pkg/log. This is passed to the Snapshot cache. -type serverLogger struct{} - -func (l serverLogger) Debugf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - logger.InfoDepth(1, msg) -} -func (l serverLogger) Infof(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - logger.InfoDepth(1, msg) -} -func (l serverLogger) Warnf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - logger.WarningDepth(1, msg) -} -func (l serverLogger) Errorf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - logger.ErrorDepth(1, msg) -} - // ManagementServer is a thin wrapper around the xDS control plane // implementation provided by envoyproxy/go-control-plane. type ManagementServer struct { @@ -77,27 +53,81 @@ type ManagementServer struct { version int // Version of resource snapshot. } +// ManagementServerOptions contains options to be passed to the management +// server during creation. +type ManagementServerOptions struct { + // Listener to accept connections on. If nil, a TPC listener on a local port + // will be created and used. + Listener net.Listener + + // The callbacks defined below correspond to the state of the world (sotw) + // version of the xDS API on the management server. + + // OnStreamOpen is called when an xDS stream is opened. The callback is + // invoked with the assigned stream ID and the type URL from the incoming + // request (or "" for ADS). + // + // Returning an error from this callback will end processing and close the + // stream. OnStreamClosed will still be called. + OnStreamOpen func(context.Context, int64, string) error + + // OnStreamClosed is called immediately prior to closing an xDS stream. The + // callback is invoked with the stream ID of the stream being closed. + OnStreamClosed func(int64) + + // OnStreamRequest is called when a request is received on the stream. The + // callback is invoked with the stream ID of the stream on which the request + // was received and the received request. + // + // Returning an error from this callback will end processing and close the + // stream. OnStreamClosed will still be called. + OnStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error + + // OnStreamResponse is called immediately prior to sending a response on the + // stream. The callback is invoked with the stream ID of the stream on which + // the response is being sent along with the incoming request and the outgoing + // response. + OnStreamResponse func(context.Context, int64, *v3discoverypb.DiscoveryRequest, *v3discoverypb.DiscoveryResponse) +} + // StartManagementServer initializes a management server which implements the // AggregatedDiscoveryService endpoint. The management server is initialized // with no resources. Tests should call the Update() method to change the // resource snapshot held by the management server, as required by the test // logic. When the test is done, it should call the Stop() method to cleanup // resources allocated by the management server. -func StartManagementServer() (*ManagementServer, error) { +func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) { // Create a snapshot cache. cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{}) logger.Infof("Created new snapshot cache...") - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, fmt.Errorf("failed to start xDS management server: %v", err) + var lis net.Listener + if opts != nil && opts.Listener != nil { + lis = opts.Listener + } else { + var err error + lis, err = net.Listen("tcp", "localhost:0") + if err != nil { + return nil, fmt.Errorf("failed to start xDS management server: %v", err) + } } - // Create an xDS management server and register the ADS implementation - // provided by it on a gRPC server. Cancelling the context passed to the - // server is the only way of stopping it at the end of the test. + // Cancelling the context passed to the server is the only way of stopping it + // at the end of the test. ctx, cancel := context.WithCancel(context.Background()) - xs := v3server.NewServer(ctx, cache, v3server.CallbackFuncs{}) + callbacks := v3server.CallbackFuncs{} + if opts != nil { + callbacks = v3server.CallbackFuncs{ + StreamOpenFunc: opts.OnStreamOpen, + StreamClosedFunc: opts.OnStreamClosed, + StreamRequestFunc: opts.OnStreamRequest, + StreamResponseFunc: opts.OnStreamResponse, + } + } + + // Create an xDS management server and register the ADS implementation + // provided by it on a gRPC server. + xs := v3server.NewServer(ctx, cache, callbacks) gs := grpc.NewServer() v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs) logger.Infof("Registered Aggregated Discovery Service (ADS)...") diff --git a/internal/testutils/xds/e2e/setup_management_server.go b/internal/testutils/xds/e2e/setup_management_server.go index ca45363d6e0..c61f0620cb1 100644 --- a/internal/testutils/xds/e2e/setup_management_server.go +++ b/internal/testutils/xds/e2e/setup_management_server.go @@ -41,11 +41,11 @@ import ( // - bootstrap contents to be used by the client // - xDS resolver builder to be used by the client // - a cleanup function to be invoked at the end of the test -func SetupManagementServer(t *testing.T) (*ManagementServer, string, []byte, resolver.Builder, func()) { +func SetupManagementServer(t *testing.T, opts *ManagementServerOptions) (*ManagementServer, string, []byte, resolver.Builder, func()) { t.Helper() // Spin up an xDS management server on a local port. - server, err := StartManagementServer() + server, err := StartManagementServer(opts) if err != nil { t.Fatalf("Failed to spin up the xDS management server: %v", err) } diff --git a/test/xds/xds_client_ack_nack_test.go b/test/xds/xds_client_ack_nack_test.go new file mode 100644 index 00000000000..ca0ec56e284 --- /dev/null +++ b/test/xds/xds_client_ack_nack_test.go @@ -0,0 +1,160 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +// We are interested in LDS, RDS, CDS and EDS resources as part of the regular +// xDS flow on the client. +const wantResources = 4 + +// seenAllACKs returns true if we have seen two streams with acks for all the +// resources that we are interested in. +func seenAllACKs(acks map[int64]map[string]string) bool { + if len(acks) != 2 { + return false + } + for _, v := range acks { + if len(v) != wantResources { + return false + } + } + return true +} + +// TestClientResourceVersionAfterStreamRestart tests the scenario where the +// xdsClient's ADS stream to the management server gets broken. This test +// verifies that the version number on the initial request on the new stream +// indicates the most recent version seen by the client on the previous stream. +func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) { + // Create a restartable listener which can close existing connections. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + streamClosed := grpcsync.NewEvent() // Event to notify stream closure. + acksReceived := grpcsync.NewEvent() // Event to notify receipt of acks for all resources. + // Map from stream id to a map of resource type to resource version. + ackVersionsMap := make(map[int64]map[string]string) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{ + Listener: lis, + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + // Return early under the following circumstances: + // - Received all the requests we wanted to see. This is to avoid + // any stray requests leading to test flakes. + // - Request contains no resource names. Such requests are usually + // seen when the xdsclient is shutting down and is no longer + // interested in the resources that it had subscribed to earlier. + if acksReceived.HasFired() || len(req.GetResourceNames()) == 0 { + return nil + } + // Create a stream specific map to store ack versions if this is the + // first time we are seeing this stream id. + if ackVersionsMap[id] == nil { + ackVersionsMap[id] = make(map[string]string) + } + ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo() + if seenAllACKs(ackVersionsMap) { + acksReceived.Fire() + } + return nil + }, + OnStreamClosed: func(int64) { + streamClosed.Fire() + }, + }) + defer cleanup1() + + port, cleanup2 := startTestService(t, nil) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // A successful RPC means that we have captured the ack versions for all + // resources in the OnStreamRequest callback. Nothing more needs to be done + // here before stream restart. + + // Stop the listener on the management server. This will cause the client to + // backoff and recreate the stream. + lis.Stop() + + // Wait for the stream to be closed on the server. + <-streamClosed.Done() + + // Restart the listener on the management server to be able to accept + // reconnect attempts from the client. + lis.Restart() + + // Wait for all the previously sent resources to be re-requested. + <-acksReceived.Done() + + // We depend on the fact that the management server assigns monotonically + // increasing stream IDs starting at 1. + const ( + idBeforeRestart = 1 + idAfterRestart = 2 + ) + if diff := cmp.Diff(ackVersionsMap[idBeforeRestart], ackVersionsMap[idAfterRestart]); diff != "" { + t.Fatalf("unexpected diff in ack versions before and after stream restart (-want, +got):\n%s", diff) + } + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} diff --git a/test/xds/xds_client_affinity_test.go b/test/xds/xds_client_affinity_test.go index 58b7fca03f5..94666af3b47 100644 --- a/test/xds/xds_client_affinity_test.go +++ b/test/xds/xds_client_affinity_test.go @@ -88,7 +88,7 @@ func (s) TestClientSideAffinitySanityCheck(t *testing.T) { return func() { envconfig.XDSRingHash = old } }()() - managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() port, cleanup2 := startTestService(t, nil) diff --git a/test/xds/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go index 26f4c6b2996..29cf350337d 100644 --- a/test/xds/xds_client_federation_test.go +++ b/test/xds/xds_client_federation_test.go @@ -53,7 +53,7 @@ func (s) TestClientSideFederation(t *testing.T) { defer func() { envconfig.XDSFederation = oldXDSFederation }() // Start a management server as the default authority. - serverDefaultAuth, err := e2e.StartManagementServer() + serverDefaultAuth, err := e2e.StartManagementServer(nil) if err != nil { t.Fatalf("Failed to spin up the xDS management server: %v", err) } @@ -61,7 +61,7 @@ func (s) TestClientSideFederation(t *testing.T) { // Start another management server as the other authority. const nonDefaultAuth = "non-default-auth" - serverAnotherAuth, err := e2e.StartManagementServer() + serverAnotherAuth, err := e2e.StartManagementServer(nil) if err != nil { t.Fatalf("Failed to spin up the xDS management server: %v", err) } diff --git a/test/xds/xds_client_integration_test.go b/test/xds/xds_client_integration_test.go index b2c3d2f8354..a0bafb987a7 100644 --- a/test/xds/xds_client_integration_test.go +++ b/test/xds/xds_client_integration_test.go @@ -74,7 +74,7 @@ func startTestService(t *testing.T, server *stubserver.StubServer) (uint32, func } func (s) TestClientSideXDS(t *testing.T) { - managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() port, cleanup2 := startTestService(t, nil) diff --git a/test/xds/xds_client_retry_test.go b/test/xds/xds_client_retry_test.go index 646b66be67a..46eb8f34f3d 100644 --- a/test/xds/xds_client_retry_test.go +++ b/test/xds/xds_client_retry_test.go @@ -49,7 +49,7 @@ func (s) TestClientSideRetry(t *testing.T) { }, } - managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() port, cleanup2 := startTestService(t, ss) diff --git a/test/xds/xds_rls_clusterspecifier_plugin_test.go b/test/xds/xds_rls_clusterspecifier_plugin_test.go index 68d9fd99a7b..35b5fe37dc1 100644 --- a/test/xds/xds_rls_clusterspecifier_plugin_test.go +++ b/test/xds/xds_rls_clusterspecifier_plugin_test.go @@ -104,7 +104,7 @@ func (s) TestRLSinxDS(t *testing.T) { // Set up all components and configuration necessary - management server, // xDS resolver, fake RLS Server, and xDS configuration which specifies an // RLS Balancer that communicates to this set up fake RLS Server. - managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() port, cleanup2 := startTestService(t, nil) defer cleanup2() diff --git a/test/xds/xds_security_config_nack_test.go b/test/xds/xds_security_config_nack_test.go index c5ec1196bbb..750fff03915 100644 --- a/test/xds/xds_security_config_nack_test.go +++ b/test/xds/xds_security_config_nack_test.go @@ -41,7 +41,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) { missingIdentityProviderInstance = "missing-identity-provider-instance" missingRootProviderInstance = "missing-root-provider-instance" ) - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -324,7 +324,7 @@ func (s) TestUnmarshalCluster_WithUpdateValidatorFunc(t *testing.T) { // SetupManagementServer() sets up a bootstrap file with certificate // provider instance names: `e2e.ServerSideCertProviderInstance` and // `e2e.ClientSideCertProviderInstance`. - managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() port, cleanup2 := startTestService(t, nil) diff --git a/test/xds/xds_server_integration_test.go b/test/xds/xds_server_integration_test.go index cafea3064d2..3da983e1e89 100644 --- a/test/xds/xds_server_integration_test.go +++ b/test/xds/xds_server_integration_test.go @@ -125,7 +125,7 @@ func hostPortFromListener(lis net.Listener) (string, uint32, error) { // the client and the server. This results in both of them using the // configured fallback credentials (which is insecure creds in this case). func (s) TestServerSideXDS_Fallback(t *testing.T) { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -207,7 +207,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -277,7 +277,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { // configuration pointing to the use of the file_watcher plugin and we verify // that the same client is now able to successfully make an RPC. func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) diff --git a/test/xds/xds_server_rbac_test.go b/test/xds/xds_server_rbac_test.go index c48e2039c76..b1058bda967 100644 --- a/test/xds/xds_server_rbac_test.go +++ b/test/xds/xds_server_rbac_test.go @@ -60,7 +60,7 @@ func (s) TestServerSideXDS_RouteConfiguration(t *testing.T) { defer func() { envconfig.XDSRBAC = oldRBAC }() - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -605,7 +605,7 @@ func (s) TestRBACHTTPFilter(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { func() { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -790,7 +790,7 @@ func (s) TestRBACToggledOn_WithBadRouteConfiguration(t *testing.T) { envconfig.XDSRBAC = oldRBAC }() - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -847,7 +847,7 @@ func (s) TestRBACToggledOff_WithBadRouteConfiguration(t *testing.T) { envconfig.XDSRBAC = oldRBAC }() - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) diff --git a/test/xds/xds_server_serving_mode_test.go b/test/xds/xds_server_serving_mode_test.go index 118a63394f8..6f730948c12 100644 --- a/test/xds/xds_server_serving_mode_test.go +++ b/test/xds/xds_server_serving_mode_test.go @@ -43,7 +43,7 @@ import ( // change callback is not invoked and client connections to the server are not // recycled. func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { - managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) defer cleanup() creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()}) @@ -163,7 +163,7 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { // xDS enabled gRPC servers. It verifies that appropriate mode changes happen in // the server, and also verifies behavior of clientConns under these modes. func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { - managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) defer cleanup() // Configure xDS credentials to be used on the server-side. diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index d8dcdcdfbd0..c82eb601b66 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -236,7 +236,7 @@ func commonSetup(ctx context.Context, t *testing.T) (xdsclient.XDSClient, *e2e.M // Spin up a xDS management server on a local port. nodeID := uuid.New().String() - fs, err := e2e.StartManagementServer() + fs, err := e2e.StartManagementServer(nil) if err != nil { t.Fatal(err) } diff --git a/xds/internal/httpfilter/fault/fault_test.go b/xds/internal/httpfilter/fault/fault_test.go index e44f91a5558..904585fc4a6 100644 --- a/xds/internal/httpfilter/fault/fault_test.go +++ b/xds/internal/httpfilter/fault/fault_test.go @@ -100,7 +100,7 @@ func (*testService) FullDuplexCall(stream testpb.TestService_FullDuplexCallServe func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32, func()) { // Spin up a xDS management server on a local port. nodeID := uuid.New().String() - fs, err := e2e.StartManagementServer() + fs, err := e2e.StartManagementServer(nil) if err != nil { t.Fatal(err) } diff --git a/xds/internal/test/e2e/controlplane.go b/xds/internal/test/e2e/controlplane.go index 8f27ff053d7..0ad6fa201fb 100644 --- a/xds/internal/test/e2e/controlplane.go +++ b/xds/internal/test/e2e/controlplane.go @@ -33,7 +33,7 @@ type controlPlane struct { func newControlPlane() (*controlPlane, error) { // Spin up an xDS management server on a local port. - server, err := e2e.StartManagementServer() + server, err := e2e.StartManagementServer(nil) if err != nil { return nil, fmt.Errorf("failed to spin up the xDS management server: %v", err) } diff --git a/xds/internal/xdsclient/controller/controller.go b/xds/internal/xdsclient/controller/controller.go index d4829714547..4b07dc8d6ac 100644 --- a/xds/internal/xdsclient/controller/controller.go +++ b/xds/internal/xdsclient/controller/controller.go @@ -72,7 +72,7 @@ type Controller struct { watchMap map[xdsresource.ResourceType]map[string]bool // versionMap contains the version that was acked (the version in the ack // request that was sent on wire). The key is rType, the value is the - // version string, becaues the versions for different resource types should + // version string, because the versions for different resource types should // be independent. versionMap map[xdsresource.ResourceType]string // nonceMap contains the nonce from the most recent received response. diff --git a/xds/internal/xdsclient/controller/transport.go b/xds/internal/xdsclient/controller/transport.go index 9e983651272..34c5b024dd8 100644 --- a/xds/internal/xdsclient/controller/transport.go +++ b/xds/internal/xdsclient/controller/transport.go @@ -166,12 +166,20 @@ func (t *Controller) sendExisting(stream grpc.ClientStream) bool { t.mu.Lock() defer t.mu.Unlock() - // Reset the ack versions when the stream restarts. - t.versionMap = make(map[xdsresource.ResourceType]string) + // Reset only the nonce when the stream restarts. + // + // xDS spec says the following. See section: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-resource-type-instance-version + // + // Note that the version for a resource type is not a property of an + // individual xDS stream but rather a property of the resources themselves. If + // the stream becomes broken and the client creates a new stream, the client’s + // initial request on the new stream should indicate the most recent version + // seen by the client on the previous stream t.nonceMap = make(map[xdsresource.ResourceType]string) for rType, s := range t.watchMap { - if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil { + if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, t.versionMap[rType], "", ""); err != nil { t.logger.Warningf("ADS request failed: %v", err) return false } @@ -296,8 +304,8 @@ func (t *Controller) processWatchInfo(w *watchAction) (target []string, rType xd rType = w.rType target = mapToSlice(current) // We don't reset version or nonce when a new watch is started. The version - // and nonce from previous response are carried by the request unless the - // stream is recreated. + // and nonce from previous response are carried by the request. Only the nonce + // is reset when the stream is recreated. ver = t.versionMap[rType] nonce = t.nonceMap[rType] return target, rType, ver, nonce