Skip to content

Commit

Permalink
xds: drop localities with zero weight at the xdsClient layer (#5476)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Jun 30, 2022
1 parent 423cd8e commit 5770b1d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 32 deletions.
3 changes: 0 additions & 3 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Expand Up @@ -251,9 +251,6 @@ func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresourc
var priorityIntSlice []int
priorities := make(map[int][]xdsresource.Locality)
for _, locality := range localities {
if locality.Weight == 0 {
continue
}
priority := int(locality.Priority)
priorities[priority] = append(priorities[priority], locality)
priorityIntSlice = append(priorityIntSlice, priority)
Expand Down
25 changes: 0 additions & 25 deletions xds/internal/balancer/clusterresolver/eds_impl_test.go
Expand Up @@ -295,31 +295,6 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}); err != nil {
t.Fatal(err)
}

// Change weight of the locality[1] to 0, it should never be picked.
clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)

// Changing weight of locality[1] to 0 caused it to be removed. It's subconn
// should also be removed.
//
// NOTE: this is because we handle locality with weight 0 same as the
// locality doesn't exist. If this changes in the future, this removeSubConn
// behavior will also change.
scToRemove2 := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove2, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove2)
}

// Test pick with two subconns different locality weight.
//
// Locality-1 will be not be picked, and locality-2 will be picked.
// Locality-2 contains sc3 and sc4. So expect sc3, sc4.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3, sc4}); err != nil {
t.Fatal(err)
}
}

// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
Expand Down
5 changes: 4 additions & 1 deletion xds/internal/xdsclient/xdsresource/type_eds.go
Expand Up @@ -64,7 +64,10 @@ type Locality struct {

// EndpointsUpdate contains an EDS update.
type EndpointsUpdate struct {
Drops []OverloadDropConfig
Drops []OverloadDropConfig
// Localities in the EDS response with `load_balancing_weight` field not set
// or explicitly set to 0 are ignored while parsing the resource, and
// therefore do not show up here.
Localities []Locality

// Raw is the resource from the xds response.
Expand Down
9 changes: 7 additions & 2 deletions xds/internal/xdsclient/xdsresource/unmarshal_eds.go
Expand Up @@ -57,7 +57,7 @@ func unmarshalEndpointsResource(r *anypb.Any, logger *grpclog.PrefixLogger) (str
}
logger.Infof("Resource with name: %v, type: %T, contains: %v", cla.GetClusterName(), cla, pretty.ToJSON(cla))

u, err := parseEDSRespProto(cla)
u, err := parseEDSRespProto(cla, logger)
if err != nil {
return cla.GetClusterName(), EndpointsUpdate{}, err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) []Endpoint {
return endpoints
}

func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.PrefixLogger) (EndpointsUpdate, error) {
ret := EndpointsUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
Expand All @@ -113,6 +113,11 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate,
if l == nil {
return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
}
weight := locality.GetLoadBalancingWeight().GetValue()
if weight == 0 {
logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l))
continue
}
lid := internal.LocalityID{
Region: l.Region,
Zone: l.Zone,
Expand Down
16 changes: 15 additions & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go
Expand Up @@ -75,6 +75,20 @@ func (s) TestEDSParseRespProto(t *testing.T) {
want: EndpointsUpdate{},
wantErr: true,
},
{
name: "missing locality weight",
m: func() *v3endpointpb.ClusterLoadAssignment {
clab0 := newClaBuilder("test", nil)
clab0.addLocality("locality-1", 0, 1, []string{"addr1:314"}, &addLocalityOptions{
Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
})
clab0.addLocality("locality-2", 0, 0, []string{"addr2:159"}, &addLocalityOptions{
Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
})
return clab0.Build()
}(),
want: EndpointsUpdate{},
},
{
name: "good",
m: func() *v3endpointpb.ClusterLoadAssignment {
Expand Down Expand Up @@ -161,7 +175,7 @@ func (s) TestEDSParseRespProto(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseEDSRespProto(tt.m)
got, err := parseEDSRespProto(tt.m, nil)
if (err != nil) != tt.wantErr {
t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down

0 comments on commit 5770b1d

Please sign in to comment.