diff --git a/internal/xds/env/env.go b/internal/xds/env/env.go index 1110722a630..db9ac93b968 100644 --- a/internal/xds/env/env.go +++ b/internal/xds/env/env.go @@ -37,11 +37,13 @@ const ( // and kept in variable BootstrapFileName. // // When both bootstrap FileName and FileContent are set, FileName is used. - BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG" + BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG" + circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION" clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT" + aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI" @@ -60,6 +62,7 @@ var ( // // When both bootstrap FileName and FileContent are set, FileName is used. BootstrapFileContent = os.Getenv(BootstrapFileContentEnv) + // CircuitBreakingSupport indicates whether circuit breaking support is // enabled, which can be disabled by setting the environment variable // "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false". @@ -71,10 +74,6 @@ var ( // FaultInjectionSupport is used to control both fault injection and HTTP // filter support. FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false") - // C2PResolverSupport indicates whether support for C2P resolver is enabled. - // This can be enabled by setting the environment variable - // "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true". - C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true") // ClientSideSecuritySupport is used to control processing of security // configuration on the client-side. // @@ -82,6 +81,17 @@ var ( // have a brand new API on the server-side and users explicitly need to use // the new API to get security integration on the server. ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true") + // AggregateAndDNSSupportEnv indicates whether processing of aggregated + // cluster and DNS cluster is enabled, which can be enabled by setting the + // environment variable + // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to + // "true". + AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true") + + // C2PResolverSupport indicates whether support for C2P resolver is enabled. + // This can be enabled by setting the environment variable + // "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true". + C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true") // C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing. C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv) ) diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go index 7d8cf6a8670..5209105b430 100644 --- a/xds/internal/client/cds_test.go +++ b/xds/internal/client/cds_test.go @@ -124,6 +124,9 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, } + oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv + env.AggregateAndDNSSupportEnv = true + defer func() { env.CircuitBreakingSupport = oldAggregateAndDNSSupportEnv }() for _, test := range tests { t.Run(test.name, func(t *testing.T) { if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil { @@ -261,6 +264,9 @@ func (s) TestValidateCluster_Success(t *testing.T) { origCircuitBreakingSupport := env.CircuitBreakingSupport env.CircuitBreakingSupport = true defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }() + oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv + env.AggregateAndDNSSupportEnv = true + defer func() { env.CircuitBreakingSupport = oldAggregateAndDNSSupportEnv }() for _, test := range tests { t.Run(test.name, func(t *testing.T) { update, err := validateClusterAndConstructClusterUpdate(test.cluster) diff --git a/xds/internal/client/xds.go b/xds/internal/client/xds.go index e9fe205a7ce..7b4f4048ea5 100644 --- a/xds/internal/client/xds.go +++ b/xds/internal/client/xds.go @@ -615,9 +615,15 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu ret.EDSServiceName = cluster.GetEdsClusterConfig().GetServiceName() return ret, nil case cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS: + if !env.AggregateAndDNSSupportEnv { + return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) + } ret.ClusterType = ClusterTypeLogicalDNS return ret, nil case cluster.GetClusterType() != nil && cluster.GetClusterType().Name == "envoy.clusters.aggregate": + if !env.AggregateAndDNSSupportEnv { + return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) + } clusters := &v3aggregateclusterpb.ClusterConfig{} if err := proto.Unmarshal(cluster.GetClusterType().GetTypedConfig().GetValue(), clusters); err != nil { return ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err) @@ -626,7 +632,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu ret.PrioritizedClusterNames = clusters.Clusters return ret, nil default: - return ClusterUpdate{}, fmt.Errorf("unexpected cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) + return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) } }