Skip to content

Commit

Permalink
xds/cds: add env var for aggregated and DNS cluster (#4440)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed May 14, 2021
1 parent 50c071e commit a12250e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
20 changes: 15 additions & 5 deletions internal/xds/env/env.go
Expand Up @@ -37,11 +37,13 @@ const (
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"

c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
Expand All @@ -60,6 +62,7 @@ var (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)

// CircuitBreakingSupport indicates whether circuit breaking support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
Expand All @@ -71,17 +74,24 @@ var (
// FaultInjectionSupport is used to control both fault injection and HTTP
// filter support.
FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true")
// AggregateAndDNSSupportEnv indicates whether processing of aggregated
// cluster and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")

// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)
6 changes: 6 additions & 0 deletions xds/internal/client/cds_test.go
Expand Up @@ -124,6 +124,9 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
},
}

oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
env.AggregateAndDNSSupportEnv = true
defer func() { env.CircuitBreakingSupport = oldAggregateAndDNSSupportEnv }()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil {
Expand Down Expand Up @@ -261,6 +264,9 @@ func (s) TestValidateCluster_Success(t *testing.T) {
origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
env.AggregateAndDNSSupportEnv = true
defer func() { env.CircuitBreakingSupport = oldAggregateAndDNSSupportEnv }()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := validateClusterAndConstructClusterUpdate(test.cluster)
Expand Down
8 changes: 7 additions & 1 deletion xds/internal/client/xds.go
Expand Up @@ -615,9 +615,15 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
ret.EDSServiceName = cluster.GetEdsClusterConfig().GetServiceName()
return ret, nil
case cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS:
if !env.AggregateAndDNSSupportEnv {
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
ret.ClusterType = ClusterTypeLogicalDNS
return ret, nil
case cluster.GetClusterType() != nil && cluster.GetClusterType().Name == "envoy.clusters.aggregate":
if !env.AggregateAndDNSSupportEnv {
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
clusters := &v3aggregateclusterpb.ClusterConfig{}
if err := proto.Unmarshal(cluster.GetClusterType().GetTypedConfig().GetValue(), clusters); err != nil {
return ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
Expand All @@ -626,7 +632,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
ret.PrioritizedClusterNames = clusters.Clusters
return ret, nil
default:
return ClusterUpdate{}, fmt.Errorf("unexpected cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
}

Expand Down

0 comments on commit a12250e

Please sign in to comment.