Skip to content

Commit

Permalink
xds: move eds package to cluster_resolver (#4545)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 16, 2021
1 parent 549c53a commit 4c651ed
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 72 deletions.
12 changes: 6 additions & 6 deletions xds/internal/balancer/balancer.go
Expand Up @@ -20,10 +20,10 @@
package balancer

import (
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/edsbalancer" // Register the EDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/priority" // Register the priority balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterresolver" // Register the xds_cluster_resolver balancer
_ "google.golang.org/grpc/xds/internal/balancer/priority" // Register the priority balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
)
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -34,7 +34,7 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/xdsclient"
)

Expand Down Expand Up @@ -314,7 +314,7 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
// is updated to cluster_resolver, which has the fallback functionality, we
// will fix this to handle all the clusters in list.
cds := update.chu[0]
lbCfg := &edsbalancer.EDSConfig{
lbCfg := &clusterresolver.EDSConfig{
ClusterName: cds.ClusterName,
EDSServiceName: cds.EDSServiceName,
MaxConcurrentRequests: cds.MaxRequests,
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Expand Up @@ -35,7 +35,7 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
Expand Down Expand Up @@ -197,7 +197,7 @@ func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {
// edsCCS is a helper function to construct a good update passed from the
// cdsBalancer to the edsBalancer.
func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientConnState {
lbCfg := &edsbalancer.EDSConfig{
lbCfg := &clusterresolver.EDSConfig{
ClusterName: service,
MaxConcurrentRequests: countMax,
}
Expand Down
Expand Up @@ -16,8 +16,8 @@
*
*/

// Package edsbalancer contains EDS balancer implementation.
package edsbalancer
// Package clusterresolver contains EDS balancer implementation.
package clusterresolver

import (
"encoding/json"
Expand All @@ -38,7 +38,8 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient"
)

const edsName = "eds_experimental"
// Name is the name of the cluster_resolver balancer.
const Name = "eds_experimental"

var (
errBalancerClosed = errors.New("cdsBalancer is closed")
Expand Down Expand Up @@ -66,7 +67,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
return nil
}

b := &edsBalancer{
b := &clusterResolverBalancer{
cc: cc,
bOpts: opts,
updateCh: buffer.NewUnbounded(),
Expand All @@ -88,7 +89,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
}

func (bb) Name() string {
return edsName
return Name
}

func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
Expand All @@ -113,11 +114,9 @@ type scUpdate struct {
state balancer.SubConnState
}

// edsBalancer manages xdsClient and the actual EDS balancer implementation that
// does load balancing.
//
// It currently has only an edsBalancer. Later, we may add fallback.
type edsBalancer struct {
// clusterResolverBalancer manages xdsClient and the actual EDS balancer
// implementation that does load balancing.
type clusterResolverBalancer struct {
cc balancer.ClientConn
bOpts balancer.BuildOptions
updateCh *buffer.Unbounded // Channel for updates from gRPC.
Expand All @@ -134,23 +133,24 @@ type edsBalancer struct {
xdsClient xdsclient.XDSClient // xDS client to watch EDS resource.
attrsWithClient *attributes.Attributes // Attributes with xdsClient attached to be passed to the child policies.

child balancer.Balancer
edsResp xdsclient.EndpointsUpdate
edsRespReceived bool
child balancer.Balancer
edsResp xdsclient.EndpointsUpdate
watchUpdateReceived bool
}

// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
// updates lead to registration of an EDS watch. Updates with error lead to
// cancellation of existing watch and propagation of the same error to the
// child balancer.
func (b *edsBalancer) handleClientConnUpdate(update *ccUpdate) {
func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
// We first handle errors, if any, and then proceed with handling the
// update, only if the status quo has changed.
if err := update.err; err != nil {
b.handleErrorFromUpdate(err, true)
return
}

b.logger.Infof("Receive update from resolver, balancer config: %+v", update.state.BalancerConfig)
b.logger.Infof("Receive update from resolver, balancer config: %v", pretty.ToJSON(update.state.BalancerConfig))
cfg, _ := update.state.BalancerConfig.(*EDSConfig)
if cfg == nil {
b.logger.Warningf("xds: unexpected LoadBalancingConfig type: %T", update.state.BalancerConfig)
Expand All @@ -161,8 +161,8 @@ func (b *edsBalancer) handleClientConnUpdate(update *ccUpdate) {
b.configRaw = update.state.ResolverState.ServiceConfig
b.edsWatcher.updateConfig(cfg)

if !b.edsRespReceived {
// If eds resp was not received, wait for it.
if !b.watchUpdateReceived {
// If update was not received, wait for it.
return
}
// If eds resp was received before this, the child policy was created. We
Expand All @@ -175,15 +175,15 @@ func (b *edsBalancer) handleClientConnUpdate(update *ccUpdate) {

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying child balancer.
func (b *edsBalancer) handleWatchUpdate(update *watchUpdate) {
func (b *clusterResolverBalancer) handleWatchUpdate(update *watchUpdate) {
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}

b.logger.Infof("Watch update from xds-client %p, content: %+v", b.xdsClient, pretty.ToJSON(update.eds))
b.edsRespReceived = true
b.logger.Infof("resource update: %+v", pretty.ToJSON(update.eds))
b.watchUpdateReceived = true
b.edsResp = update.eds

// A new EDS update triggers new child configs (e.g. different priorities
Expand All @@ -199,7 +199,7 @@ func (b *edsBalancer) handleWatchUpdate(update *watchUpdate) {
// generates the addresses, because the endpoints come from the EDS resp.
//
// If child balancer doesn't already exist, one will be created.
func (b *edsBalancer) updateChildConfig() error {
func (b *clusterResolverBalancer) updateChildConfig() error {
// Child was build when the first EDS resp was received, so we just build
// the config and addresses.
if b.child == nil {
Expand Down Expand Up @@ -237,7 +237,7 @@ func (b *edsBalancer) updateChildConfig() error {
// - If it's from xds client, it means EDS resource were removed. The EDS
// watcher should keep watching.
// In both cases, the sub-balancers will be receive the error.
func (b *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bool) {
b.logger.Warningf("Received error: %v", err)
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
// This is an error from the parent ClientConn (can be the parent CDS
Expand All @@ -260,7 +260,7 @@ func (b *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// run is a long-running goroutine which handles all updates from gRPC and
// xdsClient. All methods which are invoked directly by gRPC or xdsClient simply
// push an update onto a channel which is read and acted upon right here.
func (b *edsBalancer) run() {
func (b *clusterResolverBalancer) run() {
for {
select {
case u := <-b.updateCh.Get():
Expand Down Expand Up @@ -302,9 +302,9 @@ func (b *edsBalancer) run() {
// UpdateClientConnState receives the serviceConfig (which contains the
// clusterName to watch for in CDS) and the xdsClient object from the
// xdsResolver.
func (b *edsBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
func (b *clusterResolverBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after edsBalancer was closed", state)
b.logger.Warningf("xds: received ClientConnState {%+v} after clusterResolverBalancer was closed", state)
return errBalancerClosed
}

Expand All @@ -322,25 +322,25 @@ func (b *edsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
}

// ResolverError handles errors reported by the xdsResolver.
func (b *edsBalancer) ResolverError(err error) {
func (b *clusterResolverBalancer) ResolverError(err error) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received resolver error {%v} after edsBalancer was closed", err)
b.logger.Warningf("xds: received resolver error {%v} after clusterResolverBalancer was closed", err)
return
}
b.updateCh.Put(&ccUpdate{err: err})
}

// UpdateSubConnState handles subConn updates from gRPC.
func (b *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (b *clusterResolverBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received subConn update {%v, %v} after edsBalancer was closed", sc, state)
b.logger.Warningf("xds: received subConn update {%v, %v} after clusterResolverBalancer was closed", sc, state)
return
}
b.updateCh.Put(&scUpdate{subConn: sc, state: state})
}

// Close closes the cdsBalancer and the underlying child balancer.
func (b *edsBalancer) Close() {
func (b *clusterResolverBalancer) Close() {
b.closed.Fire()
<-b.done.Done()
}
Expand Up @@ -18,7 +18,7 @@
*
*/

package edsbalancer
package clusterresolver

import (
"bytes"
Expand Down Expand Up @@ -177,7 +177,7 @@ func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me")
func (*fakeSubConn) Connect() { panic("implement me") }

// waitForNewChildLB makes sure that a new child LB is created by the top-level
// edsBalancer.
// clusterResolverBalancer.
func waitForNewChildLB(ctx context.Context, ch *testutils.Channel) (*fakeChildBalancer, error) {
val, err := ch.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -205,17 +205,17 @@ func setup(childLBCh *testutils.Channel) (*fakeclient.Client, func()) {
}
}

// TestSubConnStateChange verifies if the top-level edsBalancer passes on
// TestSubConnStateChange verifies if the top-level clusterResolverBalancer passes on
// the subConnState to appropriate child balancer.
func (s) TestSubConnStateChange(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(edsName)
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

Expand Down Expand Up @@ -258,10 +258,10 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(edsName)
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

Expand Down Expand Up @@ -303,7 +303,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
t.Fatalf("want resolver error, got %v", err)
}

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr)
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
Expand Down Expand Up @@ -346,10 +346,10 @@ func (s) TestErrorFromResolver(t *testing.T) {
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(edsName)
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

Expand Down Expand Up @@ -392,7 +392,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
t.Fatalf("want resolver error, got %v", err)
}

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
edsB.ResolverError(resourceErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
Expand Down Expand Up @@ -448,10 +448,10 @@ func (s) TestClientWatchEDS(t *testing.T) {
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(edsName)
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package edsbalancer
package clusterresolver

import (
"encoding/json"
Expand Down
Expand Up @@ -16,7 +16,7 @@
*
*/

package edsbalancer
package clusterresolver

import (
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
Expand Down
Expand Up @@ -16,7 +16,7 @@
*
*/

package edsbalancer
package clusterresolver

import (
"fmt"
Expand Down

0 comments on commit 4c651ed

Please sign in to comment.