Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: support cluster fallback in cluster_resolver #4594

Merged
merged 2 commits into from Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 70 additions & 62 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -35,27 +35,27 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
cdsName = "cds_experimental"
edsName = "eds_experimental"
)

var (
errBalancerClosed = errors.New("cdsBalancer is closed")

// newEDSBalancer is a helper function to build a new edsBalancer and will be
// overridden in unittests.
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
builder := balancer.Get(edsName)
// newChildBalancer is a helper function to build a new cluster_resolver
// balancer and will be overridden in unittests.
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
builder := balancer.Get(clusterresolver.Name)
if builder == nil {
return nil, fmt.Errorf("xds: no balancer builder with name %v", edsName)
return nil, fmt.Errorf("xds: no balancer builder with name %v", clusterresolver.Name)
}
// We directly pass the parent clientConn to the
// underlying edsBalancer because the cdsBalancer does
// not deal with subConns.
// We directly pass the parent clientConn to the underlying
// cluster_resolver balancer because the cdsBalancer does not deal with
// subConns.
return builder.Build(cc, opts), nil
}
buildProvider = buildProviderFunc
Expand Down Expand Up @@ -126,31 +126,32 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
// watcher with the xdsClient, while a non-nil error causes it to cancel the
// existing watch and propagate the error to the underlying edsBalancer.
// existing watch and propagate the error to the underlying cluster_resolver
// balancer.
type ccUpdate struct {
clusterName string
err error
}

// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the edsBalancer.
// on to the cluster_resolver balancer.
type scUpdate struct {
subConn balancer.SubConn
state balancer.SubConnState
}

// cdsBalancer implements a CDS based LB policy. It instantiates an EDS based
// LB policy to further resolve the serviceName received from CDS, into
// localities and endpoints. Implements the balancer.Balancer interface which
// is exposed to gRPC and implements the balancer.ClientConn interface which is
// exposed to the edsBalancer.
// cdsBalancer implements a CDS based LB policy. It instantiates a
// cluster_resolver balancer to further resolve the serviceName received from
// CDS, into localities and endpoints. Implements the balancer.Balancer
// interface which is exposed to gRPC and implements the balancer.ClientConn
// interface which is exposed to the cluster_resolver balancer.
type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource.
clusterHandler *clusterHandler // To watch the clusters.
edsLB balancer.Balancer // EDS child policy.
childLB balancer.Balancer
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
Expand All @@ -166,7 +167,7 @@ type cdsBalancer struct {
// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
// updates lead to registration of a CDS watch. Updates with error lead to
// cancellation of existing watch and propagation of the same error to the
// edsBalancer.
// cluster_resolver balancer.
func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) {
// We first handle errors, if any, and then proceed with handling the
// update, only if the status quo has changed.
Expand Down Expand Up @@ -266,15 +267,15 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
}

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying edsBalancer.
// lead to clientConn updates being invoked on the underlying cluster_resolver balancer.
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}

b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.chu), pretty.ToJSON(update.securityCfg))
b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg))

// Process the security config from the received update before building the
// child policy or forwarding the update to it. We do this because the child
Expand All @@ -291,47 +292,54 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
}

// The first good update from the watch API leads to the instantiation of an
// edsBalancer. Further updates/errors are propagated to the existing
// edsBalancer.
if b.edsLB == nil {
edsLB, err := newEDSBalancer(b.ccw, b.bOpts)
// cluster_resolver balancer. Further updates/errors are propagated to the existing
// cluster_resolver balancer.
if b.childLB == nil {
childLB, err := newChildBalancer(b.ccw, b.bOpts)
if err != nil {
b.logger.Errorf("Failed to create child policy of type %s, %v", edsName, err)
b.logger.Errorf("Failed to create child policy of type %s, %v", clusterresolver.Name, err)
return
}
b.edsLB = edsLB
b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName)
}
b.childLB = childLB
b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
}

dms := make([]balancerconfig.DiscoveryMechanism, len(update.updates))
for i, cu := range update.updates {
switch cu.ClusterType {
case xdsclient.ClusterTypeEDS:
dms[i] = balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: cu.ClusterName,
EDSServiceName: cu.EDSServiceName,
MaxConcurrentRequests: cu.MaxRequests,
}
if cu.EnableLRS {
// An empty string here indicates that the cluster_resolver balancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
dms[i].LoadReportingServerName = new(string)

if len(update.chu) == 0 {
b.logger.Infof("got update with 0 cluster updates, should never happen. There should be at least one cluster")
}
case xdsclient.ClusterTypeLogicalDNS:
dms[i] = balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: cu.DNSHostName,
}
default:
b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
}
// TODO: this function is currently only handling the cluster with higher
// priority. This should work in most cases (e.g. if the cluster is not a
// aggregated cluster, or if the higher priority cluster works fine so
// there's no need to fallback). This quick fix is to unblock the testing
// work before the full fallback support is complete. Once the EDS balancer
// is updated to cluster_resolver, which has the fallback functionality, we
// will fix this to handle all the clusters in list.
cds := update.chu[0]
lbCfg := &clusterresolver.EDSConfig{
ClusterName: cds.ClusterName,
EDSServiceName: cds.EDSServiceName,
MaxConcurrentRequests: cds.MaxRequests,
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
}
if cds.EnableLRS {
// An empty string here indicates that the edsBalancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
lbCfg.LrsLoadReportingServerName = new(string)

}
ccState := balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
if err := b.childLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: cluster_resolver balancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
}
}

Expand All @@ -348,20 +356,20 @@ func (b *cdsBalancer) run() {
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are passthrough and are simply handed over to
// the underlying edsBalancer.
if b.edsLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
// the underlying cluster_resolver balancer.
if b.childLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no cluster_resolver balancer", update)
break
}
b.edsLB.UpdateSubConnState(update.subConn, update.state)
b.childLB.UpdateSubConnState(update.subConn, update.state)
}
case u := <-b.clusterHandler.updateChannel:
b.handleWatchUpdate(u)
case <-b.closed.Done():
b.clusterHandler.close()
if b.edsLB != nil {
b.edsLB.Close()
b.edsLB = nil
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
Expand Down Expand Up @@ -389,22 +397,22 @@ func (b *cdsBalancer) run() {
// - If it's from xds client, it means CDS resource were removed. The CDS
// watcher should keep watching.
//
// In both cases, the error will be forwarded to EDS balancer. And if error is
// resource-not-found, the child EDS balancer will stop watching EDS.
// In both cases, the error will be forwarded to the child balancer. And if
// error is resource-not-found, the child balancer will stop watching EDS.
func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// This is not necessary today, because xds client never sends connection
// errors.
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
b.clusterHandler.close()
}
if b.edsLB != nil {
if b.childLB != nil {
if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.edsLB.ResolverError(err)
b.childLB.ResolverError(err)
}
} else {
// If eds balancer was never created, fail the RPCs with
// If child balancer was never created, fail the RPCs with
// errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Expand Down
20 changes: 10 additions & 10 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go
Expand Up @@ -153,8 +153,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
// Override the creation of the EDS balancer to return a fake EDS balancer
// implementation.
edsB := newTestEDSBalancer()
oldEDSBalancerBuilder := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
oldEDSBalancerBuilder := newChildBalancer
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
edsB.parentCC = cc
return edsB, nil
}
Expand All @@ -177,7 +177,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newChildBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup. No security config is
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
Expand Down Expand Up @@ -464,7 +464,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -551,7 +551,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -601,7 +601,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -672,7 +672,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{
Expand Down