Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: a myriad of cleanups #4479

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 19 additions & 19 deletions xds/csds/csds.go
Expand Up @@ -38,37 +38,37 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/client/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/client/v3" // Register v3 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
)

// xdsClientInterface contains methods from xdsClient.Client which are used by
// xdsClient contains methods from xdsClient.Client which are used by
// the server. This is useful for overriding in unit tests.
type xdsClientInterface interface {
DumpLDS() (string, map[string]client.UpdateWithMD)
DumpRDS() (string, map[string]client.UpdateWithMD)
DumpCDS() (string, map[string]client.UpdateWithMD)
DumpEDS() (string, map[string]client.UpdateWithMD)
type xdsClient interface {
DumpLDS() (string, map[string]xdsclient.UpdateWithMD)
DumpRDS() (string, map[string]xdsclient.UpdateWithMD)
DumpCDS() (string, map[string]xdsclient.UpdateWithMD)
DumpEDS() (string, map[string]xdsclient.UpdateWithMD)
BootstrapConfig() *bootstrap.Config
Close()
}

var (
logger = grpclog.Component("xds")
newXDSClient = func() (xdsClientInterface, error) {
return client.New()
newXDSClient = func() (xdsClient, error) {
return xdsclient.New()
}
)

// ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer.
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practise. But we keep a copy in each
// server instance for testing.
xdsClient xdsClientInterface
xdsClient xdsClient
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
Expand Down Expand Up @@ -296,17 +296,17 @@ func (s *ClientStatusDiscoveryServer) buildEDSPerXDSConfig() *v3statuspb.PerXdsC
}
}

func serviceStatusToProto(serviceStatus client.ServiceStatus) v3adminpb.ClientResourceStatus {
func serviceStatusToProto(serviceStatus xdsclient.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case client.ServiceStatusUnknown:
case xdsclient.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case client.ServiceStatusRequested:
case xdsclient.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case client.ServiceStatusNotExist:
case xdsclient.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case client.ServiceStatusACKed:
case xdsclient.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case client.ServiceStatusNACKed:
case xdsclient.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
Expand Down
26 changes: 13 additions & 13 deletions xds/csds/csds_test.go
Expand Up @@ -36,10 +36,10 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/xds/internal/client"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
xtestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -59,11 +59,11 @@ const (
defaultTestTimeout = 10 * time.Second
)

type xdsClientInterfaceWithWatch interface {
WatchListener(string, func(client.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(client.RouteConfigUpdate, error)) func()
WatchCluster(string, func(client.ClusterUpdate, error)) func()
WatchEndpoints(string, func(client.EndpointsUpdate, error)) func()
type xdsClientWithWatch interface {
WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsclient.EndpointsUpdate, error)) func()
}

var cmpOpts = cmp.Options{
Expand Down Expand Up @@ -174,16 +174,16 @@ func TestCSDS(t *testing.T) {
defer cleanup()

for _, target := range ldsTargets {
xdsC.WatchListener(target, func(client.ListenerUpdate, error) {})
xdsC.WatchListener(target, func(xdsclient.ListenerUpdate, error) {})
}
for _, target := range rdsTargets {
xdsC.WatchRouteConfig(target, func(client.RouteConfigUpdate, error) {})
xdsC.WatchRouteConfig(target, func(xdsclient.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
xdsC.WatchCluster(target, func(client.ClusterUpdate, error) {})
xdsC.WatchCluster(target, func(xdsclient.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
xdsC.WatchEndpoints(target, func(client.EndpointsUpdate, error) {})
xdsC.WatchEndpoints(target, func(xdsclient.EndpointsUpdate, error) {})
}

for i := 0; i < retryCount; i++ {
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestCSDS(t *testing.T) {
}
}

func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()

// Spin up a xDS management server on a local port.
Expand All @@ -270,12 +270,12 @@ func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServ
t.Fatal(err)
}
// Create xds_client.
xdsC, err := client.New()
xdsC, err := xdsclient.New()
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) {
newXDSClient = func() (xdsClient, error) {
return xdsC, nil
}

Expand Down
4 changes: 2 additions & 2 deletions xds/googledirectpath/googlec2p.go
Expand Up @@ -38,9 +38,9 @@ import (
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down
2 changes: 1 addition & 1 deletion xds/googledirectpath/googlec2p_test.go
Expand Up @@ -31,8 +31,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
)
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/balancergroup/balancergroup.go
Expand Up @@ -24,7 +24,7 @@ import (
"time"

orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/xdsclient/load"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/balancergroup/balancergroup_test.go
Expand Up @@ -44,8 +44,8 @@ import (
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

var (
Expand Down
20 changes: 9 additions & 11 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -35,8 +35,8 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
Expand All @@ -59,7 +59,7 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient func() (xdsClientInterface, error)
newXDSClient func() (xdsClient, error)
buildProvider = buildProviderFunc
)

Expand All @@ -73,6 +73,8 @@ func init() {
// JSON service config, to be passed to the cdsBalancer.
type cdsBB struct{}

var _ balancer.ConfigParser = cdsBB{}

// Build creates a new CDS balancer with the ClientConn.
func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &cdsBalancer{
Expand Down Expand Up @@ -138,12 +140,11 @@ func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
return &cfg, nil
}

// xdsClientInterface contains methods from xdsClient.Client which are used by
// the cdsBalancer. This will be faked out in unittests.
type xdsClientInterface interface {
// xdsClient contains methods from xdsclient.Client which are used by the
// cdsBalancer. This will be faked out in unittests.
type xdsClient interface {
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
BootstrapConfig() *bootstrap.Config
Close()
}

// ccUpdate wraps a clientConn update received from gRPC (pushed from the
Expand Down Expand Up @@ -185,7 +186,7 @@ type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsClientInterface // xDS client to watch Cluster resource.
xdsClient xdsClient // xDS client to watch Cluster resource.
cancelWatch func() // Cluster watch cancel func.
edsLB balancer.Balancer // EDS child policy.
clusterToWatch string
Expand Down Expand Up @@ -408,9 +409,6 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
if newXDSClient != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an intended change, or merge conflict?

Copy link
Contributor

@menghanl menghanl Jun 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I know why this change is here.

I would say don't make this change, or at least, don't mix it in this XL PR.

And it's an incomplete test cleanup. It leaves the balancers in an inconsistent state (if the balancer creates the client, it should call close, even though we know close is a noop)

And it only changes the easy part, but leaves the hard part behind. We should modify the tests to use attributes to pass the fake xDS client, so we can completely get rid of newXDSClient.
But that's going to be a semi big change, it requires changing signature of xds.SetClient() to take an interface instead of *Client, so it can be the fakeClient.

I will make it, I need it to fix the tests in my other PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it's an incomplete test cleanup. It leaves the balancers in an inconsistent state (if the balancer creates the client, it should call close, even though we know close is a noop)

It's for tests only, so I don't think it's important to call Close only for tests, which don't need it to be called. Many types don't need a Close method to be called when they're done being used. Our xdsClient interface is one of those.

You could also consider this half of the cleanup (removing the method from the interface and the call to it in b.Close), with the other half being changing how it's injected. I can add all this back if you want, but I do see it as a step in the right direction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this doesn't give us enough benefit, to justify itself.
At least should not be mixed in a 84-file (trivial) change PR.
This diff really stick out when I was scanning the changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down
Expand Up @@ -36,10 +36,10 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/resolver"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
Expand Down Expand Up @@ -136,7 +136,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
Expand Down
7 changes: 3 additions & 4 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Expand Up @@ -37,10 +37,9 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
Expand Down Expand Up @@ -215,7 +214,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
Expand Down Expand Up @@ -606,7 +605,7 @@ func (s) TestCircuitBreaking(t *testing.T) {

// Since the counter's max requests was set to 1, the first request should
// succeed and the second should fail.
counter := client.GetServiceRequestsCounter(serviceName)
counter := xdsclient.GetServiceRequestsCounter(serviceName)
if err := counter.StartRequest(maxRequests); err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Expand Up @@ -20,7 +20,7 @@ import (
"errors"
"sync"

xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/xdsclient"
)

var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
Expand All @@ -40,7 +40,7 @@ type clusterHandler struct {
// CDS Balancer cares about is the most recent update.
updateChannel chan clusterHandlerUpdate

xdsClient xdsClientInterface
xdsClient xdsClient
}

func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
Expand Down Expand Up @@ -112,7 +112,7 @@ type clusterNode struct {

// CreateClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsClientInterface, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsClient, topLevelHandler *clusterHandler) *clusterNode {
c := &clusterNode{
clusterHandler: topLevelHandler,
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/cdsbalancer/cluster_handler_test.go
Expand Up @@ -24,8 +24,8 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
Expand Down