From 49e7d822bf55ee6852c4f4ab090213c6b1ee88fc Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 11 Oct 2019 14:41:36 -0700 Subject: [PATCH 1/4] [xds_eds_endpoint_health] eds: skip unhealthy endpoints Unknown and healthy are both considered healthy. --- .../balancer/edsbalancer/edsbalancer.go | 9 ++ .../balancer/edsbalancer/edsbalancer_test.go | 124 ++++++++++++++---- 2 files changed, 105 insertions(+), 28 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/edsbalancer.go b/xds/internal/balancer/edsbalancer/edsbalancer.go index f6f5f2ce6e8..968238129a0 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer.go @@ -26,6 +26,7 @@ import ( "sync" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" typepb "github.com/envoyproxy/go-control-plane/envoy/type" "google.golang.org/grpc/balancer" @@ -223,6 +224,14 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) newWeight := locality.GetLoadBalancingWeight().GetValue() var newAddrs []resolver.Address for _, lbEndpoint := range locality.GetLbEndpoints() { + // Filter out all "unhealthy" endpoints (unknown and + // healthy are both considered to be healthy: + // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). + if lbEndpoint.GetHealthStatus() != corepb.HealthStatus_HEALTHY && + lbEndpoint.GetHealthStatus() != corepb.HealthStatus_UNKNOWN { + continue + } + socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress() address := resolver.Address{ Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))), diff --git a/xds/internal/balancer/edsbalancer/edsbalancer_test.go b/xds/internal/balancer/edsbalancer/edsbalancer_test.go index 3e5d85d41fb..63a27bd50c0 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer_test.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer_test.go @@ -23,6 +23,7 @@ import ( "reflect" "strconv" "testing" + "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -39,9 +40,15 @@ import ( var ( testClusterNames = []string{"test-cluster-1", "test-cluster-2"} testSubZones = []string{"I", "II", "III", "IV"} - testEndpointAddrs = []string{"1.1.1.1:1", "2.2.2.2:2", "3.3.3.3:3", "4.4.4.4:4"} + testEndpointAddrs []string ) +func init() { + for i := 0; i < testBackendAddrsCount; i++ { + testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) + } +} + type clusterLoadAssignmentBuilder struct { v *xdspb.ClusterLoadAssignment } @@ -68,9 +75,13 @@ func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) } } -func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string) { +type addLocalityOptions struct { + health []corepb.HealthStatus +} + +func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string, opts *addLocalityOptions) { var lbEndPoints []*endpointpb.LbEndpoint - for _, a := range addrsWithPort { + for i, a := range addrsWithPort { host, portStr, err := net.SplitHostPort(a) if err != nil { panic("failed to split " + a) @@ -80,7 +91,7 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin panic("failed to atoi " + portStr) } - lbEndPoints = append(lbEndPoints, &endpointpb.LbEndpoint{ + lbe := &endpointpb.LbEndpoint{ HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ Endpoint: &endpointpb.Endpoint{ Address: &corepb.Address{ @@ -89,8 +100,12 @@ func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uin Protocol: corepb.SocketAddress_TCP, Address: host, PortSpecifier: &corepb.SocketAddress_PortValue{ - PortValue: uint32(port)}}}}}}}, - ) + PortValue: uint32(port)}}}}}}, + } + if opts != nil && i < len(opts.health) { + lbe.HealthStatus = opts.health[i] + } + lbEndPoints = append(lbEndPoints, lbe) } clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{ @@ -119,7 +134,7 @@ func TestEDS_OneLocality(t *testing.T) { // One locality with one backend. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil) edsb.HandleEDSResponse(clab1.build()) sc1 := <-cc.newSubConnCh @@ -137,7 +152,7 @@ func TestEDS_OneLocality(t *testing.T) { // The same locality, add one more backend. clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2]) + clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2], nil) edsb.HandleEDSResponse(clab2.build()) sc2 := <-cc.newSubConnCh @@ -156,7 +171,7 @@ func TestEDS_OneLocality(t *testing.T) { // The same locality, delete first backend. clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2]) + clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2], nil) edsb.HandleEDSResponse(clab3.build()) scToRemove := <-cc.removeSubConnCh @@ -176,7 +191,7 @@ func TestEDS_OneLocality(t *testing.T) { // The same locality, replace backend. clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3]) + clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil) edsb.HandleEDSResponse(clab4.build()) sc3 := <-cc.newSubConnCh @@ -199,7 +214,7 @@ func TestEDS_OneLocality(t *testing.T) { // The same locality, different drop rate, dropping 50%. clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50}) - clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3]) + clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3], nil) edsb.HandleEDSResponse(clab5.build()) // Picks with drops. @@ -228,8 +243,8 @@ func TestEDS_TwoLocalities(t *testing.T) { // Two localities, each with one backend. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) - clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil) + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil) edsb.HandleEDSResponse(clab1.build()) sc1 := <-cc.newSubConnCh @@ -251,9 +266,9 @@ func TestEDS_TwoLocalities(t *testing.T) { // Add another locality, with one backend. clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) - clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) - clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3]) + clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil) + clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil) + clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil) edsb.HandleEDSResponse(clab2.build()) sc3 := <-cc.newSubConnCh @@ -272,8 +287,8 @@ func TestEDS_TwoLocalities(t *testing.T) { // Remove first locality. clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) - clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3]) + clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil) + clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3], nil) edsb.HandleEDSResponse(clab3.build()) scToRemove := <-cc.removeSubConnCh @@ -294,8 +309,8 @@ func TestEDS_TwoLocalities(t *testing.T) { // Add a backend to the last locality. clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) - clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4]) + clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil) + clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil) edsb.HandleEDSResponse(clab4.build()) sc4 := <-cc.newSubConnCh @@ -317,8 +332,8 @@ func TestEDS_TwoLocalities(t *testing.T) { // Change weight of the locality[1]. clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2]) - clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4]) + clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2], nil) + clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil) edsb.HandleEDSResponse(clab5.build()) // Test pick with two subconns different locality weight. @@ -336,8 +351,8 @@ func TestEDS_TwoLocalities(t *testing.T) { // Change weight of the locality[1] to 0, it should never be picked. clab6 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2]) - clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4]) + clab6.addLocality(testSubZones[1], 0, testEndpointAddrs[1:2], nil) + clab6.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4], nil) edsb.HandleEDSResponse(clab6.build()) // Changing weight of locality[1] to 0 caused it to be removed. It's subconn @@ -364,6 +379,59 @@ func TestEDS_TwoLocalities(t *testing.T) { } } +// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only +// healthy ones are used. +func TestEDS_EndpointsHealth(t *testing.T) { + cc := newTestClientConn(t) + edsb := NewXDSBalancer(cc, nil) + + // Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown. + clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:3], &addLocalityOptions{ + health: []corepb.HealthStatus{ + corepb.HealthStatus_HEALTHY, + corepb.HealthStatus_UNHEALTHY, + corepb.HealthStatus_UNKNOWN, + }, + }) + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[3:6], &addLocalityOptions{ + health: []corepb.HealthStatus{ + corepb.HealthStatus_HEALTHY, + corepb.HealthStatus_UNHEALTHY, + corepb.HealthStatus_UNKNOWN, + }, + }) + edsb.HandleEDSResponse(clab1.build()) + + var readySCs []balancer.SubConn + for i := 0; i < 4; i++ { + sc := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc, connectivity.Ready) + readySCs = append(readySCs, sc) + } + // There should be exactly 4 new SubConns. Check to make sure there's no + // more subconns being created. + // + // This is check is very necessary. The pick later won't fail even if eds + // doesn't respect health status, because pick only returns Ready subconn. + select { + case <-cc.newSubConnCh: + t.Fatalf("Got unexpected new subconn") + case <-time.After(time.Microsecond * 100): + } + + // Test roundrobin with two subconns. + p1 := <-cc.newPickerCh + want := readySCs + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } +} + func TestClose(t *testing.T) { edsb := NewXDSBalancer(nil, nil) // This is what could happen when switching between fallback and eds. This @@ -426,8 +494,8 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) { // Two localities, each with one backend. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) - clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil) + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil) edsb.HandleEDSResponse(clab1.build()) p0 := <-cc.newPickerCh @@ -572,8 +640,8 @@ func TestEDS_LoadReport(t *testing.T) { // Two localities, each with one backend. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) - clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1], nil) + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2], nil) edsb.HandleEDSResponse(clab1.build()) sc1 := <-cc.newSubConnCh From 4c225d3c042c6602be2c6f338c9ef4b1c474db02 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 4 Nov 2019 13:02:17 -0800 Subject: [PATCH 2/4] [xds_eds_endpoint_health] test covers all health status --- .../balancer/edsbalancer/balancergroup_test.go | 2 +- xds/internal/balancer/edsbalancer/edsbalancer_test.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/balancergroup_test.go b/xds/internal/balancer/edsbalancer/balancergroup_test.go index 4e79967a750..58cae6e58e2 100644 --- a/xds/internal/balancer/edsbalancer/balancergroup_test.go +++ b/xds/internal/balancer/edsbalancer/balancergroup_test.go @@ -37,7 +37,7 @@ var ( testBackendAddrs []resolver.Address ) -const testBackendAddrsCount = 8 +const testBackendAddrsCount = 12 func init() { for i := 0; i < testBackendAddrsCount; i++ { diff --git a/xds/internal/balancer/edsbalancer/edsbalancer_test.go b/xds/internal/balancer/edsbalancer/edsbalancer_test.go index 63a27bd50c0..38c169a5417 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer_test.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer_test.go @@ -387,18 +387,24 @@ func TestEDS_EndpointsHealth(t *testing.T) { // Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:3], &addLocalityOptions{ + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:6], &addLocalityOptions{ health: []corepb.HealthStatus{ corepb.HealthStatus_HEALTHY, corepb.HealthStatus_UNHEALTHY, corepb.HealthStatus_UNKNOWN, + corepb.HealthStatus_DRAINING, + corepb.HealthStatus_TIMEOUT, + corepb.HealthStatus_DEGRADED, }, }) - clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[3:6], &addLocalityOptions{ + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[6:12], &addLocalityOptions{ health: []corepb.HealthStatus{ corepb.HealthStatus_HEALTHY, corepb.HealthStatus_UNHEALTHY, corepb.HealthStatus_UNKNOWN, + corepb.HealthStatus_DRAINING, + corepb.HealthStatus_TIMEOUT, + corepb.HealthStatus_DEGRADED, }, }) edsb.HandleEDSResponse(clab1.build()) From a8bba8fe50afdc5b1120b0bde0156c775625ea2f Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 4 Nov 2019 13:36:03 -0800 Subject: [PATCH 3/4] [xds_eds_endpoint_health] check new sc addrs --- .../balancer/edsbalancer/edsbalancer_test.go | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/edsbalancer_test.go b/xds/internal/balancer/edsbalancer/edsbalancer_test.go index 38c169a5417..1beb11a8ea6 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer_test.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer_test.go @@ -409,8 +409,21 @@ func TestEDS_EndpointsHealth(t *testing.T) { }) edsb.HandleEDSResponse(clab1.build()) - var readySCs []balancer.SubConn + var ( + readySCs []balancer.SubConn + + wantNewSubConnAddrStrs = []string{ + testEndpointAddrs[0], + testEndpointAddrs[2], + testEndpointAddrs[6], + testEndpointAddrs[8], + } + ) for i := 0; i < 4; i++ { + addr := <-cc.newSubConnAddrsCh + if addr[0].Addr != wantNewSubConnAddrStrs[i] { + t.Fatalf("want newSubConn with address %q, got %v", wantNewSubConnAddrStrs[i], addr) + } sc := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc, connectivity.Connecting) edsb.HandleSubConnStateChange(sc, connectivity.Ready) @@ -418,16 +431,13 @@ func TestEDS_EndpointsHealth(t *testing.T) { } // There should be exactly 4 new SubConns. Check to make sure there's no // more subconns being created. - // - // This is check is very necessary. The pick later won't fail even if eds - // doesn't respect health status, because pick only returns Ready subconn. select { case <-cc.newSubConnCh: t.Fatalf("Got unexpected new subconn") case <-time.After(time.Microsecond * 100): } - // Test roundrobin with two subconns. + // Test roundrobin with the subconns. p1 := <-cc.newPickerCh want := readySCs if err := isRoundRobin(want, func() balancer.SubConn { From 55d7d421bc2e873e14d6e0e9abc7427a064a5e90 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 4 Nov 2019 18:02:50 -0800 Subject: [PATCH 4/4] [xds_eds_endpoint_health] compare strings --- .../balancer/edsbalancer/edsbalancer_test.go | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/edsbalancer_test.go b/xds/internal/balancer/edsbalancer/edsbalancer_test.go index 1beb11a8ea6..8a9d2663e7b 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer_test.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "reflect" + "sort" "strconv" "testing" "time" @@ -30,6 +31,7 @@ import ( endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" typepb "github.com/envoyproxy/go-control-plane/envoy/type" typespb "github.com/golang/protobuf/ptypes/wrappers" + "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" @@ -410,25 +412,32 @@ func TestEDS_EndpointsHealth(t *testing.T) { edsb.HandleEDSResponse(clab1.build()) var ( - readySCs []balancer.SubConn - - wantNewSubConnAddrStrs = []string{ - testEndpointAddrs[0], - testEndpointAddrs[2], - testEndpointAddrs[6], - testEndpointAddrs[8], - } + readySCs []balancer.SubConn + newSubConnAddrStrs []string ) for i := 0; i < 4; i++ { addr := <-cc.newSubConnAddrsCh - if addr[0].Addr != wantNewSubConnAddrStrs[i] { - t.Fatalf("want newSubConn with address %q, got %v", wantNewSubConnAddrStrs[i], addr) - } + newSubConnAddrStrs = append(newSubConnAddrStrs, addr[0].Addr) sc := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc, connectivity.Connecting) edsb.HandleSubConnStateChange(sc, connectivity.Ready) readySCs = append(readySCs, sc) } + + wantNewSubConnAddrStrs := []string{ + testEndpointAddrs[0], + testEndpointAddrs[2], + testEndpointAddrs[6], + testEndpointAddrs[8], + } + sortStrTrans := cmp.Transformer("Sort", func(in []string) []string { + sort.Strings(in) + return in + }) + if !cmp.Equal(newSubConnAddrStrs, wantNewSubConnAddrStrs, sortStrTrans) { + t.Fatalf("want newSubConn with address %v, got %v", wantNewSubConnAddrStrs, newSubConnAddrStrs) + } + // There should be exactly 4 new SubConns. Check to make sure there's no // more subconns being created. select {