Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eds: skip unhealthy endpoints #3137

Merged
merged 4 commits into from Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion xds/internal/balancer/edsbalancer/balancergroup_test.go
Expand Up @@ -37,7 +37,7 @@ var (
testBackendAddrs []resolver.Address
)

const testBackendAddrsCount = 8
const testBackendAddrsCount = 12

func init() {
for i := 0; i < testBackendAddrsCount; i++ {
Expand Down
9 changes: 9 additions & 0 deletions xds/internal/balancer/edsbalancer/edsbalancer.go
Expand Up @@ -26,6 +26,7 @@ import (
"sync"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -223,6 +224,14 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
newWeight := locality.GetLoadBalancingWeight().GetValue()
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
// Filter out all "unhealthy" endpoints (unknown and
// healthy are both considered to be healthy:
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
if lbEndpoint.GetHealthStatus() != corepb.HealthStatus_HEALTHY &&
lbEndpoint.GetHealthStatus() != corepb.HealthStatus_UNKNOWN {
continue
}

socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
address := resolver.Address{
Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))),
Expand Down
130 changes: 102 additions & 28 deletions xds/internal/balancer/edsbalancer/edsbalancer_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"strconv"
"testing"
"time"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand All @@ -39,9 +40,15 @@ import (
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs = []string{"1.1.1.1:1", "2.2.2.2:2", "3.3.3.3:3", "4.4.4.4:4"}
testEndpointAddrs []string
)

// init fills testEndpointAddrs with testBackendAddrsCount synthetic
// "host:port" strings of the form "n.n.n.n:n", one per index n, so tests can
// slice off as many endpoint addresses as they need.
func init() {
	for n := 0; n < testBackendAddrsCount; n++ {
		addr := fmt.Sprintf("%d.%d.%d.%d:%d", n, n, n, n, n)
		testEndpointAddrs = append(testEndpointAddrs, addr)
	}
}

// clusterLoadAssignmentBuilder is a test helper that incrementally assembles
// an EDS ClusterLoadAssignment message; localities are appended to it via
// addLocality.
type clusterLoadAssignmentBuilder struct {
// v is the ClusterLoadAssignment under construction.
v *xdspb.ClusterLoadAssignment
}
Expand All @@ -68,9 +75,13 @@ func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32)
}
}

func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string) {
// addLocalityOptions carries optional per-endpoint settings for addLocality.
// A nil *addLocalityOptions means no options are applied.
type addLocalityOptions struct {
// health holds health statuses matched to endpoints by index: health[i] is
// assigned to the i-th address passed to addLocality. Addresses at indices
// beyond len(health) are left without an explicit health status.
health []corepb.HealthStatus
}

func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string, opts *addLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for _, a := range addrsWithPort {
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
Expand All @@ -80,7 +91,7 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin
panic("failed to atoi " + portStr)
}

lbEndPoints = append(lbEndPoints, &endpointpb.LbEndpoint{
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Expand All @@ -89,8 +100,12 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}}},
)
PortValue: uint32(port)}}}}}},
}
if opts != nil && i < len(opts.health) {
lbe.HealthStatus = opts.health[i]
}
lbEndPoints = append(lbEndPoints, lbe)
}

clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Expand Down Expand Up @@ -119,7 +134,7 @@ func TestEDS_OneLocality(t *testing.T) {

// One locality with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
Expand All @@ -137,7 +152,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, add one more backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2])
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2], nil)
edsb.HandleEDSResponse(clab2.build())

sc2 := <-cc.newSubConnCh
Expand All @@ -156,7 +171,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, delete first backend.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab3.build())

scToRemove := <-cc.removeSubConnCh
Expand All @@ -176,7 +191,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, replace backend.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab4.build())

sc3 := <-cc.newSubConnCh
Expand All @@ -199,7 +214,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, different drop rate, dropping 50%.
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab5.build())

// Picks with drops.
Expand Down Expand Up @@ -228,8 +243,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
Expand All @@ -251,9 +266,9 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Add another locality, with one backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())

sc3 := <-cc.newSubConnCh
Expand All @@ -272,8 +287,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Remove first locality.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab3.build())

scToRemove := <-cc.removeSubConnCh
Expand All @@ -294,8 +309,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Add a backend to the last locality.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab4.build())

sc4 := <-cc.newSubConnCh
Expand All @@ -317,8 +332,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Change weight of the locality[1].
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2])
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2], nil)
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab5.build())

// Test pick with two subconns different locality weight.
Expand All @@ -336,8 +351,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Change weight of the locality[1] to 0, it should never be picked.
clab6 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2])
clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2], nil)
clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab6.build())

// Changing weight of locality[1] to 0 caused it to be removed. Its subconn
Expand All @@ -364,6 +379,65 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
}

// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
// healthy ones are used.
func TestEDS_EndpointsHealth(t *testing.T) {
cc := newTestClientConn(t)
edsb := NewXDSBalancer(cc, nil)

// Two localities, each with 6 backends, one per health status: Healthy,
// Unhealthy, Unknown, Draining, Timeout and Degraded.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:6], &addLocalityOptions{
health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is also DRAINING, TIMEOUT, and DEGRADED. We should ideally include those as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[6:12], &addLocalityOptions{
health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
edsb.HandleEDSResponse(clab1.build())

var readySCs []balancer.SubConn
for i := 0; i < 4; i++ {
sc := <-cc.newSubConnCh
dfawley marked this conversation as resolved.
Show resolved Hide resolved
edsb.HandleSubConnStateChange(sc, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc, connectivity.Ready)
readySCs = append(readySCs, sc)
}
// There should be exactly 4 new SubConns. Check to make sure there's no
// more subconns being created.
//
// This check is necessary. The pick later won't fail even if eds
// doesn't respect health status, because pick only returns Ready subconns.
select {
case <-cc.newSubConnCh:
t.Fatalf("Got unexpected new subconn")
case <-time.After(time.Microsecond * 100):
}

// Test roundrobin with the four ready subconns.
p1 := <-cc.newPickerCh
want := readySCs
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}

func TestClose(t *testing.T) {
edsb := NewXDSBalancer(nil, nil)
// This is what could happen when switching between fallback and eds. This
Expand Down Expand Up @@ -426,8 +500,8 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

p0 := <-cc.newPickerCh
Expand Down Expand Up @@ -572,8 +646,8 @@ func TestEDS_LoadReport(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
Expand Down