From 0a68f8aff020eeb868a4921672afe7773b3359e7 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 26 Jan 2022 11:39:10 -0800 Subject: [PATCH] xds/federation: support federation in LRS (#5128) --- .../balancer/cdsbalancer/cdsbalancer.go | 20 +++-- .../balancer/cdsbalancer/cdsbalancer_test.go | 14 +++- .../balancer/clusterimpl/balancer_test.go | 77 ++++++++++--------- .../balancer/clusterimpl/clusterimpl.go | 19 ++--- xds/internal/balancer/clusterimpl/config.go | 15 ++-- .../balancer/clusterimpl/config_test.go | 21 ++--- .../balancer/clusterresolver/config.go | 20 ++--- .../balancer/clusterresolver/config_test.go | 71 +++++++++++------ .../balancer/clusterresolver/configbuilder.go | 10 +-- .../clusterresolver/configbuilder_test.go | 62 +++++++-------- xds/internal/testutils/fakeclient/client.go | 4 +- xds/internal/xdsclient/attributes.go | 2 +- xds/internal/xdsclient/authority.go | 15 ++-- xds/internal/xdsclient/bootstrap/bootstrap.go | 57 ++++++++++---- .../xdsclient/bootstrap/bootstrap_test.go | 21 +++++ .../xdsclient/controller/loadreport.go | 7 +- .../xdsclient/controller/v2_cds_test.go | 2 +- xds/internal/xdsclient/loadreport.go | 24 +++--- xds/internal/xdsclient/loadreport_test.go | 20 ++++- xds/internal/xdsclient/xdsresource/name.go | 3 + .../xdsclient/xdsresource/type_cds.go | 22 +++++- .../xdsclient/xdsresource/unmarshal_cds.go | 11 ++- .../xdsresource/unmarshal_cds_test.go | 68 ++++++++-------- 23 files changed, 359 insertions(+), 226 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index f1149108507..0be796c47ba 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -318,12 +318,20 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { EDSServiceName: cu.EDSServiceName, MaxConcurrentRequests: cu.MaxRequests, } - if cu.EnableLRS { - // An empty string here indicates that the cluster_resolver balancer should use the - // same xDS server for load reporting as it does for EDS - // requests/responses. - dms[i].LoadReportingServerName = new(string) - + if cu.LRSServerConfig == xdsresource.ClusterLRSServerSelf { + bootstrapConfig := b.xdsClient.BootstrapConfig() + parsedName := xdsresource.ParseName(cu.ClusterName) + if parsedName.Scheme == xdsresource.FederationScheme { + // Is a federation resource name, find the corresponding + // authority server config. + if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok { + dms[i].LoadReportingServer = cfg.XDSServer + } + } else { + // Not a federation resource name, use the default + // authority. + dms[i].LoadReportingServer = bootstrapConfig.XDSServer + } } case xdsresource.ClusterTypeLogicalDNS: dms[i] = clusterresolver.DiscoveryMechanism{ diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 242f7fa6499..efa34dbab0e 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -48,6 +49,11 @@ const ( defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ) +var defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{ + ServerURI: "self_server", + CredsType: "self_creds", +} + type s struct { grpctest.Tester } @@ -209,8 +215,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *inter MaxConcurrentRequests: countMax, } if enableLRS { - discoveryMechanism.LoadReportingServerName = new(string) - + discoveryMechanism.LoadReportingServer = defaultTestAuthorityServerConfig } lbCfg := &clusterresolver.LBConfig{ DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism}, @@ -354,6 +359,9 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) { // to the edsBalancer. func (s) TestHandleClusterUpdate(t *testing.T) { xdsC, cdsB, edsB, _, cancel := setupWithWatch(t) + xdsC.SetBootstrapConfig(&bootstrap.Config{ + XDSServer: defaultTestAuthorityServerConfig, + }) defer func() { cancel() cdsB.Close() @@ -367,7 +375,7 @@ func (s) TestHandleClusterUpdate(t *testing.T) { }{ { name: "happy-case-with-lrs", - cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, EnableLRS: true}, + cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf}, wantCCS: edsCCS(serviceName, nil, true, nil), }, { diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 5abf37fcbf1..d444ecd4f4f 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -41,6 +41,7 @@ import ( xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" ) @@ -48,15 +49,18 @@ const ( defaultTestTimeout = 1 * time.Second defaultShortTestTimeout = 100 * time.Microsecond - testClusterName = "test-cluster" - testServiceName = "test-eds-service" - testLRSServerName = "test-lrs-name" + testClusterName = "test-cluster" + testServiceName = "test-eds-service" ) var ( testBackendAddrs = []resolver.Address{ {Addr: "1.1.1.1:1"}, } + testLRSServerConfig = &bootstrap.ServerConfig{ + ServerURI: "trafficdirector.googleapis.com:443", + CredsType: "google_default", + } cmpOpts = cmp.Options{ cmpopts.EquateEmpty(), @@ -103,9 +107,9 @@ func (s) TestDropByCategory(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: newString(testLRSServerName), + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, DropCategories: []DropConfig{{ Category: dropReason, RequestsPerMillion: million * dropNumerator / dropDenominator, @@ -125,8 +129,8 @@ func (s) TestDropByCategory(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != testLRSServerName { - t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) + if got.Server != testLRSServerConfig { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig) } sc1 := <-cc.NewSubConnCh @@ -191,9 +195,9 @@ func (s) TestDropByCategory(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: newString(testLRSServerName), + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, DropCategories: []DropConfig{{ Category: dropReason2, RequestsPerMillion: million * dropNumerator2 / dropDenominator2, @@ -257,10 +261,10 @@ func (s) TestDropCircuitBreaking(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: newString(testLRSServerName), - MaxConcurrentRequests: &maxRequest, + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: &maxRequest, ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, }, @@ -276,8 +280,8 @@ func (s) TestDropCircuitBreaking(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != testLRSServerName { - t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) + if got.Server != testLRSServerConfig { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig) } sc1 := <-cc.NewSubConnCh @@ -605,9 +609,9 @@ func (s) TestLoadReporting(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: newString(testLRSServerName), + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, // Locality: testLocality, ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, @@ -624,8 +628,8 @@ func (s) TestLoadReporting(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != testLRSServerName { - t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) + if got.Server != testLRSServerConfig { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig) } sc1 := <-cc.NewSubConnCh @@ -720,9 +724,9 @@ func (s) TestUpdateLRSServer(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: newString(""), + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, }, @@ -738,17 +742,21 @@ func (s) TestUpdateLRSServer(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != "" { - t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "") + if got.Server != testLRSServerConfig { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig) } + testLRSServerConfig2 := &bootstrap.ServerConfig{ + ServerURI: "trafficdirector-another.googleapis.com:443", + CredsType: "google_default", + } // Update LRS server to a different name. if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: newString(testLRSServerName), + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig2, ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, }, @@ -763,17 +771,16 @@ func (s) TestUpdateLRSServer(t *testing.T) { if err2 != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2) } - if got2.Server != testLRSServerName { - t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName) + if got2.Server != testLRSServerConfig2 { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2) } // Update LRS server to nil, to disable LRS. if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), BalancerConfig: &LBConfig{ - Cluster: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: nil, + Cluster: testClusterName, + EDSServiceName: testServiceName, ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, }, diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 03d357b1f4e..0a6cf6ca906 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -41,6 +41,7 @@ import ( xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/loadstore" "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" ) @@ -104,7 +105,7 @@ type clusterImplBalancer struct { childLB balancer.Balancer cancelLoadReport func() edsServiceName string - lrsServerName *string + lrsServer *bootstrap.ServerConfig loadWrapper *loadstore.Wrapper clusterNameMu sync.Mutex @@ -171,22 +172,22 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error { ) // Check if it's necessary to restart load report. - if b.lrsServerName == nil { - if newConfig.LoadReportingServerName != nil { + if b.lrsServer == nil { + if newConfig.LoadReportingServer != nil { // Old is nil, new is not nil, start new LRS. - b.lrsServerName = newConfig.LoadReportingServerName + b.lrsServer = newConfig.LoadReportingServer startNewLoadReport = true } // Old is nil, new is nil, do nothing. - } else if newConfig.LoadReportingServerName == nil { + } else if newConfig.LoadReportingServer == nil { // Old is not nil, new is nil, stop old, don't start new. - b.lrsServerName = newConfig.LoadReportingServerName + b.lrsServer = newConfig.LoadReportingServer stopOldLoadReport = true } else { // Old is not nil, new is not nil, compare string values, if // different, stop old and start new. - if *b.lrsServerName != *newConfig.LoadReportingServerName { - b.lrsServerName = newConfig.LoadReportingServerName + if *b.lrsServer != *newConfig.LoadReportingServer { + b.lrsServer = newConfig.LoadReportingServer stopOldLoadReport = true startNewLoadReport = true } @@ -206,7 +207,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error { if startNewLoadReport { var loadStore *load.Store if b.xdsClient != nil { - loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(*b.lrsServerName) + loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer) } b.loadWrapper.UpdateLoadStore(loadStore) } diff --git a/xds/internal/balancer/clusterimpl/config.go b/xds/internal/balancer/clusterimpl/config.go index 51ff654f6eb..cfddc6fb2a1 100644 --- a/xds/internal/balancer/clusterimpl/config.go +++ b/xds/internal/balancer/clusterimpl/config.go @@ -23,6 +23,7 @@ import ( internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) // DropConfig contains the category, and drop ratio. @@ -35,12 +36,14 @@ type DropConfig struct { type LBConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` - Cluster string `json:"cluster,omitempty"` - EDSServiceName string `json:"edsServiceName,omitempty"` - LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"` - MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` - DropCategories []DropConfig `json:"dropCategories,omitempty"` - ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` + Cluster string `json:"cluster,omitempty"` + EDSServiceName string `json:"edsServiceName,omitempty"` + // LoadReportingServer is the LRS server to send load reports to. If not + // present, load reporting will be disabled. + LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"` + MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` + DropCategories []DropConfig `json:"dropCategories,omitempty"` + ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` } func parseConfig(c json.RawMessage) (*LBConfig, error) { diff --git a/xds/internal/balancer/clusterimpl/config_test.go b/xds/internal/balancer/clusterimpl/config_test.go index 88bed5c182c..b001b8fdf0a 100644 --- a/xds/internal/balancer/clusterimpl/config_test.go +++ b/xds/internal/balancer/clusterimpl/config_test.go @@ -22,17 +22,22 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/balancer" _ "google.golang.org/grpc/balancer/roundrobin" _ "google.golang.org/grpc/balancer/weightedtarget" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) const ( testJSONConfig = `{ "cluster": "test_cluster", "edsServiceName": "test-eds", - "lrsLoadReportingServerName": "lrs_server", + "lrsLoadReportingServer": { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ { "type": "google_default" } ] + }, "maxConcurrentRequests": 123, "dropCategories": [ { @@ -106,10 +111,10 @@ func TestParseConfig(t *testing.T) { name: "OK", js: testJSONConfig, want: &LBConfig{ - Cluster: "test_cluster", - EDSServiceName: "test-eds", - LoadReportingServerName: newString("lrs_server"), - MaxConcurrentRequests: newUint32(123), + Cluster: "test_cluster", + EDSServiceName: "test-eds", + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(123), DropCategories: []DropConfig{ {Category: "drop-1", RequestsPerMillion: 314}, {Category: "drop-2", RequestsPerMillion: 159}, @@ -128,17 +133,13 @@ func TestParseConfig(t *testing.T) { if (err != nil) != tt.wantErr { t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr) } - if !cmp.Equal(got, tt.want) { + if !cmp.Equal(got, tt.want, cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds")) { t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want)) } }) } } -func newString(s string) *string { - return &s -} - func newUint32(i uint32) *uint32 { return &i } diff --git a/xds/internal/balancer/clusterresolver/config.go b/xds/internal/balancer/clusterresolver/config.go index a6a3cbab804..363afd03ab2 100644 --- a/xds/internal/balancer/clusterresolver/config.go +++ b/xds/internal/balancer/clusterresolver/config.go @@ -27,6 +27,7 @@ import ( internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) // DiscoveryMechanismType is the type of discovery mechanism. @@ -84,11 +85,9 @@ func (t *DiscoveryMechanismType) UnmarshalJSON(b []byte) error { type DiscoveryMechanism struct { // Cluster is the cluster name. Cluster string `json:"cluster,omitempty"` - // LoadReportingServerName is the LRS server to send load reports to. If - // not present, load reporting will be disabled. If set to the empty string, - // load reporting will be sent to the same server that we obtained CDS data - // from. - LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"` + // LoadReportingServer is the LRS server to send load reports to. If not + // present, load reporting will be disabled. + LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"` // MaxConcurrentRequests is the maximum number of outstanding requests can // be made to the upstream cluster. Default is 1024. MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` @@ -110,8 +109,6 @@ func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool { switch { case dm.Cluster != b.Cluster: return false - case !equalStringP(dm.LoadReportingServerName, b.LoadReportingServerName): - return false case !equalUint32P(dm.MaxConcurrentRequests, b.MaxConcurrentRequests): return false case dm.Type != b.Type: @@ -121,17 +118,14 @@ func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool { case dm.DNSHostname != b.DNSHostname: return false } - return true -} -func equalStringP(a, b *string) bool { - if a == nil && b == nil { + if dm.LoadReportingServer == nil && b.LoadReportingServer == nil { return true } - if a == nil || b == nil { + if (dm.LoadReportingServer != nil) != (b.LoadReportingServer != nil) { return false } - return *a == *b + return dm.LoadReportingServer.String() == b.LoadReportingServer.String() } func equalUint32P(a, b *uint32) bool { diff --git a/xds/internal/balancer/clusterresolver/config_test.go b/xds/internal/balancer/clusterresolver/config_test.go index 796f8a49372..fb859e75ba4 100644 --- a/xds/internal/balancer/clusterresolver/config_test.go +++ b/xds/internal/balancer/clusterresolver/config_test.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/internal/balancer/stub" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" ) func TestDiscoveryMechanismTypeMarshalJSON(t *testing.T) { @@ -102,7 +103,10 @@ const ( testJSONConfig1 = `{ "discoveryMechanisms": [{ "cluster": "test-cluster-name", - "lrsLoadReportingServerName": "test-lrs-server", + "lrsLoadReportingServer": { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ { "type": "google_default" } ] + }, "maxConcurrentRequests": 314, "type": "EDS", "edsServiceName": "test-eds-service-name" @@ -111,7 +115,10 @@ const ( testJSONConfig2 = `{ "discoveryMechanisms": [{ "cluster": "test-cluster-name", - "lrsLoadReportingServerName": "test-lrs-server", + "lrsLoadReportingServer": { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ { "type": "google_default" } ] + }, "maxConcurrentRequests": 314, "type": "EDS", "edsServiceName": "test-eds-service-name" @@ -122,7 +129,10 @@ const ( testJSONConfig3 = `{ "discoveryMechanisms": [{ "cluster": "test-cluster-name", - "lrsLoadReportingServerName": "test-lrs-server", + "lrsLoadReportingServer": { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ { "type": "google_default" } ] + }, "maxConcurrentRequests": 314, "type": "EDS", "edsServiceName": "test-eds-service-name" @@ -132,7 +142,10 @@ const ( testJSONConfig4 = `{ "discoveryMechanisms": [{ "cluster": "test-cluster-name", - "lrsLoadReportingServerName": "test-lrs-server", + "lrsLoadReportingServer": { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ { "type": "google_default" } ] + }, "maxConcurrentRequests": 314, "type": "EDS", "edsServiceName": "test-eds-service-name" @@ -142,7 +155,10 @@ const ( testJSONConfig5 = `{ "discoveryMechanisms": [{ "cluster": "test-cluster-name", - "lrsLoadReportingServerName": "test-lrs-server", + "lrsLoadReportingServer": { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ { "type": "google_default" } ] + }, "maxConcurrentRequests": 314, "type": "EDS", "edsServiceName": "test-eds-service-name" @@ -151,6 +167,11 @@ const ( }` ) +var testLRSServerConfig = &bootstrap.ServerConfig{ + ServerURI: "trafficdirector.googleapis.com:443", + CredsType: "google_default", +} + func TestParseConfig(t *testing.T) { tests := []struct { name string @@ -170,11 +191,11 @@ func TestParseConfig(t *testing.T) { want: &LBConfig{ DiscoveryMechanisms: []DiscoveryMechanism{ { - Cluster: testClusterName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServcie, + Cluster: testClusterName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, }, }, XDSLBPolicy: nil, @@ -187,11 +208,11 @@ func TestParseConfig(t *testing.T) { want: &LBConfig{ DiscoveryMechanisms: []DiscoveryMechanism{ { - Cluster: testClusterName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServcie, + Cluster: testClusterName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, }, { Type: DiscoveryMechanismTypeLogicalDNS, @@ -207,11 +228,11 @@ func TestParseConfig(t *testing.T) { want: &LBConfig{ DiscoveryMechanisms: []DiscoveryMechanism{ { - Cluster: testClusterName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServcie, + Cluster: testClusterName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, }, }, XDSLBPolicy: &internalserviceconfig.BalancerConfig{ @@ -227,11 +248,11 @@ func TestParseConfig(t *testing.T) { want: &LBConfig{ DiscoveryMechanisms: []DiscoveryMechanism{ { - Cluster: testClusterName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServcie, + Cluster: testClusterName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, }, }, XDSLBPolicy: &internalserviceconfig.BalancerConfig{ diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index c2404b387b6..4cce16ff9a3 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -257,11 +257,11 @@ var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Na // addresses. func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) { clusterImplCfg := &clusterimpl.LBConfig{ - Cluster: mechanism.Cluster, - EDSServiceName: mechanism.EDSServiceName, - LoadReportingServerName: mechanism.LoadReportingServerName, - MaxConcurrentRequests: mechanism.MaxConcurrentRequests, - DropCategories: drops, + Cluster: mechanism.Cluster, + EDSServiceName: mechanism.EDSServiceName, + LoadReportingServer: mechanism.LoadReportingServer, + MaxConcurrentRequests: mechanism.MaxConcurrentRequests, + DropCategories: drops, // ChildPolicy is not set. Will be set based on xdsLBPolicy } diff --git a/xds/internal/balancer/clusterresolver/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go index 011500f4bc9..607f7b22241 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/configbuilder_test.go @@ -125,11 +125,11 @@ func TestBuildPriorityConfigJSON(t *testing.T) { gotConfig, _, err := buildPriorityConfigJSON([]priorityConfig{ { mechanism: DiscoveryMechanism{ - Cluster: testClusterName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServiceName, + Cluster: testClusterName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, }, edsResp: xdsresource.EndpointsUpdate{ Drops: []xdsresource.OverloadDropConfig{ @@ -175,11 +175,11 @@ func TestBuildPriorityConfig(t *testing.T) { gotConfig, gotAddrs, _ := buildPriorityConfig([]priorityConfig{ { mechanism: DiscoveryMechanism{ - Cluster: testClusterName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServiceName, + Cluster: testClusterName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, }, edsResp: xdsresource.EndpointsUpdate{ Drops: []xdsresource.OverloadDropConfig{ @@ -212,10 +212,10 @@ func TestBuildPriorityConfig(t *testing.T) { Config: &internalserviceconfig.BalancerConfig{ Name: clusterimpl.Name, Config: &clusterimpl.LBConfig{ - Cluster: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), DropCategories: []clusterimpl.DropConfig{ { Category: testDropCategory, @@ -245,10 +245,10 @@ func TestBuildPriorityConfig(t *testing.T) { Config: &internalserviceconfig.BalancerConfig{ Name: clusterimpl.Name, Config: &clusterimpl.LBConfig{ - Cluster: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), DropCategories: []clusterimpl.DropConfig{ { Category: testDropCategory, @@ -369,11 +369,11 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { }, }, DiscoveryMechanism{ - Cluster: testClusterName, - MaxConcurrentRequests: newUint32(testMaxRequests), - LoadReportingServerName: newString(testLRSServer), - Type: DiscoveryMechanismTypeEDS, - EDSServiceName: testEDSServiceName, + Cluster: testClusterName, + MaxConcurrentRequests: newUint32(testMaxRequests), + LoadReportingServer: testLRSServerConfig, + Type: DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServiceName, }, nil, ) @@ -384,10 +384,10 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { } wantConfigs := map[string]*clusterimpl.LBConfig{ "priority-2-0": { - Cluster: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), DropCategories: []clusterimpl.DropConfig{ { Category: testDropCategory, @@ -411,10 +411,10 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { }, }, "priority-2-1": { - Cluster: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: newString(testLRSServer), - MaxConcurrentRequests: newUint32(testMaxRequests), + Cluster: testClusterName, + EDSServiceName: testEDSServiceName, + LoadReportingServer: testLRSServerConfig, + MaxConcurrentRequests: newUint32(testMaxRequests), DropCategories: []clusterimpl.DropConfig{ { Category: testDropCategory, diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 871aa7288c6..3ab57bbd489 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -242,11 +242,11 @@ func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) { // ReportLoadArgs wraps the arguments passed to ReportLoad. type ReportLoadArgs struct { // Server is the name of the server to which the load is reported. - Server string + Server *bootstrap.ServerConfig } // ReportLoad starts reporting load about clusterName to server. -func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) { +func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore *load.Store, cancel func()) { xdsC.loadReportCh.Send(ReportLoadArgs{Server: server}) return xdsC.loadStore, func() { xdsC.lrsCancelCh.Send(nil) diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index 64f87f29659..51418162736 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -36,7 +36,7 @@ type XDSClient interface { WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func() WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func() WatchEndpoints(clusterName string, edsCb func(xdsresource.EndpointsUpdate, error)) (cancel func()) - ReportLoad(server string) (*load.Store, func()) + ReportLoad(server *bootstrap.ServerConfig) (*load.Store, func()) DumpLDS() map[string]xdsresource.UpdateWithMD DumpRDS() map[string]xdsresource.UpdateWithMD diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 6cc4c117755..1a236849c37 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -27,8 +27,6 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) -const federationScheme = "xdstp" - // findAuthority returns the authority for this name. If it doesn't already // exist, one will be created. // @@ -49,7 +47,7 @@ func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref fun } config := c.config.XDSServer - if scheme == federationScheme { + if scheme == xdsresource.FederationScheme { cfg, ok := c.config.Authorities[authority] if !ok { return nil, nil, fmt.Errorf("xds: failed to find authority %q", authority) @@ -78,6 +76,9 @@ func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref fun // newAuthority creates a new authority for the config. But before that, it // checks the cache to see if an authority for this config already exists. // +// The caller must take a reference of the returned authority before using, and +// unref afterwards. +// // caller must hold c.authorityMu func (c *clientImpl) newAuthority(config *bootstrap.ServerConfig) (_ *authority, retErr error) { // First check if there's already an authority for this config. If found, it @@ -219,8 +220,12 @@ func (a *authority) watchEndpoints(clusterName string, cb func(xdsresource.Endpo } } -func (a *authority) reportLoad(server string) (*load.Store, func()) { - return a.controller.ReportLoad(server) +func (a *authority) reportLoad() (*load.Store, func()) { + // An empty string means to report load to the same same used for ADS. There + // should never be a need to specify a string other than an empty string. If + // a different server is to be used, a different authority (controller) will + // be created. + return a.controller.ReportLoad("") } func (a *authority) dump(t xdsresource.ResourceType) map[string]xdsresource.UpdateWithMD { diff --git a/xds/internal/xdsclient/bootstrap/bootstrap.go b/xds/internal/xdsclient/bootstrap/bootstrap.go index eef0e4fe6ff..15aed44eb73 100644 --- a/xds/internal/xdsclient/bootstrap/bootstrap.go +++ b/xds/internal/xdsclient/bootstrap/bootstrap.go @@ -107,19 +107,26 @@ func (sc *ServerConfig) String() string { return strings.Join([]string{sc.ServerURI, sc.CredsType, ver}, "-") } -// UnmarshalJSON takes the json data (a list of servers) and unmarshals the -// first one in the list. -func (sc *ServerConfig) UnmarshalJSON(data []byte) error { - var servers []*xdsServer - if err := json.Unmarshal(data, &servers); err != nil { - return fmt.Errorf("xds: json.Unmarshal(data) for field xds_servers failed during bootstrap: %v", err) +// MarshalJSON marshals the ServerConfig to json. +func (sc ServerConfig) MarshalJSON() ([]byte, error) { + server := xdsServer{ + ServerURI: sc.ServerURI, + ChannelCreds: []channelCreds{{Type: sc.CredsType, Config: nil}}, } - if len(servers) < 1 { - return fmt.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any management server to connect to") + if sc.TransportAPI == version.TransportV3 { + server.ServerFeatures = []string{serverFeaturesV3} + } + return json.Marshal(server) +} + +// UnmarshalJSON takes the json data (a server) and unmarshals it to the struct. +func (sc *ServerConfig) UnmarshalJSON(data []byte) error { + var server xdsServer + if err := json.Unmarshal(data, &server); err != nil { + return fmt.Errorf("xds: json.Unmarshal(data) for field ServerConfig failed during bootstrap: %v", err) } - xs := servers[0] - sc.ServerURI = xs.ServerURI - for _, cc := range xs.ChannelCreds { + sc.ServerURI = server.ServerURI + for _, cc := range server.ChannelCreds { // We stop at the first credential type that we support. sc.CredsType = cc.Type if cc.Type == credsGoogleDefault { @@ -130,7 +137,7 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error { break } } - for _, f := range xs.ServerFeatures { + for _, f := range server.ServerFeatures { if f == serverFeaturesV3 { sc.TransportAPI = version.TransportV3 } @@ -138,6 +145,18 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error { return nil } +// unmarshalJSONServerConfigSlice unmarshals JSON to a slice. +func unmarshalJSONServerConfigSlice(data []byte) ([]*ServerConfig, error) { + var servers []*ServerConfig + if err := json.Unmarshal(data, &servers); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON to []*ServerConfig: %v", err) + } + if len(servers) < 1 { + return nil, fmt.Errorf("no management server found in JSON") + } + return servers, nil +} + // Authority contains configuration for an Authority for an xDS control plane // server. See the Authorities field in the Config struct for how it's used. type Authority struct { @@ -170,9 +189,11 @@ func (a *Authority) UnmarshalJSON(data []byte) error { for k, v := range jsonData { switch k { case "xds_servers": - if err := json.Unmarshal(v, &a.XDSServer); err != nil { - return fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) + servers, err := unmarshalJSONServerConfigSlice(v) + if err != nil { + return fmt.Errorf("xds: json.Unmarshal(data) for field %q failed during bootstrap: %v", k, err) } + a.XDSServer = servers[0] case "client_listener_resource_name_template": if err := json.Unmarshal(v, &a.ClientListenerResourceNameTemplate); err != nil { return fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) @@ -243,7 +264,7 @@ type Config struct { type channelCreds struct { Type string `json:"type"` - Config json.RawMessage `json:"config"` + Config json.RawMessage `json:"config,omitempty"` } type xdsServer struct { @@ -325,9 +346,11 @@ func NewConfigFromContents(data []byte) (*Config, error) { return nil, fmt.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) } case "xds_servers": - if err := json.Unmarshal(v, &config.XDSServer); err != nil { - return nil, fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) + servers, err := unmarshalJSONServerConfigSlice(v) + if err != nil { + return nil, fmt.Errorf("xds: json.Unmarshal(data) for field %q failed during bootstrap: %v", k, err) } + config.XDSServer = servers[0] case "certificate_providers": var providerInstances map[string]json.RawMessage if err := json.Unmarshal(v, &providerInstances); err != nil { diff --git a/xds/internal/xdsclient/bootstrap/bootstrap_test.go b/xds/internal/xdsclient/bootstrap/bootstrap_test.go index d681666077c..573a3fca173 100644 --- a/xds/internal/xdsclient/bootstrap/bootstrap_test.go +++ b/xds/internal/xdsclient/bootstrap/bootstrap_test.go @@ -994,3 +994,24 @@ func TestNewConfigWithFederation(t *testing.T) { }) } } + +func TestServerConfigMarshalAndUnmarshal(t *testing.T) { + c := ServerConfig{ + ServerURI: "test-server", + Creds: nil, + CredsType: "test-creds", + TransportAPI: version.TransportV3, + } + + bs, err := json.Marshal(c) + if err != nil { + t.Fatalf("failed to marshal: %v", err) + } + var cUnmarshal ServerConfig + if err := json.Unmarshal(bs, &cUnmarshal); err != nil { + t.Fatalf("failed to unmarshal: %v", err) + } + if diff := cmp.Diff(cUnmarshal, c); diff != "" { + t.Fatalf("diff (-got +want): %v", diff) + } +} diff --git a/xds/internal/xdsclient/controller/loadreport.go b/xds/internal/xdsclient/controller/loadreport.go index f8cfd017e41..a28cc95dc6f 100644 --- a/xds/internal/xdsclient/controller/loadreport.go +++ b/xds/internal/xdsclient/controller/loadreport.go @@ -35,9 +35,10 @@ import ( // It returns a Store for the user to report loads, a function to cancel the // load reporting stream. // -// TODO: LRS refactor; maybe a new controller should be created for a separate -// server, so that the same stream can be shared by different reporters to the -// same server, even if they originate from different Controllers. +// TODO(xdsfed): LRS refactor, delete the parameter of this function, and +// cleanup the multiple LRS ClientConn code. Each controller should have one +// ClientConn to the authority it's created for, all LRS streams (and ADS +// streams) in this controller should all share that ClientConn. func (c *Controller) ReportLoad(server string) (*load.Store, func()) { c.lrsMu.Lock() defer c.lrsMu.Unlock() diff --git a/xds/internal/xdsclient/controller/v2_cds_test.go b/xds/internal/xdsclient/controller/v2_cds_test.go index 20485dc1c28..d262b53a46b 100644 --- a/xds/internal/xdsclient/controller/v2_cds_test.go +++ b/xds/internal/xdsclient/controller/v2_cds_test.go @@ -138,7 +138,7 @@ func (s) TestCDSHandleResponse(t *testing.T) { cdsResponse: goodCDSResponse1, wantErr: false, wantUpdate: map[string]xdsresource.ClusterUpdateErrTuple{ - goodClusterName1: {Update: xdsresource.ClusterUpdate{ClusterName: goodClusterName1, EDSServiceName: serviceName1, EnableLRS: true, Raw: marshaledCluster1}}, + goodClusterName1: {Update: xdsresource.ClusterUpdate{ClusterName: goodClusterName1, EDSServiceName: serviceName1, LRSServerConfig: xdsresource.ClusterLRSServerSelf, Raw: marshaledCluster1}}, }, wantUpdateMD: xdsresource.UpdateMetadata{ Status: xdsresource.ServiceStatusACKed, diff --git a/xds/internal/xdsclient/loadreport.go b/xds/internal/xdsclient/loadreport.go index d157731c789..32c7e9c9d79 100644 --- a/xds/internal/xdsclient/loadreport.go +++ b/xds/internal/xdsclient/loadreport.go @@ -18,30 +18,26 @@ package xdsclient import ( + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/load" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) -// ReportLoad starts an load reporting stream to the given server. If the server -// is not an empty string, and is different from the management server, a new -// ClientConn will be created. -// -// The same options used for creating the Client will be used (including -// NodeProto, and dial options if necessary). +// ReportLoad starts a load reporting stream to the given server. All load +// reports to the same server share the LRS stream. // // It returns a Store for the user to report loads, a function to cancel the // load reporting stream. -func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) { - // TODO: load reporting with federation also needs find the authority for - // this server first, then reports load to it. Currently always report to - // the default authority. This is needed to avoid a nil pointer panic. - a, unref, err := c.findAuthority(xdsresource.ParseName("")) +func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*load.Store, func()) { + a, err := c.newAuthority(server) if err != nil { + c.logger.Infof("xds: failed to connect to the control plane to do load reporting for authority %q: %v", server, err) return nil, func() {} } - store, cancelF := a.reportLoad(server) + // Hold the ref before starting load reporting. + a.ref() + store, cancelF := a.reportLoad() return store, func() { cancelF() - unref() + c.unrefAuthority(a) } } diff --git a/xds/internal/xdsclient/loadreport_test.go b/xds/internal/xdsclient/loadreport_test.go index 92b5ab6482d..3c564ea97c3 100644 --- a/xds/internal/xdsclient/loadreport_test.go +++ b/xds/internal/xdsclient/loadreport_test.go @@ -67,7 +67,15 @@ func (s) TestLRSClient(t *testing.T) { defer cancel() // Report to the same address should not create new ClientConn. - store1, lrsCancel1 := xdsC.ReportLoad(fs.Address) + store1, lrsCancel1 := xdsC.ReportLoad( + &bootstrap.ServerConfig{ + ServerURI: fs.Address, + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + CredsType: "insecure", + TransportAPI: version.TransportV2, + NodeProto: &v2corepb.Node{}, + }, + ) defer lrsCancel1() if u, err := fs.NewConnChan.Receive(ctx); err != nil { @@ -87,7 +95,15 @@ func (s) TestLRSClient(t *testing.T) { defer sCleanup2() // Report to a different address should create new ClientConn. - store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address) + store2, lrsCancel2 := xdsC.ReportLoad( + &bootstrap.ServerConfig{ + ServerURI: fs2.Address, + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + CredsType: "insecure", + TransportAPI: version.TransportV2, + NodeProto: &v2corepb.Node{}, + }, + ) defer lrsCancel2() if u, err := fs2.NewConnChan.Receive(ctx); err != nil { t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) diff --git a/xds/internal/xdsclient/xdsresource/name.go b/xds/internal/xdsclient/xdsresource/name.go index 076e3d617a4..eb1ee323cee 100644 --- a/xds/internal/xdsclient/xdsresource/name.go +++ b/xds/internal/xdsclient/xdsresource/name.go @@ -25,6 +25,9 @@ import ( "google.golang.org/grpc/internal/envconfig" ) +// FederationScheme is the scheme of a federation resource name. +const FederationScheme = "xdstp" + // Name contains the parsed component of an xDS resource name. // // An xDS resource name is in the format of diff --git a/xds/internal/xdsclient/xdsresource/type_cds.go b/xds/internal/xdsclient/xdsresource/type_cds.go index c200380be26..ce3438c121f 100644 --- a/xds/internal/xdsclient/xdsresource/type_cds.go +++ b/xds/internal/xdsclient/xdsresource/type_cds.go @@ -17,7 +17,9 @@ package xdsresource -import "google.golang.org/protobuf/types/known/anypb" +import ( + "google.golang.org/protobuf/types/known/anypb" +) // ClusterType is the type of cluster from a received CDS response. type ClusterType int @@ -35,6 +37,18 @@ const ( ClusterTypeAggregate ) +// ClusterLRSServerConfigType is the type of LRS server config. +type ClusterLRSServerConfigType int + +const ( + // ClusterLRSOff indicates LRS is off (loads are not reported for this + // cluster). + ClusterLRSOff ClusterLRSServerConfigType = iota + // ClusterLRSServerSelf indicates loads should be reported to the same + // server (the authority) where the CDS resp is received from. + ClusterLRSServerSelf +) + // ClusterLBPolicyRingHash represents ring_hash lb policy, and also contains its // config. type ClusterLBPolicyRingHash struct { @@ -51,8 +65,10 @@ type ClusterUpdate struct { // EDSServiceName is an optional name for EDS. If it's not set, the balancer // should watch ClusterName for the EDS resources. EDSServiceName string - // EnableLRS indicates whether or not load should be reported through LRS. - EnableLRS bool + // LRSServerConfig contains the server where the load reports should be sent + // to. This can be change to an interface, to support other types, e.g. a + // ServerConfig with ServerURI, creds. + LRSServerConfig ClusterLRSServerConfigType // SecurityCfg contains security configuration sent by the control plane. SecurityCfg *SecurityConfig // MaxRequests for circuit breaking, if any (otherwise nil). diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go index 5b34c1ae6e1..2b8d8d3aadd 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go @@ -127,12 +127,21 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu ret := ClusterUpdate{ ClusterName: cluster.GetName(), - EnableLRS: cluster.GetLrsServer().GetSelf() != nil, SecurityCfg: sc, MaxRequests: circuitBreakersFromCluster(cluster), LBPolicy: lbPolicy, } + // Note that this is different from the gRFC (gRFC A47 says to include the + // full ServerConfig{URL,creds,server feature} here). This information is + // not available here, because this function doesn't have access to the + // xdsclient bootstrap information now (can be added if necessary). The + // ServerConfig will be read and populated by the CDS balancer when + // processing this field. + if cluster.GetLrsServer().GetSelf() != nil { + ret.LRSServerConfig = ClusterLRSServerSelf + } + // Validate and set cluster type from the response. switch { case cluster.GetType() == v3clusterpb.Cluster_EDS: diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go index dd2f72e0fad..4aad9308fc0 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go @@ -46,7 +46,7 @@ const ( serviceName = "service" ) -var emptyUpdate = ClusterUpdate{ClusterName: clusterName, EnableLRS: false} +var emptyUpdate = ClusterUpdate{ClusterName: clusterName, LRSServerConfig: ClusterLRSOff} func (s) TestValidateCluster_Failure(t *testing.T) { tests := []struct { @@ -263,7 +263,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, EnableLRS: false, ClusterType: ClusterTypeAggregate, + ClusterName: clusterName, LRSServerConfig: ClusterLRSOff, ClusterType: ClusterTypeAggregate, PrioritizedClusterNames: []string{"a", "b", "c"}, }, }, @@ -298,7 +298,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, - wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: false}, + wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSOff}, }, { name: "happiest-case", @@ -320,7 +320,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, }, - wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true}, + wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf}, }, { name: "happiest-case-with-circuitbreakers", @@ -354,7 +354,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, }, - wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true, MaxRequests: func() *uint32 { i := uint32(512); return &i }()}, + wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf, MaxRequests: func() *uint32 { i := uint32(512); return &i }()}, }, { name: "happiest-case-with-ring-hash-lb-policy-with-default-config", @@ -377,7 +377,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true, + ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf, LBPolicy: &ClusterLBPolicyRingHash{MinimumRingSize: defaultRingHashMinSize, MaximumRingSize: defaultRingHashMaxSize}, }, }, @@ -408,7 +408,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true, + ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: ClusterLRSServerSelf, LBPolicy: &ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100}, }, }, @@ -468,9 +468,9 @@ func (s) TestValidateClusterWithSecurityConfig_EnvVarOff(t *testing.T) { }, } wantUpdate := ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, } gotUpdate, err := validateClusterAndConstructClusterUpdate(cluster) if err != nil { @@ -1082,9 +1082,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1124,9 +1124,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1168,9 +1168,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1216,9 +1216,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1276,9 +1276,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1343,9 +1343,9 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - EnableLRS: false, + ClusterName: clusterName, + EDSServiceName: serviceName, + LRSServerConfig: ClusterLRSOff, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1489,7 +1489,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { wantUpdate: map[string]ClusterUpdateErrTuple{ v2ClusterName: {Update: ClusterUpdate{ ClusterName: v2ClusterName, - EDSServiceName: v2Service, EnableLRS: true, + EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf, Raw: v2ClusterAny, }}, }, @@ -1504,7 +1504,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { wantUpdate: map[string]ClusterUpdateErrTuple{ v3ClusterName: {Update: ClusterUpdate{ ClusterName: v3ClusterName, - EDSServiceName: v3Service, EnableLRS: true, + EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, Raw: v3ClusterAny, }}, }, @@ -1519,12 +1519,12 @@ func (s) TestUnmarshalCluster(t *testing.T) { wantUpdate: map[string]ClusterUpdateErrTuple{ v2ClusterName: {Update: ClusterUpdate{ ClusterName: v2ClusterName, - EDSServiceName: v2Service, EnableLRS: true, + EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf, Raw: v2ClusterAny, }}, v3ClusterName: {Update: ClusterUpdate{ ClusterName: v3ClusterName, - EDSServiceName: v3Service, EnableLRS: true, + EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, Raw: v3ClusterAny, }}, }, @@ -1548,12 +1548,12 @@ func (s) TestUnmarshalCluster(t *testing.T) { wantUpdate: map[string]ClusterUpdateErrTuple{ v2ClusterName: {Update: ClusterUpdate{ ClusterName: v2ClusterName, - EDSServiceName: v2Service, EnableLRS: true, + EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf, Raw: v2ClusterAny, }}, v3ClusterName: {Update: ClusterUpdate{ ClusterName: v3ClusterName, - EDSServiceName: v3Service, EnableLRS: true, + EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, Raw: v3ClusterAny, }}, "bad": {Err: cmpopts.AnyError},