From dc94ce73d5daa588e922c892762ed52175e68c42 Mon Sep 17 00:00:00 2001 From: erni27 Date: Wed, 12 Oct 2022 16:27:53 +0200 Subject: [PATCH] Add addr duplication check --- xds/internal/xdsclient/xdsresource/unmarshal_eds.go | 12 +++++++++--- .../xdsclient/xdsresource/unmarshal_eds_test.go | 11 +++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index 15b0d88f9f1..e091d0ddea0 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -91,7 +91,7 @@ func parseDropPolicy(dropPolicy *v3endpointpb.ClusterLoadAssignment_Policy_DropO } } -func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) ([]Endpoint, error) { +func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs map[string]bool) ([]Endpoint, error) { endpoints := make([]Endpoint, 0, len(lbEndpoints)) for _, lbEndpoint := range lbEndpoints { // If the load_balancing_weight field is specified, it must be set to a @@ -104,9 +104,14 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) ([]Endpoint, error) } weight = w.GetValue() } + addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()) + if uniqueEndpointAddrs[addr] { + return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr) + } + uniqueEndpointAddrs[addr] = true endpoints = append(endpoints, Endpoint{ HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), - Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), + Address: addr, Weight: weight, }) } @@ -120,6 +125,7 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.Pr } priorities := make(map[uint32]map[string]bool) sumOfWeights := make(map[uint32]uint64) + uniqueEndpointAddrs := make(map[string]bool) for _, locality := range m.Endpoints { l := locality.GetLocality() if l == nil { @@ -150,7 +156,7 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.Pr return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority) } localitiesWithPriority[lidStr] = true - endpoints, err := parseEndpoints(locality.GetLbEndpoints()) + endpoints, err := parseEndpoints(locality.GetLbEndpoints(), uniqueEndpointAddrs) if err != nil { return EndpointsUpdate{}, err } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go index 02718e09dda..f89333c76e7 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go @@ -111,6 +111,17 @@ func (s) TestEDSParseRespProto(t *testing.T) { want: EndpointsUpdate{}, wantErr: true, }, + { + name: "duplicate endpoint address", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 1, 1, []string{"addr:997"}, nil) + clab0.addLocality("locality-2", 1, 0, []string{"addr:997"}, nil) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + wantErr: true, + }, { name: "good", m: func() *v3endpointpb.ClusterLoadAssignment {