Skip to content

Commit

Permalink
xds: Add Outlier Detection configuration and CDS handling (#5299)
Browse files Browse the repository at this point in the history
xds: Add Outlier Detection configuration and CDS handling
  • Loading branch information
zasweq committed May 9, 2022
1 parent 5c46f1a commit 462d867
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 21 deletions.
51 changes: 51 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -29,13 +29,15 @@ import (
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/internal/buffer"
xdsinternal "google.golang.org/grpc/internal/credentials/xds"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
Expand Down Expand Up @@ -270,6 +272,52 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
return provider, nil
}

func outlierDetectionToConfig(od *xdsresource.OutlierDetection) *outlierdetection.LBConfig { // Already validated - no need to return error
if od == nil {
// "If the outlier_detection field is not set in the Cluster message, a
// "no-op" outlier_detection config will be generated, with interval set
// to the maximum possible value and all other fields unset." - A50
return &outlierdetection.LBConfig{
Interval: 1<<63 - 1,
}
}

// "if the enforcing_success_rate field is set to 0, the config
// success_rate_ejection field will be null and all success_rate_* fields
// will be ignored." - A50
var sre *outlierdetection.SuccessRateEjection
if od.EnforcingSuccessRate != 0 {
sre = &outlierdetection.SuccessRateEjection{
StdevFactor: od.SuccessRateStdevFactor,
EnforcementPercentage: od.EnforcingSuccessRate,
MinimumHosts: od.SuccessRateMinimumHosts,
RequestVolume: od.SuccessRateRequestVolume,
}
}

// "If the enforcing_failure_percent field is set to 0 or null, the config
// failure_percent_ejection field will be null and all failure_percent_*
// fields will be ignored." - A50
var fpe *outlierdetection.FailurePercentageEjection
if od.EnforcingFailurePercentage != 0 {
fpe = &outlierdetection.FailurePercentageEjection{
Threshold: od.FailurePercentageThreshold,
EnforcementPercentage: od.EnforcingFailurePercentage,
MinimumHosts: od.FailurePercentageMinimumHosts,
RequestVolume: od.FailurePercentageRequestVolume,
}
}

return &outlierdetection.LBConfig{
Interval: od.Interval,
BaseEjectionTime: od.BaseEjectionTime,
MaxEjectionTime: od.MaxEjectionTime,
MaxEjectionPercent: od.MaxEjectionPercent,
SuccessRateEjection: sre,
FailurePercentageEjection: fpe,
}
}

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying cluster_resolver balancer.
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
Expand Down Expand Up @@ -342,6 +390,9 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
default:
b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
if envconfig.XDSOutlierDetection {
dms[i].OutlierDetection = outlierDetectionToConfig(cu.OutlierDetection)
}
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go
Expand Up @@ -250,7 +250,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -462,7 +462,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -549,7 +549,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -599,7 +599,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
Expand Down Expand Up @@ -677,7 +677,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
SubjectAltNameMatchers: testSANMatchers,
},
}
wantCCS := edsCCS(serviceName, nil, false, nil)
wantCCS := edsCCS(serviceName, nil, false, nil, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
Expand Down

0 comments on commit 462d867

Please sign in to comment.