Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add HashPolicy fields to RDS update #4521

Merged
merged 17 commits into from Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions clientconn.go
Expand Up @@ -711,7 +711,12 @@ func (cc *ClientConn) switchBalancer(name string) {
return
}
if cc.balancerWrapper != nil {
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()
}

builder := balancer.Get(name)
Expand Down
3 changes: 1 addition & 2 deletions clientconn_test.go
Expand Up @@ -735,16 +735,15 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
time.Sleep(10 * time.Millisecond)
cc.mu.RLock()
v := cc.mkp.Time
cc.mu.RUnlock()
if v == 20*time.Second {
// Success
cc.mu.RUnlock()
return
}
if ctx.Err() != nil {
// Timeout
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
}
cc.mu.RUnlock()
}
}

Expand Down
22 changes: 13 additions & 9 deletions internal/grpcrand/grpcrand.go
Expand Up @@ -31,26 +31,30 @@ var (
mu sync.Mutex
)

// Int implements rand.Int on the grpcrand global source.
func Int() int {
mu.Lock()
defer mu.Unlock()
return r.Int()
}

// Int63n implements rand.Int63n on the grpcrand global source.
func Int63n(n int64) int64 {
mu.Lock()
res := r.Int63n(n)
mu.Unlock()
return res
defer mu.Unlock()
return r.Int63n(n)
}

// Intn implements rand.Intn on the grpcrand global source.
func Intn(n int) int {
mu.Lock()
res := r.Intn(n)
mu.Unlock()
return res
defer mu.Unlock()
return r.Intn(n)
}

// Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 {
mu.Lock()
res := r.Float64()
mu.Unlock()
return res
defer mu.Unlock()
return r.Float64()
}
20 changes: 5 additions & 15 deletions internal/xds/env/env.go
Expand Up @@ -39,9 +39,7 @@ const (
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"

Expand All @@ -62,18 +60,10 @@ var (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)

// CircuitBreakingSupport indicates whether circuit breaking support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
CircuitBreakingSupport = !strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "false")
// TimeoutSupport indicates whether support for max_stream_duration in
// route actions is enabled. This can be disabled by setting the
// environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "false".
TimeoutSupport = !strings.EqualFold(os.Getenv(timeoutSupportEnv), "false")
// FaultInjectionSupport is used to control both fault injection and HTTP
// filter support.
FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
// RingHashSupport indicates whether ring hash support is enabled, which can
// be enabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "true".
RingHashSupport = strings.EqualFold(os.Getenv(ringHashSupportEnv), "true")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
Expand Down
10 changes: 10 additions & 0 deletions resolver_conn_wrapper.go
Expand Up @@ -39,6 +39,8 @@ type ccResolverWrapper struct {
resolver resolver.Resolver
done *grpcsync.Event
curState resolver.State

incomingMu sync.Mutex // Synchronizes all the incoming calls.
}

// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
Expand Down Expand Up @@ -90,6 +92,8 @@ func (ccr *ccResolverWrapper) close() {
}

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
ccr.incomingMu.Lock()
defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return nil
}
Expand All @@ -105,6 +109,8 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
}

func (ccr *ccResolverWrapper) ReportError(err error) {
ccr.incomingMu.Lock()
defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return
}
Expand All @@ -114,6 +120,8 @@ func (ccr *ccResolverWrapper) ReportError(err error) {

// NewAddress is called by the resolver implementation to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.incomingMu.Lock()
defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return
}
Expand All @@ -128,6 +136,8 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
ccr.incomingMu.Lock()
defer ccr.incomingMu.Unlock()
if ccr.done.HasFired() {
return
}
Expand Down
28 changes: 8 additions & 20 deletions vet.sh
Expand Up @@ -32,26 +32,14 @@ PATH="${HOME}/go/bin:${GOROOT}/bin:${PATH}"
go version

if [[ "$1" = "-install" ]]; then
# Check for module support
if go help mod >& /dev/null; then
# Install the pinned versions as defined in module tools.
pushd ./test/tools
go install \
golang.org/x/lint/golint \
golang.org/x/tools/cmd/goimports \
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell
popd
else
# Ye olde `go get` incantation.
# Note: this gets the latest version of all tools (vs. the pinned versions
# with Go modules).
go get -u \
golang.org/x/lint/golint \
golang.org/x/tools/cmd/goimports \
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell
fi
# Install the pinned versions as defined in module tools.
pushd ./test/tools
go install \
golang.org/x/lint/golint \
golang.org/x/tools/cmd/goimports \
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell
popd
if [[ -z "${VET_SKIP_PROTO}" ]]; then
if [[ "${TRAVIS}" = "true" ]]; then
PROTOBUF_VERSION=3.14.0
Expand Down
20 changes: 2 additions & 18 deletions xds/csds/csds.go
Expand Up @@ -38,33 +38,17 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
)

// xdsClient contains methods from xdsClient.Client which are used by
// the server. This is useful for overriding in unit tests.
type xdsClient interface {
DumpLDS() (string, map[string]xdsclient.UpdateWithMD)
DumpRDS() (string, map[string]xdsclient.UpdateWithMD)
DumpCDS() (string, map[string]xdsclient.UpdateWithMD)
DumpEDS() (string, map[string]xdsclient.UpdateWithMD)
BootstrapConfig() *bootstrap.Config
Close()
}

var (
logger = grpclog.Component("xds")
newXDSClient = func() xdsClient {
newXDSClient = func() xdsclient.XDSClient {
c, err := xdsclient.New()
if err != nil {
// If err is not nil, c is a typed nil (of type *xdsclient.Client).
// If c is returned and assigned to the xdsClient field in the CSDS
// server, the nil checks in the handlers will not handle it
// properly.
logger.Warningf("failed to create xds client: %v", err)
return nil
}
Expand All @@ -76,7 +60,7 @@ var (
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practice. But we keep a copy in each
// server instance for testing.
xdsClient xdsClient
xdsClient xdsclient.XDSClient
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
Expand Down
13 changes: 3 additions & 10 deletions xds/csds/csds_test.go
Expand Up @@ -59,13 +59,6 @@ const (
defaultTestTimeout = 10 * time.Second
)

type xdsClientWithWatch interface {
WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsclient.EndpointsUpdate, error)) func()
}

var cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmp.Comparer(func(a, b *timestamppb.Timestamp) bool { return true }),
Expand Down Expand Up @@ -250,7 +243,7 @@ func TestCSDS(t *testing.T) {
}
}

func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
func commonSetup(t *testing.T) (xdsclient.XDSClient, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()

// Spin up a xDS management server on a local port.
Expand All @@ -275,7 +268,7 @@ func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, strin
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClient { return xdsC }
newXDSClient = func() xdsclient.XDSClient { return xdsC }

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
Expand Down Expand Up @@ -635,7 +628,7 @@ func protoToJSON(p proto.Message) string {

func TestCSDSNoXDSClient(t *testing.T) {
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClient { return nil }
newXDSClient = func() xdsclient.XDSClient { return nil }
defer func() { newXDSClient = oldNewXDSClient }()

// Initialize an gRPC server and register CSDS on it.
Expand Down
13 changes: 6 additions & 7 deletions xds/googledirectpath/googlec2p.go
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/googlecloud"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
Expand All @@ -61,15 +62,11 @@ const (
dnsName, xdsName = "dns", "xds"
)

type xdsClient interface {
Close()
}

// For overriding in unittests.
var (
onGCE = googlecloud.OnGCE

newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
return xdsclient.NewWithConfig(config)
}

Expand Down Expand Up @@ -138,7 +135,7 @@ func (c2pResolverBuilder) Scheme() string {

type c2pResolver struct {
resolver.Resolver
client xdsClient
client xdsclient.XDSClient
}

func (r *c2pResolver) Close() {
Expand All @@ -152,13 +149,15 @@ var ipv6EnabledMetadata = &structpb.Struct{
},
}

var id = fmt.Sprintf("C2P-%d", grpcrand.Int())

// newNode makes a copy of defaultNode, and populate it's Metadata and
// Locality fields.
func newNode(zone string, ipv6Capable bool) *v3corepb.Node {
ret := &v3corepb.Node{
// Not all required fields are set in defaultNote. Metadata will be set
// if ipv6 is enabled. Locality will be set to the value from metadata.
Id: "C2P",
Id: id,
UserAgentName: gRPCUserAgentName,
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
ClientFeatures: []string{clientFeatureNoOverprovisioning},
Expand Down
6 changes: 4 additions & 2 deletions xds/googledirectpath/googlec2p_test.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestBuildNotOnGCE(t *testing.T) {
}

type testXDSClient struct {
xdsclient.XDSClient
closed chan struct{}
}

Expand Down Expand Up @@ -177,7 +179,7 @@ func TestBuildXDS(t *testing.T) {

configCh := make(chan *bootstrap.Config, 1)
oldNewClient := newClientWithConfig
newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
configCh <- config
return tXDSClient, nil
}
Expand All @@ -194,7 +196,7 @@ func TestBuildXDS(t *testing.T) {
}

wantNode := &v3corepb.Node{
Id: "C2P",
Id: id,
Metadata: nil,
Locality: &v3corepb.Locality{Zone: testZone},
UserAgentName: gRPCUserAgentName,
Expand Down