Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: drop localities with zero weight at the xdsClient layer #5476

Merged
merged 2 commits into from Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Expand Up @@ -251,9 +251,6 @@ func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresourc
var priorityIntSlice []int
priorities := make(map[int][]xdsresource.Locality)
for _, locality := range localities {
if locality.Weight == 0 {
continue
}
priority := int(locality.Priority)
priorities[priority] = append(priorities[priority], locality)
priorityIntSlice = append(priorityIntSlice, priority)
Expand Down
25 changes: 0 additions & 25 deletions xds/internal/balancer/clusterresolver/eds_impl_test.go
Expand Up @@ -295,31 +295,6 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}); err != nil {
t.Fatal(err)
}

// Change weight of the locality[1] to 0, it should never be picked.
clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)

// Changing weight of locality[1] to 0 caused it to be removed. It's subconn
// should also be removed.
//
// NOTE: this is because we handle locality with weight 0 same as the
// locality doesn't exist. If this changes in the future, this removeSubConn
// behavior will also change.
scToRemove2 := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove2, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove2)
}

// Test pick with two subconns different locality weight.
//
// Locality-1 will be not be picked, and locality-2 will be picked.
// Locality-2 contains sc3 and sc4. So expect sc3, sc4.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3, sc4}); err != nil {
t.Fatal(err)
}
}

// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
Expand Down
5 changes: 4 additions & 1 deletion xds/internal/xdsclient/xdsresource/type_eds.go
Expand Up @@ -64,7 +64,10 @@ type Locality struct {

// EndpointsUpdate contains an EDS update.
type EndpointsUpdate struct {
Drops []OverloadDropConfig
Drops []OverloadDropConfig
// Localities in the EDS response with `load_balancing_weight` field not set
// or explicitly set to 0 are ignored while parsing the resource, and
// therefore do not show up here.
Localities []Locality

// Raw is the resource from the xds response.
Expand Down
9 changes: 7 additions & 2 deletions xds/internal/xdsclient/xdsresource/unmarshal_eds.go
Expand Up @@ -57,7 +57,7 @@ func unmarshalEndpointsResource(r *anypb.Any, logger *grpclog.PrefixLogger) (str
}
logger.Infof("Resource with name: %v, type: %T, contains: %v", cla.GetClusterName(), cla, pretty.ToJSON(cla))

u, err := parseEDSRespProto(cla)
u, err := parseEDSRespProto(cla, logger)
if err != nil {
return cla.GetClusterName(), EndpointsUpdate{}, err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) []Endpoint {
return endpoints
}

func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.PrefixLogger) (EndpointsUpdate, error) {
ret := EndpointsUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
Expand All @@ -113,6 +113,11 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate,
if l == nil {
return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
}
weight := locality.GetLoadBalancingWeight().GetValue()
if weight == 0 {
logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l))
continue
}
lid := internal.LocalityID{
Region: l.Region,
Zone: l.Zone,
Expand Down
16 changes: 15 additions & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go
Expand Up @@ -75,6 +75,20 @@ func (s) TestEDSParseRespProto(t *testing.T) {
want: EndpointsUpdate{},
wantErr: true,
},
{
name: "missing locality weight",
m: func() *v3endpointpb.ClusterLoadAssignment {
clab0 := newClaBuilder("test", nil)
clab0.addLocality("locality-1", 0, 1, []string{"addr1:314"}, &addLocalityOptions{
Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
})
clab0.addLocality("locality-2", 0, 0, []string{"addr2:159"}, &addLocalityOptions{
Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY},
})
return clab0.Build()
}(),
want: EndpointsUpdate{},
},
{
name: "good",
m: func() *v3endpointpb.ClusterLoadAssignment {
Expand Down Expand Up @@ -161,7 +175,7 @@ func (s) TestEDSParseRespProto(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseEDSRespProto(tt.m)
got, err := parseEDSRespProto(tt.m, nil)
if (err != nil) != tt.wantErr {
t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down