eds: skip unhealthy endpoints #3137

Merged
merged 4 commits on Nov 5, 2019
Changes from 3 commits
2 changes: 1 addition & 1 deletion xds/internal/balancer/edsbalancer/balancergroup_test.go
@@ -37,7 +37,7 @@ var (
testBackendAddrs []resolver.Address
)

const testBackendAddrsCount = 8
const testBackendAddrsCount = 12

func init() {
for i := 0; i < testBackendAddrsCount; i++ {
9 changes: 9 additions & 0 deletions xds/internal/balancer/edsbalancer/edsbalancer.go
@@ -26,6 +26,7 @@ import (
"sync"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer"
@@ -223,6 +224,14 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
newWeight := locality.GetLoadBalancingWeight().GetValue()
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
// Filter out all "unhealthy" endpoints (unknown and
// healthy are both considered to be healthy:
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
if lbEndpoint.GetHealthStatus() != corepb.HealthStatus_HEALTHY &&
lbEndpoint.GetHealthStatus() != corepb.HealthStatus_UNKNOWN {
continue
}

socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
address := resolver.Address{
Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))),
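For illustration only (not part of this diff): a minimal, standalone sketch of the filtering rule the new comment describes. The isHealthy helper and the main function are hypothetical; the change itself inlines the same two comparisons inside HandleEDSResponse.

```go
package main

import (
	"fmt"

	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)

// isHealthy mirrors the rule added in this PR: an endpoint is usable when its
// health status is HEALTHY or UNKNOWN; every other status is skipped.
func isHealthy(s corepb.HealthStatus) bool {
	return s == corepb.HealthStatus_HEALTHY || s == corepb.HealthStatus_UNKNOWN
}

func main() {
	for _, s := range []corepb.HealthStatus{
		corepb.HealthStatus_UNKNOWN,
		corepb.HealthStatus_HEALTHY,
		corepb.HealthStatus_UNHEALTHY,
		corepb.HealthStatus_DRAINING,
		corepb.HealthStatus_TIMEOUT,
		corepb.HealthStatus_DEGRADED,
	} {
		fmt.Printf("%-9v keep=%v\n", s, isHealthy(s))
	}
}
```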
140 changes: 112 additions & 28 deletions xds/internal/balancer/edsbalancer/edsbalancer_test.go
@@ -23,6 +23,7 @@ import (
"reflect"
"strconv"
"testing"
"time"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
@@ -39,9 +40,15 @@ import (
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs = []string{"1.1.1.1:1", "2.2.2.2:2", "3.3.3.3:3", "4.4.4.4:4"}
testEndpointAddrs []string
)

func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
}

type clusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
@@ -68,9 +75,13 @@ func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32)
}
}

func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string) {
type addLocalityOptions struct {
health []corepb.HealthStatus
}

func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string, opts *addLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for _, a := range addrsWithPort {
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
@@ -80,7 +91,7 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin
panic("failed to atoi " + portStr)
}

lbEndPoints = append(lbEndPoints, &endpointpb.LbEndpoint{
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
@@ -89,8 +100,12 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}}},
)
PortValue: uint32(port)}}}}}},
}
if opts != nil && i < len(opts.health) {
lbe.HealthStatus = opts.health[i]
}
lbEndPoints = append(lbEndPoints, lbe)
}

clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
@@ -119,7 +134,7 @@ func TestEDS_OneLocality(t *testing.T) {

// One locality with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
@@ -137,7 +152,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, add one more backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2])
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2], nil)
edsb.HandleEDSResponse(clab2.build())

sc2 := <-cc.newSubConnCh
@@ -156,7 +171,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, delete first backend.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab3.build())

scToRemove := <-cc.removeSubConnCh
@@ -176,7 +191,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, replace backend.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab4.build())

sc3 := <-cc.newSubConnCh
@@ -199,7 +214,7 @@ func TestEDS_OneLocality(t *testing.T) {

// The same locality, different drop rate, dropping 50%.
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab5.build())

// Picks with drops.
@@ -228,8 +243,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh
@@ -251,9 +266,9 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Add another locality, with one backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())

sc3 := <-cc.newSubConnCh
@@ -272,8 +287,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Remove first locality.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab3.build())

scToRemove := <-cc.removeSubConnCh
@@ -294,8 +309,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Add a backend to the last locality.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab4.build())

sc4 := <-cc.newSubConnCh
@@ -317,8 +332,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Change weight of the locality[1].
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2])
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2], nil)
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab5.build())

// Test pick with two subconns different locality weight.
@@ -336,8 +351,8 @@ func TestEDS_TwoLocalities(t *testing.T) {

// Change weight of the locality[1] to 0, it should never be picked.
clab6 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2])
clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2], nil)
clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab6.build())

// Changing weight of locality[1] to 0 caused it to be removed. It's subconn
@@ -364,6 +379,75 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
}

// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
// healthy ones are used.
func TestEDS_EndpointsHealth(t *testing.T) {
cc := newTestClientConn(t)
edsb := NewXDSBalancer(cc, nil)

// Two localities, each with 6 backends, one per health status (Healthy,
// Unhealthy, Unknown, Draining, Timeout, Degraded).
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:6], &addLocalityOptions{
health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
Member: Looks like there is also DRAINING, TIMEOUT, and DEGRADED. We should ideally include those as well.

Contributor Author: Done

corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[6:12], &addLocalityOptions{
health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
edsb.HandleEDSResponse(clab1.build())

var (
readySCs []balancer.SubConn

wantNewSubConnAddrStrs = []string{
testEndpointAddrs[0],
testEndpointAddrs[2],
testEndpointAddrs[6],
testEndpointAddrs[8],
}
)
for i := 0; i < 4; i++ {
addr := <-cc.newSubConnAddrsCh
if addr[0].Addr != wantNewSubConnAddrStrs[i] {
Member: Will they always happen in order?

Optional assuming they are always ordered for now: would a map[string]bool work better for wantNewSubConnAddrStrs? It could be more future-proof in case the order changes later. To ensure it doesn't do the same address four times, you could delete the entries from the map as they are pulled in from the channel.

Contributor Author: Now with sort and cmp.Equal
t.Fatalf("want newSubConn with address %q, got %v", wantNewSubConnAddrStrs[i], addr)
}
sc := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc, connectivity.Ready)
readySCs = append(readySCs, sc)
}
// There should be exactly 4 new SubConns. Check to make sure there's no
// more subconns being created.
select {
case <-cc.newSubConnCh:
t.Fatalf("Got unexpected new subconn")
case <-time.After(time.Microsecond * 100):
}

// Test roundrobin with the subconns.
p1 := <-cc.newPickerCh
want := readySCs
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
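
For illustration only (not part of this diff): a minimal sketch of the order-insensitive comparison the author refers to above with "sort and cmp.Equal". The slices below are placeholders standing in for the addresses read from newSubConnAddrsCh and for wantNewSubConnAddrStrs (testEndpointAddrs[0], [2], [6] and [8] in this test).

```go
package main

import (
	"fmt"
	"sort"

	"github.com/google/go-cmp/cmp"
)

func main() {
	// Addresses pulled off the channel, in whatever order they arrive.
	got := []string{"2.2.2.2:2", "0.0.0.0:0", "8.8.8.8:8", "6.6.6.6:6"}
	// Addresses of the HEALTHY and UNKNOWN endpoints we expect SubConns for.
	want := []string{"0.0.0.0:0", "2.2.2.2:2", "6.6.6.6:6", "8.8.8.8:8"}

	// Sorting both slices makes the comparison independent of arrival order.
	sort.Strings(got)
	sort.Strings(want)

	if !cmp.Equal(got, want) {
		fmt.Printf("unexpected SubConn addresses (-want +got):\n%s", cmp.Diff(want, got))
		return
	}
	fmt.Println("received the expected set of SubConn addresses")
}
```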

func TestClose(t *testing.T) {
edsb := NewXDSBalancer(nil, nil)
// This is what could happen when switching between fallback and eds. This
@@ -426,8 +510,8 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

p0 := <-cc.newPickerCh
@@ -572,8 +656,8 @@ func TestEDS_LoadReport(t *testing.T) {

// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())

sc1 := <-cc.newSubConnCh