Skip to content

Commit

Permalink
eds: skip unhealthy endpoints (#3137)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Nov 5, 2019
1 parent 583401a commit bbd4b7a
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 29 deletions.
2 changes: 1 addition & 1 deletion xds/internal/balancer/edsbalancer/balancergroup_test.go
Expand Up @@ -37,7 +37,7 @@ var (
testBackendAddrs []resolver.Address
)

const testBackendAddrsCount = 8
const testBackendAddrsCount = 12

func init() {
for i := 0; i < testBackendAddrsCount; i++ {
Expand Down
9 changes: 9 additions & 0 deletions xds/internal/balancer/edsbalancer/edsbalancer.go
Expand Up @@ -26,6 +26,7 @@ import (
"sync"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -223,6 +224,14 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
newWeight := locality.GetLoadBalancingWeight().GetValue()
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
// Filter out all "unhealthy" endpoints (unknown and
// healthy are both considered to be healthy:
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
if lbEndpoint.GetHealthStatus() != corepb.HealthStatus_HEALTHY &&
lbEndpoint.GetHealthStatus() != corepb.HealthStatus_UNKNOWN {
continue
}

socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
address := resolver.Address{
Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))),
Expand Down
149 changes: 121 additions & 28 deletions xds/internal/balancer/edsbalancer/edsbalancer_test.go
Expand Up @@ -21,14 +21,17 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strconv"
"testing"
"time"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
typespb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
Expand All @@ -39,9 +42,15 @@ import (
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs = []string{"1.1.1.1:1", "2.2.2.2:2", "3.3.3.3:3", "4.4.4.4:4"}
testEndpointAddrs []string
)

// init fills testEndpointAddrs with testBackendAddrsCount synthetic addresses
// of the form "i.i.i.i:i", where i is the address's index.
func init() {
	for n := 0; n < testBackendAddrsCount; n++ {
		addr := fmt.Sprintf("%d.%d.%d.%d:%d", n, n, n, n, n)
		testEndpointAddrs = append(testEndpointAddrs, addr)
	}
}

type clusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
Expand All @@ -68,9 +77,13 @@ func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32)
}
}

func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string) {
// addLocalityOptions carries optional per-endpoint settings for addLocality.
// Passing nil options leaves the HealthStatus of every endpoint unset.
type addLocalityOptions struct {
// health[i] is assigned as the HealthStatus of the i-th address passed to
// addLocality. Addresses at an index >= len(health) keep an unset
// HealthStatus.
health []corepb.HealthStatus
}

func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string, opts *addLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for _, a := range addrsWithPort {
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
Expand All @@ -80,7 +93,7 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin
panic("failed to atoi " + portStr)
}

lbEndPoints = append(lbEndPoints, &endpointpb.LbEndpoint{
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Expand All @@ -89,8 +102,12 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}}},
)
PortValue: uint32(port)}}}}}},
}
if opts != nil && i < len(opts.health) {
lbe.HealthStatus = opts.health[i]
}
lbEndPoints = append(lbEndPoints, lbe)
}

clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Expand Down Expand Up @@ -119,7 +136,7 @@ func TestEDS_OneLocality(t *testing.T) {

// One locality with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
Expand All @@ -137,7 +154,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, add one more backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2])
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2], nil)
edsb.HandleEDSResponse(clab2.build())

sc2 := <-cc.newSubConnCh
Expand All @@ -156,7 +173,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, delete first backend.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab3.build())

scToRemove := <-cc.removeSubConnCh
Expand All @@ -176,7 +193,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, replace backend.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab4.build())

sc3 := <-cc.newSubConnCh
Expand All @@ -199,7 +216,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, different drop rate, dropping 50%.
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab5.build())

// Picks with drops.
Expand Down Expand Up @@ -228,8 +245,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
Expand All @@ -251,9 +268,9 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Add another locality, with one backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())

sc3 := <-cc.newSubConnCh
Expand All @@ -272,8 +289,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Remove first locality.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab3.build())

scToRemove := <-cc.removeSubConnCh
Expand All @@ -294,8 +311,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Add a backend to the last locality.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab4.build())

sc4 := <-cc.newSubConnCh
Expand All @@ -317,8 +334,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Change weight of the locality[1].
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2])
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2], nil)
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab5.build())

// Test pick with two subconns different locality weight.
Expand All @@ -336,8 +353,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Change weight of the locality[1] to 0, it should never be picked.
clab6 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2])
clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2], nil)
clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab6.build())

// Changing weight of locality[1] to 0 caused it to be removed. It's subconn
Expand All @@ -364,6 +381,82 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
}

// TestEDS_EndpointsHealth feeds the EDS balancer a response that mixes
// endpoints of every health status and verifies that SubConns are created only
// for the endpoints reported as HEALTHY or UNKNOWN, and that the resulting
// picker round-robins across exactly those SubConns.
func TestEDS_EndpointsHealth(t *testing.T) {
	cc := newTestClientConn(t)
	edsb := NewXDSBalancer(cc, nil)

	// One endpoint per health status. addLocality only reads this slice, so
	// it is safe to share between both localities.
	allStatuses := []corepb.HealthStatus{
		corepb.HealthStatus_HEALTHY,
		corepb.HealthStatus_UNHEALTHY,
		corepb.HealthStatus_UNKNOWN,
		corepb.HealthStatus_DRAINING,
		corepb.HealthStatus_TIMEOUT,
		corepb.HealthStatus_DEGRADED,
	}

	// Two localities, each with 6 backends: one Healthy, one Unhealthy, one
	// Unknown, one Draining, one Timeout and one Degraded.
	clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
	clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:6], &addLocalityOptions{
		health: allStatuses,
	})
	clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[6:12], &addLocalityOptions{
		health: allStatuses,
	})
	edsb.HandleEDSResponse(clab1.build())

	var (
		readySCs           []balancer.SubConn
		newSubConnAddrStrs []string
	)
	// Only the HEALTHY and UNKNOWN endpoint of each locality should produce a
	// SubConn, i.e. 4 in total. Move each one to Ready as it is created.
	for i := 0; i < 4; i++ {
		addr := <-cc.newSubConnAddrsCh
		newSubConnAddrStrs = append(newSubConnAddrStrs, addr[0].Addr)
		sc := <-cc.newSubConnCh
		edsb.HandleSubConnStateChange(sc, connectivity.Connecting)
		edsb.HandleSubConnStateChange(sc, connectivity.Ready)
		readySCs = append(readySCs, sc)
	}

	// Indices 0/2 and 6/8 are the HEALTHY/UNKNOWN entries of each locality.
	wantNewSubConnAddrStrs := []string{
		testEndpointAddrs[0],
		testEndpointAddrs[2],
		testEndpointAddrs[6],
		testEndpointAddrs[8],
	}
	// The order in which SubConns are created is not asserted, so compare the
	// address lists as sorted copies. Sorting a copy keeps the transformer
	// free of side effects, so the failure message below still shows the
	// addresses in the order they were actually observed.
	sortStrTrans := cmp.Transformer("Sort", func(in []string) []string {
		out := append([]string(nil), in...)
		sort.Strings(out)
		return out
	})
	if !cmp.Equal(newSubConnAddrStrs, wantNewSubConnAddrStrs, sortStrTrans) {
		t.Fatalf("want newSubConn with address %v, got %v", wantNewSubConnAddrStrs, newSubConnAddrStrs)
	}

	// There should be exactly 4 new SubConns. Check to make sure there's no
	// more subconns being created.
	select {
	case <-cc.newSubConnCh:
		t.Fatalf("Got unexpected new subconn")
	case <-time.After(time.Microsecond * 100):
	}

	// Test roundrobin with the subconns.
	p1 := <-cc.newPickerCh
	want := readySCs
	if err := isRoundRobin(want, func() balancer.SubConn {
		sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
		return sc
	}); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}
}

func TestClose(t *testing.T) {
edsb := NewXDSBalancer(nil, nil)
// This is what could happen when switching between fallback and eds. This
Expand Down Expand Up @@ -426,8 +519,8 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

p0 := <-cc.newPickerCh
Expand Down Expand Up @@ -572,8 +665,8 @@ func TestEDS_LoadReport(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
Expand Down

0 comments on commit bbd4b7a

Please sign in to comment.