Skip to content

Commit

Permalink
cluster_resolver: implement resource resolver to resolve EDS and DNS (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jul 12, 2021
1 parent 30dfb4b commit ebfe3be
Show file tree
Hide file tree
Showing 9 changed files with 1,371 additions and 64 deletions.
20 changes: 18 additions & 2 deletions resolver/manual/manual.go
Expand Up @@ -27,19 +27,27 @@ import (
// NewBuilderWithScheme creates a new test resolver builder with the given scheme.
func NewBuilderWithScheme(scheme string) *Resolver {
return &Resolver{
BuildCallback: func(resolver.Target, resolver.ClientConn, resolver.BuildOptions) {},
ResolveNowCallback: func(resolver.ResolveNowOptions) {},
CloseCallback: func() {},
scheme: scheme,
}
}

// Resolver is also a resolver builder.
// It's build() function always returns itself.
type Resolver struct {
// BuildCallback is called when the Build method is called. Must not be
// nil. Must not be changed after the resolver may be built.
BuildCallback func(resolver.Target, resolver.ClientConn, resolver.BuildOptions)
// ResolveNowCallback is called when the ResolveNow method is called on the
// resolver. Must not be nil. Must not be changed after the resolver may
// be built.
ResolveNowCallback func(resolver.ResolveNowOptions)
scheme string
// CloseCallback is called when the Close method is called. Must not be
// nil. Must not be changed after the resolver may be built.
CloseCallback func()
scheme string

// Fields actually belong to the resolver.
CC resolver.ClientConn
Expand All @@ -54,6 +62,7 @@ func (r *Resolver) InitialState(s resolver.State) {

// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.BuildCallback(target, cc, opts)
r.CC = cc
if r.bootstrapState != nil {
r.UpdateState(*r.bootstrapState)
Expand All @@ -72,9 +81,16 @@ func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {
}

// Close is a noop for Resolver.
func (*Resolver) Close() {}
func (r *Resolver) Close() {
r.CloseCallback()
}

// UpdateState calls CC.UpdateState.
func (r *Resolver) UpdateState(s resolver.State) {
r.CC.UpdateState(s)
}

// ReportError calls CC.ReportError.
func (r *Resolver) ReportError(err error) {
r.CC.ReportError(err)
}
42 changes: 42 additions & 0 deletions xds/internal/balancer/clusterresolver/balancerconfig/type.go
Expand Up @@ -95,4 +95,46 @@ type DiscoveryMechanism struct {
// This is used for EDS watch if set. If unset, Cluster is used for EDS
// watch.
EDSServiceName string `json:"edsServiceName,omitempty"`
// DNSHostname is the DNS name to resolve in "host:port" form. For type
// LOGICAL_DNS only.
DNSHostname string `json:"dnsHostname,omitempty"`
}

// Equal returns whether the DiscoveryMechanism is the same with the parameter.
func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool {
switch {
case dm.Cluster != b.Cluster:
return false
case !equalStringP(dm.LoadReportingServerName, b.LoadReportingServerName):
return false
case !equalUint32P(dm.MaxConcurrentRequests, b.MaxConcurrentRequests):
return false
case dm.Type != b.Type:
return false
case dm.EDSServiceName != b.EDSServiceName:
return false
case dm.DNSHostname != b.DNSHostname:
return false
}
return true
}

func equalStringP(a, b *string) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return *a == *b
}

func equalUint32P(a, b *uint32) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return *a == *b
}
20 changes: 10 additions & 10 deletions xds/internal/balancer/clusterresolver/clusterresolver_test.go
Expand Up @@ -231,7 +231,7 @@ func (s) TestSubConnStateChange(t *testing.T) {
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -276,7 +276,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
Expand All @@ -286,11 +286,11 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
}

connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, connectionErr)

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}

Expand All @@ -304,13 +304,13 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
}

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr)
xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, resourceErr)
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
// error, but we still handles it).
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
Expand Down Expand Up @@ -365,7 +365,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
Expand All @@ -379,7 +379,7 @@ func (s) TestErrorFromResolver(t *testing.T) {

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}

Expand All @@ -394,7 +394,7 @@ func (s) TestErrorFromResolver(t *testing.T) {

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
edsB.ResolverError(resourceErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
Expand Down Expand Up @@ -423,7 +423,7 @@ func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resource
for _, name := range resourceNames {
if name == "" {
// ResourceName empty string indicates a cancel.
if err := fc.WaitForCancelEDSWatch(ctx); err != nil {
if _, err := fc.WaitForCancelEDSWatch(ctx); err != nil {
return fmt.Errorf("timed out when expecting resource %q", name)
}
continue
Expand Down
40 changes: 20 additions & 20 deletions xds/internal/balancer/clusterresolver/eds_impl_test.go
Expand Up @@ -103,7 +103,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)

sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand All @@ -117,7 +117,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// The same locality, add one more backend.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)

sc2 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand All @@ -131,7 +131,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// The same locality, delete first backend.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)

scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
Expand All @@ -147,7 +147,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// The same locality, replace backend.
clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab4.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)

sc3 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand All @@ -166,7 +166,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// The same locality, different drop rate, dropping 50%.
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab5.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)

// Picks with drops.
if err := testPickerFromCh(cc.NewPickerCh, func(p balancer.Picker) error {
Expand All @@ -188,7 +188,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// The same locality, remove drops.
clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab6.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)

// Pick without drops.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3}); err != nil {
Expand All @@ -209,7 +209,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
Expand All @@ -218,7 +218,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
// locality. Otherwise the test is flaky because of a map is used in EDS to
// keep localities.
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc2 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
Expand All @@ -233,7 +233,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)

sc3 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand All @@ -248,7 +248,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)

scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
Expand All @@ -265,7 +265,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab4.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)

sc4 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand All @@ -284,7 +284,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab5.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)

// Test pick with two subconns different locality weight.
//
Expand All @@ -299,7 +299,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab6.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)

// Changing weight of locality[1] to 0 caused it to be removed. It's subconn
// should also be removed.
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
corepb.HealthStatus_DEGRADED,
},
})
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)

var (
readySCs []balancer.SubConn
Expand Down Expand Up @@ -406,7 +406,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
defer func() { balancergroup.DefaultSubBalancerCloseTimeout = oldCacheTimeout }()

// The first update is an empty update.
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil)
// Pick should fail with transient failure, and all priority removed error.
if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
t.Fatal(err)
Expand All @@ -415,7 +415,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)

sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand All @@ -426,7 +426,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
t.Fatal(err)
}

xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil)
// Pick should fail with transient failure, and all priority removed error.
if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
t.Fatal(err)
Expand All @@ -442,7 +442,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

// Handle another update with priorities and localities.
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)

sc2 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
Expand Down Expand Up @@ -484,7 +484,7 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)

for i := 0; i < 2; i++ {
sc := <-cc.NewSubConnCh
Expand Down Expand Up @@ -579,7 +579,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
Expand Down

0 comments on commit ebfe3be

Please sign in to comment.