Skip to content

Commit

Permalink
Implementation of the xds_experimental resolver.
Browse files Browse the repository at this point in the history
This resolver doesn't do much at this point, except returning an empty
address list and a hard-coded service config which picks the xds
balancer with a round_robin child policy.

Also moved the xdsConfig struct to the xds/internal package and exported
it as LBConfig, so that both the resolver and the balancer packages can
make use of this.
  • Loading branch information
easwars committed Aug 15, 2019
1 parent 36ddecc commit 3958fc4
Show file tree
Hide file tree
Showing 8 changed files with 455 additions and 139 deletions.
1 change: 1 addition & 0 deletions vet.sh
Expand Up @@ -111,6 +111,7 @@ google.golang.org/grpc/balancer.go:SA1019
google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019
google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019
google.golang.org/grpc/xds/internal/balancer/xds.go:SA1019
google.golang.org/grpc/xds/internal/balancer/xds_client.go:SA1019
google.golang.org/grpc/balancer_conn_wrappers.go:SA1019
Expand Down
3 changes: 3 additions & 0 deletions xds/experimental/xds_experimental.go
Expand Up @@ -24,9 +24,12 @@ package experimental

import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
)

func init() {
resolver.Register(xdsresolver.NewBuilder())
balancer.Register(xdsbalancer.NewBalancerBuilder())
}
100 changes: 12 additions & 88 deletions xds/internal/balancer/xds.go
Expand Up @@ -33,16 +33,14 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/balancer/lrs"
cdspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/cds"
edspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/eds"
)

const (
defaultTimeout = 10 * time.Second
xdsName = "xds_experimental"
)
const defaultTimeout = 10 * time.Second

var (
// This field is for testing purpose.
Expand Down Expand Up @@ -85,11 +83,11 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
}

func (b *xdsBalancerBuilder) Name() string {
return xdsName
return xdsinternal.ExperimentalName
}

func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg xdsConfig
var cfg xdsinternal.LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c))
}
Expand Down Expand Up @@ -130,15 +128,15 @@ type xdsBalancer struct {
timer *time.Timer
noSubConnAlert <-chan struct{}

client *client // may change when passed a different service config
config *xdsConfig // may change when passed a different service config
client *client // may change when passed a different service config
config *xdsinternal.LBConfig // may change when passed a different service config
xdsLB edsBalancerInterface
fallbackLB balancer.Balancer
fallbackInitData *resolver.State // may change when HandleResolved address is called
loadStore lrs.Store
}

func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
func (x *xdsBalancer) startNewXDSClient(u *xdsinternal.LBConfig) {
// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
// xdsClient to get a response from the traffic director.
if x.startup {
Expand Down Expand Up @@ -237,7 +235,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
}
}
case *balancer.ClientConnState:
cfg, _ := u.BalancerConfig.(*xdsConfig)
cfg, _ := u.BalancerConfig.(*xdsinternal.LBConfig)
if cfg == nil {
// service config parsing failed. should never happen.
return
Expand Down Expand Up @@ -497,16 +495,16 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
}
}

func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) {
func (x *xdsBalancer) buildFallBackBalancer(c *xdsinternal.LBConfig) {
if c.FallBackPolicy == nil {
x.buildFallBackBalancer(&xdsConfig{
FallBackPolicy: &loadBalancingConfig{
x.buildFallBackBalancer(&xdsinternal.LBConfig{
FallBackPolicy: &xdsinternal.LoadBalancingConfig{
Name: "round_robin",
},
})
return
}
// builder will always be non-nil, since when parse JSON into xdsConfig, we check whether the specified
// builder will always be non-nil, since when parse JSON into xdsinternal.LBConfig, we check whether the specified
// balancer is registered or not.
builder := balancer.Get(c.FallBackPolicy.Name)

Expand Down Expand Up @@ -566,77 +564,3 @@ func createDrainedTimer() *time.Timer {
}
return timer
}

type xdsConfig struct {
serviceconfig.LoadBalancingConfig
BalancerName string
ChildPolicy *loadBalancingConfig
FallBackPolicy *loadBalancingConfig
}

// When unmarshalling json to xdsConfig, we iterate through the childPolicy/fallbackPolicy lists
// and select the first LB policy which has been registered to be stored in the returned xdsConfig.
func (p *xdsConfig) UnmarshalJSON(data []byte) error {
var val map[string]json.RawMessage
if err := json.Unmarshal(data, &val); err != nil {
return err
}
for k, v := range val {
switch k {
case "balancerName":
if err := json.Unmarshal(v, &p.BalancerName); err != nil {
return err
}
case "childPolicy":
var lbcfgs []*loadBalancingConfig
if err := json.Unmarshal(v, &lbcfgs); err != nil {
return err
}
for _, lbcfg := range lbcfgs {
if balancer.Get(lbcfg.Name) != nil {
p.ChildPolicy = lbcfg
break
}
}
case "fallbackPolicy":
var lbcfgs []*loadBalancingConfig
if err := json.Unmarshal(v, &lbcfgs); err != nil {
return err
}
for _, lbcfg := range lbcfgs {
if balancer.Get(lbcfg.Name) != nil {
p.FallBackPolicy = lbcfg
break
}
}
}
}
return nil
}

func (p *xdsConfig) MarshalJSON() ([]byte, error) {
return nil, nil
}

type loadBalancingConfig struct {
Name string
Config json.RawMessage
}

func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
m := make(map[string]json.RawMessage)
m[l.Name] = l.Config
return json.Marshal(m)
}

func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
var cfg map[string]json.RawMessage
if err := json.Unmarshal(data, &cfg); err != nil {
return err
}
for name, config := range cfg {
l.Name = name
l.Config = config
}
return nil
}
7 changes: 4 additions & 3 deletions xds/internal/balancer/xds_lrs_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
lrsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
lrspb "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s) TestXdsLoadReporting(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()

builder := balancer.Get(xdsName)
builder := balancer.Get(xdsinternal.ExperimentalName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
if !ok {
Expand All @@ -112,9 +113,9 @@ func (s) TestXdsLoadReporting(t *testing.T) {
Nanos: intervalNano,
}

cfg := &xdsConfig{
cfg := &xdsinternal.LBConfig{
BalancerName: addr,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds.
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds.
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
Expand Down

0 comments on commit 3958fc4

Please sign in to comment.