Skip to content

Commit

Permalink
[swap_eds_to_split_8] clean and done
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed May 18, 2021
1 parent 3e5e70c commit 7f0581a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 115 deletions.
Expand Up @@ -36,9 +36,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
xdsclient "google.golang.org/grpc/xds/internal/client"

_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
)

const (
Expand Down
3 changes: 0 additions & 3 deletions xds/internal/balancer/edsbalancer/configbuilder_test.go
Expand Up @@ -27,9 +27,6 @@ import (
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdsclient "google.golang.org/grpc/xds/internal/client"

_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
)

const (
Expand Down
156 changes: 47 additions & 109 deletions xds/internal/balancer/edsbalancer/eds.go
Expand Up @@ -23,12 +23,10 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand All @@ -37,7 +35,6 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)

const edsName = "eds_experimental"
Expand Down Expand Up @@ -70,9 +67,11 @@ func (edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
}

b := &edsBalancer{
cc: cc,
bOpts: opts,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),

priorityBuilder: priorityBuilder,
priorityConfigParser: priorityConfigParser,
Expand All @@ -92,10 +91,6 @@ func (edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
updateChannel: make(chan *watchUpdate, 1),
}

b.ccw = &ccWrapper{
ClientConn: cc,
parent: b,
}
go b.run()
return b
}
Expand All @@ -116,7 +111,6 @@ func (edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBala
// balancer. It's defined so we can override xdsclient.New function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}

Expand All @@ -134,27 +128,19 @@ type scUpdate struct {
state balancer.SubConnState
}

// watchUpdate wraps the information received from a registered EDS watcher. A
// non-nil error is propagated to the underlying child balancer. A valid update
// results in creating a new child balancer (priority balancer, if one doesn't
// already exist) and pushing the updated balancer config to it.
type watchUpdate struct {
eds xdsclient.EndpointsUpdate
err error
}

// edsBalancer manages xdsClient and the actual EDS balancer implementation that
// does load balancing.
//
// It currently has only an edsBalancer. Later, we may add fallback.
type edsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
cc balancer.ClientConn // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsClientInterface // xDS client to watch EDS resource.
edsWatcher *edsWatcher // EDS watcher to watch EDS resource.
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event

priorityBuilder balancer.Builder
priorityConfigParser balancer.ConfigParser
Expand All @@ -165,9 +151,6 @@ type edsBalancer struct {
child balancer.Balancer
edsResp xdsclient.EndpointsUpdate
edsRespReceived bool

clusterNameMu sync.Mutex
clusterName string
}

// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
Expand All @@ -189,51 +172,61 @@ func (eb *edsBalancer) handleClientConnUpdate(update *ccUpdate) {
return
}

if err := eb.handleServiceConfigUpdate(cfg, update.state.ResolverState.ServiceConfig); err != nil {
eb.logger.Warningf("failed to update xDS client: %v", err)
eb.config = cfg
eb.configRaw = update.state.ResolverState.ServiceConfig
eb.edsWatcher.updateConfig(cfg)

if !eb.edsRespReceived {
// If eds resp was not received, wait for it.
return
}
// If eds resp was received before this, the child policy was created. We
// need to generate a new balancer config and send it to the child, because
// certain fields (unrelated to EDS watch) might have changed.
if err := eb.updateChildConfig(); err != nil {
eb.logger.Warningf("failed to update child policy config: %v", err)
}
}

// handleServiceConfigUpdate applies the service config update, watching a new
// EDS service name and restarting LRS stream, as required.
func (eb *edsBalancer) handleServiceConfigUpdate(config *EDSConfig, raw *serviceconfig.ParseResult) error {
// Update the cached cluster name, which will be attached to the address
// attributes when creating SubConns. Needs to hold the mutex because
// NewSubConn and UpdateSubConn are not synchronized.
eb.clusterNameMu.Lock()
eb.clusterName = config.ClusterName
eb.clusterNameMu.Unlock()

eb.config = config
eb.configRaw = raw
// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying child balancer.
func (eb *edsBalancer) handleWatchUpdate(update *watchUpdate) {
if err := update.err; err != nil {
eb.logger.Warningf("Watch error from xds-client %p: %v", eb.xdsClient, err)
eb.handleErrorFromUpdate(err, false)
return
}

eb.edsWatcher.updateConfig(config)
eb.logger.Infof("Watch update from xds-client %p, content: %+v", eb.xdsClient, pretty.ToJSON(update.eds))
eb.edsRespReceived = true
eb.edsResp = update.eds

if !eb.edsRespReceived {
// If eds resp was not received, wait for it.
return nil
// A new EDS update triggers new child configs (e.g. different priorities
// for the priority balancer), and new addresses (the endpoints come from
// the EDS response).
if err := eb.updateChildConfig(); err != nil {
eb.logger.Warningf("failed to update child policy's balancer config: %v", err)
}
// If eds resp was received, we need to generate a new balancer config and
// send it to the child, because certain fields (unrelated to EDS watch)
// might have changed.
return eb.updateChildConfig()
}

// updateChildConfig builds a balancer config from eb's cached eds resp and
// service config, and sends that to the child balancer.
// service config, and sends that to the child balancer. Note that it also
// generates the addresses, because the endpoints come from the EDS resp.
//
// If child balancer doesn't already exist, one will be created.
func (eb *edsBalancer) updateChildConfig() error {
// Child was build when the first EDS resp was received, so we just build
// the config and addresses.
if eb.child == nil {
eb.child = newChildBalancer(eb.priorityBuilder, eb.ccw, eb.bOpts)
eb.child = newChildBalancer(eb.priorityBuilder, eb.cc, eb.bOpts)
}

childCfgBytes, addrs := buildPriorityConfigMarshalled(eb.edsResp, eb.config)
childCfg, err := eb.priorityConfigParser.ParseConfig(childCfgBytes)
if err != nil {
eb.logger.Warningf("failed to parse generated priority balancer config, this should never happen because the config is generated: %v", err)
err = fmt.Errorf("failed to parse generated priority balancer config, this should never happen because the config is generated: %v", err)
eb.logger.Warningf("%v", err)
return err
}
eb.logger.Infof("build balancer config: %v", pretty.ToJSON(childCfg))
return eb.child.UpdateClientConnState(balancer.ClientConnState{
Expand All @@ -245,24 +238,6 @@ func (eb *edsBalancer) updateChildConfig() error {
})
}

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying child balancer.
func (eb *edsBalancer) handleWatchUpdate(update *watchUpdate) {
if err := update.err; err != nil {
eb.logger.Warningf("Watch error from xds-client %p: %v", eb.xdsClient, err)
eb.handleErrorFromUpdate(err, false)
return
}

eb.logger.Infof("Watch update from xds-client %p, content: %+v", eb.xdsClient, pretty.ToJSON(update.eds))
eb.edsRespReceived = true
eb.edsResp = update.eds

if err := eb.updateChildConfig(); err != nil {
eb.logger.Warningf("failed to update child policy's balancer config: %v", err)
}
}

// handleErrorFromUpdate handles both the error from parent ClientConn (from CDS
// balancer) and the error from xds client (from the watcher). fromParent is
// true if error is from parent ClientConn.
Expand All @@ -286,9 +261,8 @@ func (eb *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
if eb.child != nil {
eb.child.ResolverError(err)
} else {
// If eds balancer was never created, fail the RPCs with
// errors.
eb.ccw.UpdateState(balancer.State{
// If eds balancer was never created, fail the RPCs with errors.
eb.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
Expand All @@ -308,8 +282,8 @@ func (eb *edsBalancer) run() {
case *ccUpdate:
eb.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are passthrough and are simply handed over to
// the underlying child balancer.
// SubConn updates are simply handed over to the underlying
// child balancer.
if eb.child == nil {
eb.logger.Errorf("xds: received scUpdate {%+v} with no child balancer", update)
break
Expand All @@ -331,11 +305,14 @@ func (eb *edsBalancer) run() {
eb.xdsClient.Close()
// This is the *ONLY* point of return from this function.
eb.logger.Infof("Shutdown")
eb.done.Fire()
return
}
}
}

// Following are methods to implement the balancer interface.

// UpdateClientConnState receives the serviceConfig (which contains the
// clusterName to watch for in CDS) and the xdsClient object from the
// xdsResolver.
Expand Down Expand Up @@ -370,44 +347,5 @@ func (eb *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
// Close closes the cdsBalancer and the underlying child balancer.
func (eb *edsBalancer) Close() {
eb.closed.Fire()
}

// --------------------------------------

func (eb *edsBalancer) getClusterName() string {
eb.clusterNameMu.Lock()
defer eb.clusterNameMu.Unlock()
return eb.config.ClusterName
}

// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at
// creation and intercepts the NewSubConn() and UpdateAddresses() call from the
// child policy to add security configuration required by xDS credentials.
//
// Other methods of the balancer.ClientConn interface are not overridden and
// hence get the original implementation.
type ccWrapper struct {
balancer.ClientConn
parent *edsBalancer
}

// NewSubConn intercepts NewSubConn() calls from the child policy and adds an
// address attribute which provides all information required by the xdsCreds
// handshaker to perform the TLS handshake.
func (ccw *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := ccw.parent.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
}
return ccw.ClientConn.NewSubConn(newAddrs, opts)
}

func (ccw *ccWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
clusterName := ccw.parent.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
}
ccw.ClientConn.UpdateAddresses(sc, newAddrs)
<-eb.done.Done()
}
9 changes: 9 additions & 0 deletions xds/internal/balancer/edsbalancer/eds_watcher.go
Expand Up @@ -25,6 +25,15 @@ import (
xdsclient "google.golang.org/grpc/xds/internal/client"
)

// watchUpdate wraps the information received from a registered EDS watcher. A
// non-nil error is propagated to the underlying child balancer. A valid update
// results in creating a new child balancer (priority balancer, if one doesn't
// already exist) and pushing the updated balancer config to it.
type watchUpdate struct {
eds xdsclient.EndpointsUpdate
err error
}

// edsWatcher takes an EDS balancer config, and use the xds_client to watch EDS
// updates. The EDS updates are passed back to the balancer via a channel.
type edsWatcher struct {
Expand Down

0 comments on commit 7f0581a

Please sign in to comment.