Skip to content

Commit

Permalink
Merge pull request #1704 from murgatroid99/grpc-js-xds_routing_traffi…
Browse files Browse the repository at this point in the history
…c_splitting_config_selector

grpc-js-xds: Do traffic splitting and routing in the resolver
  • Loading branch information
murgatroid99 committed Mar 22, 2021
2 parents 07764ac + e7eaeeb commit b2c4dcc
Show file tree
Hide file tree
Showing 16 changed files with 1,264 additions and 555 deletions.
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/package.json
Expand Up @@ -43,7 +43,8 @@
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14",
"google-auth-library": "^7.0.2"
"google-auth-library": "^7.0.2",
"re2-wasm": "^1.0.1"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.2"
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/scripts/xds.sh
Expand Up @@ -51,8 +51,9 @@ grpc/tools/run_tests/helper_scripts/prep_xds.sh
GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver \
GRPC_NODE_VERBOSITY=DEBUG \
NODE_XDS_INTEROP_VERBOSITY=1 \
GRPC_XDS_EXPERIMENTAL_ROUTING=true \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="backends_restart,change_backend_service,gentle_failover,ping_pong,remove_instance_group,round_robin,secondary_locality_gets_no_requests_on_partial_primary_failure,secondary_locality_gets_requests_on_primary_failure" \
--test_case="all" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
Expand Down
22 changes: 22 additions & 0 deletions packages/grpc-js-xds/src/environment.ts
@@ -0,0 +1,22 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/**
* Environment variable protection for traffic splitting and routing
* https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#xds-resolver-and-xds-client
*/
export const GRPC_XDS_EXPERIMENTAL_ROUTING = (process.env.GRPC_XDS_EXPERIMENTAL_ROUTING === 'true');
15 changes: 5 additions & 10 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Expand Up @@ -16,7 +16,7 @@
*/

import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher } from './xds-client';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import UnavailablePicker = experimental.UnavailablePicker;
Expand All @@ -26,6 +26,7 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import { EdsLoadBalancingConfig } from './load-balancer-eds';
import { Watcher } from './xds-stream-state/xds-stream-state';

const TRACER_NAME = 'cds_balancer';

Expand Down Expand Up @@ -65,7 +66,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig {

export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private watcher: Watcher<Cluster__Output>;

private isWatcherActive = false;
Expand Down Expand Up @@ -121,12 +121,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.xdsClient = attributes.xdsClient;
this.latestAttributes = attributes;

/* If the cluster is changing, disable the old watcher before adding the new
Expand All @@ -136,7 +131,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.latestConfig?.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
this.xdsClient.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
Expand All @@ -152,7 +147,7 @@ export class CdsLoadBalancer implements LoadBalancer {

if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
}
Expand All @@ -166,7 +161,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
this.xdsClient?.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
Expand Down
21 changes: 8 additions & 13 deletions packages/grpc-js-xds/src/load-balancer-eds.ts
Expand Up @@ -16,7 +16,7 @@
*/

import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client';
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority';
Expand All @@ -33,6 +33,7 @@ import PickResultType = experimental.PickResultType;
import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimental';
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target';
import { LrsLoadBalancingConfig } from './load-balancer-lrs';
import { Watcher } from './xds-stream-state/xds-stream-state';

const TRACER_NAME = 'eds_balancer';

Expand Down Expand Up @@ -122,11 +123,10 @@ export class EdsLoadBalancer implements LoadBalancer {
* requests.
*/
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private edsServiceName: string | null = null;
private watcher: Watcher<ClusterLoadAssignment__Output>;
/**
* Indicates whether the watcher has already been passed to this.xdsClient
* Indicates whether the watcher has already been passed to the xdsClient
* and is getting updates.
*/
private isWatcherActive = false;
Expand Down Expand Up @@ -377,7 +377,7 @@ export class EdsLoadBalancer implements LoadBalancer {
validateLoadBalancingConfig({ round_robin: {} }),
];
let childPolicy: LoadBalancingConfig[];
if (this.lastestConfig.getLrsLoadReportingServerName()) {
if (this.lastestConfig.getLrsLoadReportingServerName() !== undefined) {
childPolicy = [new LrsLoadBalancingConfig(this.lastestConfig.getCluster(), this.lastestConfig.getEdsServiceName() ?? '', this.lastestConfig.getLrsLoadReportingServerName()!, localityObj.locality, endpointPickingPolicy)];
} else {
childPolicy = endpointPickingPolicy;
Expand Down Expand Up @@ -427,21 +427,16 @@ export class EdsLoadBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.lastestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient;
const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster();

/* If the name is changing, disable the old watcher before adding the new
* one */
if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) {
trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName)
this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
this.isWatcherActive = false;
Expand All @@ -454,12 +449,12 @@ export class EdsLoadBalancer implements LoadBalancer {

if (!this.isWatcherActive) {
trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName);
this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher);
this.isWatcherActive = true;
}

if (lbConfig.getLrsLoadReportingServerName()) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
this.clusterDropStats = getSingletonXdsClient().addClusterDropStats(
lbConfig.getLrsLoadReportingServerName()!,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
Expand All @@ -480,7 +475,7 @@ export class EdsLoadBalancer implements LoadBalancer {
destroy(): void {
trace('Destroying load balancer with edsServiceName ' + this.edsServiceName);
if (this.edsServiceName) {
this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher);
}
this.childBalancer.destroy();
}
Expand Down
8 changes: 2 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-lrs.ts
Expand Up @@ -16,9 +16,8 @@
*/

import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js';
import { type } from 'os';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { XdsClusterLocalityStats, XdsClient } from './xds-client';
import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client';
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
Expand Down Expand Up @@ -208,10 +207,7 @@ export class LrsLoadBalancer implements LoadBalancer {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
return;
}
this.localityStatsReporter = attributes.xdsClient.addClusterLocalityStats(
this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats(
lbConfig.getLrsLoadReportingServerName(),
lbConfig.getClusterName(),
lbConfig.getEdsServiceName(),
Expand Down
Expand Up @@ -44,7 +44,7 @@ interface ClusterManagerChild {
child_policy: LoadBalancingConfig[];
}

class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig {
export class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
Expand Down

0 comments on commit b2c4dcc

Please sign in to comment.