Skip to content

Commit

Permalink
[cluster_resolver_new_3] c1
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jul 20, 2021
1 parent a0e146a commit effbf91
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 94 deletions.
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -275,7 +275,7 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
return
}

b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.chu), pretty.ToJSON(update.securityCfg))
b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg))

// Process the security config from the received update before building the
// child policy or forwarding the update to it. We do this because the child
Expand Down Expand Up @@ -304,8 +304,8 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
}

dms := make([]balancerconfig.DiscoveryMechanism, len(update.chu))
for i, cu := range update.chu {
dms := make([]balancerconfig.DiscoveryMechanism, len(update.updates))
for i, cu := range update.updates {
switch cu.ClusterType {
case xdsclient.ClusterTypeEDS:
dms[i] = balancerconfig.DiscoveryMechanism{
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Expand Up @@ -32,9 +32,9 @@ var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a
type clusterHandlerUpdate struct {
// securityCfg is the Security Config from the top (root) cluster.
securityCfg *xdsclient.SecurityConfig
// chu is a list of ClusterUpdates from all the leaf clusters.
chu []xdsclient.ClusterUpdate
err error
// updates is a list of ClusterUpdates from all the leaf clusters.
updates []xdsclient.ClusterUpdate
err error
}

// clusterHandler will be given a name representing a cluster. It will then
Expand Down Expand Up @@ -101,7 +101,7 @@ func (ch *clusterHandler) constructClusterUpdate() {
}
ch.updateChannel <- clusterHandlerUpdate{
securityCfg: ch.root.clusterUpdate.SecurityCfg,
chu: clusterUpdate,
updates: clusterUpdate,
}
}

Expand Down
10 changes: 5 additions & 5 deletions xds/internal/balancer/cdsbalancer/cluster_handler_test.go
Expand Up @@ -95,7 +95,7 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" {
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
Expand Down Expand Up @@ -189,7 +189,7 @@ func (s) TestSuccessCaseLeafNodeThenNewUpdate(t *testing.T) {
fakeClient.InvokeWatchClusterCallback(test.newClusterUpdate, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" {
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s) TestUpdateRootClusterAggregateSuccess(t *testing.T) {
// ordered as per the cluster update.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService,
}, {
Expand Down Expand Up @@ -412,7 +412,7 @@ func (s) TestUpdateRootClusterAggregateThenChangeChild(t *testing.T) {

select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService,
}, {
Expand Down Expand Up @@ -658,7 +658,7 @@ func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) {
// Then an update should successfully be written to the update buffer.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService2,
}}); diff != "" {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Expand Up @@ -186,9 +186,9 @@ func (b *clusterResolverBalancer) handleWatchUpdate(update *resourceUpdate) {
return
}

b.logger.Infof("resource update: %+v", pretty.ToJSON(update.p))
b.logger.Infof("resource update: %+v", pretty.ToJSON(update.priorities))
b.watchUpdateReceived = true
b.priorities = update.p
b.priorities = update.priorities

// A new EDS update triggers new child configs (e.g. different priorities
// for the priority balancer), and new addresses (the endpoints come from
Expand Down
90 changes: 26 additions & 64 deletions xds/internal/balancer/clusterresolver/clusterresolver_test.go
Expand Up @@ -215,14 +215,8 @@ func (s) TestSubConnStateChange(t *testing.T) {
defer edsB.Close()

if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
Expand Down Expand Up @@ -269,14 +263,8 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -329,14 +317,8 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {

// An update with the same service name should not trigger a new watch.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -369,14 +351,8 @@ func (s) TestErrorFromResolver(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -426,14 +402,8 @@ func (s) TestErrorFromResolver(t *testing.T) {
// An update with the same service name should trigger a new watch, because
// the previous watch was canceled.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -484,14 +454,8 @@ func (s) TestClientWatchEDS(t *testing.T) {
defer cancel()
// If eds service name is not set, should watch for cluster name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: "cluster-1",
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS("cluster-1"),
}); err != nil {
t.Fatal(err)
}
Expand All @@ -502,14 +466,8 @@ func (s) TestClientWatchEDS(t *testing.T) {
// Update with an non-empty edsServiceName should trigger an EDS watch for
// the same.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: "foobar-1",
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS("foobar-1"),
}); err != nil {
t.Fatal(err)
}
Expand All @@ -522,18 +480,22 @@ func (s) TestClientWatchEDS(t *testing.T) {
// registered watch will be cancelled, which will result in an EDS request
// with no resource names being sent to the server.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: "foobar-2",
}},
},
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS("foobar-2"),
}); err != nil {
t.Fatal(err)
}
if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-2"); err != nil {
t.Fatal(err)
}
}

func newLBConfigWithOneEDS(edsServiceName string) *LBConfig {
return &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: edsServiceName,
}},
}
}
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterresolver/config.go
Expand Up @@ -49,7 +49,7 @@ type LBConfig struct {
// locality-policy's config. Optional; defaults to "round_robin".
EndpointPickingPolicy *internalserviceconfig.BalancerConfig `json:"endpointPickingPolicy,omitempty"`

// FIXME: read and warn if endpoint is not roundrobin or locality is not
// TODO: read and warn if endpoint is not roundrobin or locality is not
// weightedtarget.
}

Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Expand Up @@ -28,8 +28,8 @@ import (
// resourceUpdate is a combined update from all the resources, in the order of
// priority. For example, it can be {EDS, EDS, DNS}.
type resourceUpdate struct {
p []balancerconfig.PriorityConfig
err error
priorities []balancerconfig.PriorityConfig
err error
}

type discoveryMechanism interface {
Expand Down Expand Up @@ -197,7 +197,7 @@ func (rr *resourceResolver) generate() {
case <-rr.updateChannel:
default:
}
rr.updateChannel <- &resourceUpdate{p: ret}
rr.updateChannel <- &resourceUpdate{priorities: ret}
}

type edsDiscoveryMechanism struct {
Expand Down
24 changes: 12 additions & 12 deletions xds/internal/balancer/clusterresolver/resource_resolver_test.go
Expand Up @@ -110,7 +110,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, test.want); diff != "" {
if diff := cmp.Diff(u.priorities, test.want); diff != "" {
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
Expand Down Expand Up @@ -192,7 +192,7 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: test.addrs})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, test.want); diff != "" {
if diff := cmp.Diff(u.priorities, test.want); diff != "" {
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
Expand Down Expand Up @@ -240,7 +240,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
Expand Down Expand Up @@ -287,7 +287,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
Expand All @@ -314,7 +314,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
}
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
Expand Down Expand Up @@ -385,7 +385,7 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Expand Down Expand Up @@ -497,7 +497,7 @@ func (s) TestResourceResolverChangePriority(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Expand Down Expand Up @@ -538,7 +538,7 @@ func (s) TestResourceResolverChangePriority(t *testing.T) {
}
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Expand Down Expand Up @@ -625,7 +625,7 @@ func (s) TestResourceResolverEDSAndDNS(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Expand Down Expand Up @@ -687,7 +687,7 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
Expand Down Expand Up @@ -724,7 +724,7 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: testDNSTarget,
Expand Down Expand Up @@ -845,7 +845,7 @@ func (s) TestResourceResolverDNSResolveNow(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: testDNSTarget,
Expand Down

0 comments on commit effbf91

Please sign in to comment.