Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/federation: support federation in LRS #5128

Merged
merged 4 commits into from Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 14 additions & 6 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -318,12 +318,20 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
EDSServiceName: cu.EDSServiceName,
MaxConcurrentRequests: cu.MaxRequests,
}
if cu.EnableLRS {
// An empty string here indicates that the cluster_resolver balancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
dms[i].LoadReportingServerName = new(string)

if cu.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
bootstrapConfig := b.xdsClient.BootstrapConfig()
parsedName := xdsresource.ParseName(cu.ClusterName)
if parsedName.Scheme == xdsresource.FederationScheme {
// Is a federation resource name, find the corresponding
// authority server config.
if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
dms[i].LoadReportingServer = cfg.XDSServer
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
// Not a federation resource name, use the default
// authority.
dms[i].LoadReportingServer = bootstrapConfig.XDSServer
}
}
case xdsresource.ClusterTypeLogicalDNS:
dms[i] = clusterresolver.DiscoveryMechanism{
Expand Down
14 changes: 11 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

Expand All @@ -48,6 +49,11 @@ const (
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

var defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{
ServerURI: "self_server",
CredsType: "self_creds",
}

type s struct {
grpctest.Tester
}
Expand Down Expand Up @@ -209,8 +215,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *inter
MaxConcurrentRequests: countMax,
}
if enableLRS {
discoveryMechanism.LoadReportingServerName = new(string)

discoveryMechanism.LoadReportingServer = defaultTestAuthorityServerConfig
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism},
Expand Down Expand Up @@ -354,6 +359,9 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
// to the edsBalancer.
func (s) TestHandleClusterUpdate(t *testing.T) {
xdsC, cdsB, edsB, _, cancel := setupWithWatch(t)
xdsC.SetBootstrapConfig(&bootstrap.Config{
XDSServer: defaultTestAuthorityServerConfig,
})
defer func() {
cancel()
cdsB.Close()
Expand All @@ -367,7 +375,7 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
}{
{
name: "happy-case-with-lrs",
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, EnableLRS: true},
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf},
wantCCS: edsCCS(serviceName, nil, true, nil),
},
{
Expand Down
77 changes: 42 additions & 35 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Expand Up @@ -41,22 +41,26 @@ import (
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

const (
defaultTestTimeout = 1 * time.Second
defaultShortTestTimeout = 100 * time.Microsecond

testClusterName = "test-cluster"
testServiceName = "test-eds-service"
testLRSServerName = "test-lrs-name"
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
)

var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
}

cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
Expand Down Expand Up @@ -103,9 +107,9 @@ func (s) TestDropByCategory(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
DropCategories: []DropConfig{{
Category: dropReason,
RequestsPerMillion: million * dropNumerator / dropDenominator,
Expand All @@ -125,8 +129,8 @@ func (s) TestDropByCategory(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

sc1 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -191,9 +195,9 @@ func (s) TestDropByCategory(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
DropCategories: []DropConfig{{
Category: dropReason2,
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
Expand Down Expand Up @@ -257,10 +261,10 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
MaxConcurrentRequests: &maxRequest,
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
MaxConcurrentRequests: &maxRequest,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand All @@ -276,8 +280,8 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

sc1 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -605,9 +609,9 @@ func (s) TestLoadReporting(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
// Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
Expand All @@ -624,8 +628,8 @@ func (s) TestLoadReporting(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

sc1 := <-cc.NewSubConnCh
Expand Down Expand Up @@ -720,9 +724,9 @@ func (s) TestUpdateLRSServer(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(""),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand All @@ -738,17 +742,21 @@ func (s) TestUpdateLRSServer(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != "" {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "")
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}

testLRSServerConfig2 := &bootstrap.ServerConfig{
ServerURI: "trafficdirector-another.googleapis.com:443",
CredsType: "google_default",
}
// Update LRS server to a different name.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig2,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand All @@ -763,17 +771,16 @@ func (s) TestUpdateLRSServer(t *testing.T) {
if err2 != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
}
if got2.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName)
if got2.Server != testLRSServerConfig2 {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
}

// Update LRS server to nil, to disable LRS.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: nil,
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
Expand Down
19 changes: 10 additions & 9 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Expand Up @@ -41,6 +41,7 @@ import (
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

Expand Down Expand Up @@ -104,7 +105,7 @@ type clusterImplBalancer struct {
childLB balancer.Balancer
cancelLoadReport func()
edsServiceName string
lrsServerName *string
lrsServer *bootstrap.ServerConfig
loadWrapper *loadstore.Wrapper

clusterNameMu sync.Mutex
Expand Down Expand Up @@ -171,22 +172,22 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
)

// Check if it's necessary to restart load report.
if b.lrsServerName == nil {
if newConfig.LoadReportingServerName != nil {
if b.lrsServer == nil {
if newConfig.LoadReportingServer != nil {
// Old is nil, new is not nil, start new LRS.
b.lrsServerName = newConfig.LoadReportingServerName
b.lrsServer = newConfig.LoadReportingServer
startNewLoadReport = true
}
// Old is nil, new is nil, do nothing.
} else if newConfig.LoadReportingServerName == nil {
} else if newConfig.LoadReportingServer == nil {
// Old is not nil, new is nil, stop old, don't start new.
b.lrsServerName = newConfig.LoadReportingServerName
b.lrsServer = newConfig.LoadReportingServer
stopOldLoadReport = true
} else {
// Old is not nil, new is not nil, compare string values, if
// different, stop old and start new.
if *b.lrsServerName != *newConfig.LoadReportingServerName {
b.lrsServerName = newConfig.LoadReportingServerName
if *b.lrsServer != *newConfig.LoadReportingServer {
b.lrsServer = newConfig.LoadReportingServer
stopOldLoadReport = true
startNewLoadReport = true
}
Expand All @@ -206,7 +207,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
if startNewLoadReport {
var loadStore *load.Store
if b.xdsClient != nil {
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(*b.lrsServerName)
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
}
b.loadWrapper.UpdateLoadStore(loadStore)
}
Expand Down
15 changes: 9 additions & 6 deletions xds/internal/balancer/clusterimpl/config.go
Expand Up @@ -23,6 +23,7 @@ import (

internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

// DropConfig contains the category, and drop ratio.
Expand All @@ -35,12 +36,14 @@ type DropConfig struct {
type LBConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

Cluster string `json:"cluster,omitempty"`
EDSServiceName string `json:"edsServiceName,omitempty"`
LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
Cluster string `json:"cluster,omitempty"`
EDSServiceName string `json:"edsServiceName,omitempty"`
// LoadReportingServer is the LRS server to send load reports to. If not
// present, load reporting will be disabled.
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}

func parseConfig(c json.RawMessage) (*LBConfig, error) {
Expand Down
21 changes: 11 additions & 10 deletions xds/internal/balancer/clusterimpl/config_test.go
Expand Up @@ -22,17 +22,22 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/balancer/weightedtarget"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
testJSONConfig = `{
"cluster": "test_cluster",
"edsServiceName": "test-eds",
"lrsLoadReportingServerName": "lrs_server",
"lrsLoadReportingServer": {
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [ { "type": "google_default" } ]
},
"maxConcurrentRequests": 123,
"dropCategories": [
{
Expand Down Expand Up @@ -106,10 +111,10 @@ func TestParseConfig(t *testing.T) {
name: "OK",
js: testJSONConfig,
want: &LBConfig{
Cluster: "test_cluster",
EDSServiceName: "test-eds",
LoadReportingServerName: newString("lrs_server"),
MaxConcurrentRequests: newUint32(123),
Cluster: "test_cluster",
EDSServiceName: "test-eds",
LoadReportingServer: testLRSServerConfig,
MaxConcurrentRequests: newUint32(123),
DropCategories: []DropConfig{
{Category: "drop-1", RequestsPerMillion: 314},
{Category: "drop-2", RequestsPerMillion: 159},
Expand All @@ -128,17 +133,13 @@ func TestParseConfig(t *testing.T) {
if (err != nil) != tt.wantErr {
t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if !cmp.Equal(got, tt.want) {
if !cmp.Equal(got, tt.want, cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds")) {
t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))
}
})
}
}

func newString(s string) *string {
return &s
}

func newUint32(i uint32) *uint32 {
return &i
}