Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Do traffic splitting and routing in the resolver #1704

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/package.json
Expand Up @@ -43,7 +43,8 @@
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14",
"google-auth-library": "^7.0.2"
"google-auth-library": "^7.0.2",
"re2-wasm": "^1.0.1"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.2"
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/scripts/xds.sh
Expand Up @@ -51,8 +51,9 @@ grpc/tools/run_tests/helper_scripts/prep_xds.sh
GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver \
GRPC_NODE_VERBOSITY=DEBUG \
NODE_XDS_INTEROP_VERBOSITY=1 \
GRPC_XDS_EXPERIMENTAL_ROUTING=true \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="backends_restart,change_backend_service,gentle_failover,ping_pong,remove_instance_group,round_robin,secondary_locality_gets_no_requests_on_partial_primary_failure,secondary_locality_gets_requests_on_primary_failure" \
--test_case="all" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
Expand Down
22 changes: 22 additions & 0 deletions packages/grpc-js-xds/src/environment.ts
@@ -0,0 +1,22 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/**
* Environment variable protection for traffic splitting and routing
* https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#xds-resolver-and-xds-client
*/
export const GRPC_XDS_EXPERIMENTAL_ROUTING = (process.env.GRPC_XDS_EXPERIMENTAL_ROUTING === 'true');
15 changes: 5 additions & 10 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Expand Up @@ -16,7 +16,7 @@
*/

import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher } from './xds-client';
import { getSingletonXdsClient, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import UnavailablePicker = experimental.UnavailablePicker;
Expand All @@ -26,6 +26,7 @@ import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import { EdsLoadBalancingConfig } from './load-balancer-eds';
import { Watcher } from './xds-stream-state/xds-stream-state';

const TRACER_NAME = 'cds_balancer';

Expand Down Expand Up @@ -65,7 +66,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig {

export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private watcher: Watcher<Cluster__Output>;

private isWatcherActive = false;
Expand Down Expand Up @@ -121,12 +121,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
this.xdsClient = attributes.xdsClient;
this.latestAttributes = attributes;

/* If the cluster is changing, disable the old watcher before adding the new
Expand All @@ -136,7 +131,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.latestConfig?.getCluster() !== lbConfig.getCluster()
) {
trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster());
this.xdsClient.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
Expand All @@ -152,7 +147,7 @@ export class CdsLoadBalancer implements LoadBalancer {

if (!this.isWatcherActive) {
trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster());
this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher);
getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher);
this.isWatcherActive = true;
}
}
Expand All @@ -166,7 +161,7 @@ export class CdsLoadBalancer implements LoadBalancer {
trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster());
this.childBalancer.destroy();
if (this.isWatcherActive) {
this.xdsClient?.removeClusterWatcher(
getSingletonXdsClient().removeClusterWatcher(
this.latestConfig!.getCluster(),
this.watcher
);
Expand Down
21 changes: 8 additions & 13 deletions packages/grpc-js-xds/src/load-balancer-eds.ts
Expand Up @@ -16,7 +16,7 @@
*/

import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js';
import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client';
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority';
Expand All @@ -33,6 +33,7 @@ import PickResultType = experimental.PickResultType;
import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimental';
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target';
import { LrsLoadBalancingConfig } from './load-balancer-lrs';
import { Watcher } from './xds-stream-state/xds-stream-state';

const TRACER_NAME = 'eds_balancer';

Expand Down Expand Up @@ -122,11 +123,10 @@ export class EdsLoadBalancer implements LoadBalancer {
* requests.
*/
private childBalancer: ChildLoadBalancerHandler;
private xdsClient: XdsClient | null = null;
private edsServiceName: string | null = null;
private watcher: Watcher<ClusterLoadAssignment__Output>;
/**
* Indicates whether the watcher has already been passed to this.xdsClient
* Indicates whether the watcher has already been passed to the xdsClient
* and is getting updates.
*/
private isWatcherActive = false;
Expand Down Expand Up @@ -377,7 +377,7 @@ export class EdsLoadBalancer implements LoadBalancer {
validateLoadBalancingConfig({ round_robin: {} }),
];
let childPolicy: LoadBalancingConfig[];
if (this.lastestConfig.getLrsLoadReportingServerName()) {
if (this.lastestConfig.getLrsLoadReportingServerName() !== undefined) {
childPolicy = [new LrsLoadBalancingConfig(this.lastestConfig.getCluster(), this.lastestConfig.getEdsServiceName() ?? '', this.lastestConfig.getLrsLoadReportingServerName()!, localityObj.locality, endpointPickingPolicy)];
} else {
childPolicy = endpointPickingPolicy;
Expand Down Expand Up @@ -427,21 +427,16 @@ export class EdsLoadBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
trace('Discarding address list update missing xdsClient attribute');
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.lastestConfig = lbConfig;
this.latestAttributes = attributes;
this.xdsClient = attributes.xdsClient;
const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster();

/* If the name is changing, disable the old watcher before adding the new
* one */
if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) {
trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName)
this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher);
/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
this.isWatcherActive = false;
Expand All @@ -454,12 +449,12 @@ export class EdsLoadBalancer implements LoadBalancer {

if (!this.isWatcherActive) {
trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName);
this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher);
this.isWatcherActive = true;
}

if (lbConfig.getLrsLoadReportingServerName()) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
this.clusterDropStats = getSingletonXdsClient().addClusterDropStats(
lbConfig.getLrsLoadReportingServerName()!,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
Expand All @@ -480,7 +475,7 @@ export class EdsLoadBalancer implements LoadBalancer {
destroy(): void {
trace('Destroying load balancer with edsServiceName ' + this.edsServiceName);
if (this.edsServiceName) {
this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher);
getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher);
}
this.childBalancer.destroy();
}
Expand Down
8 changes: 2 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-lrs.ts
Expand Up @@ -16,9 +16,8 @@
*/

import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js';
import { type } from 'os';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
import { XdsClusterLocalityStats, XdsClient } from './xds-client';
import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client';
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
Expand Down Expand Up @@ -208,10 +207,7 @@ export class LrsLoadBalancer implements LoadBalancer {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
return;
}
if (!(attributes.xdsClient instanceof XdsClient)) {
return;
}
this.localityStatsReporter = attributes.xdsClient.addClusterLocalityStats(
this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats(
lbConfig.getLrsLoadReportingServerName(),
lbConfig.getClusterName(),
lbConfig.getEdsServiceName(),
Expand Down
Expand Up @@ -44,7 +44,7 @@ interface ClusterManagerChild {
child_policy: LoadBalancingConfig[];
}

class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig {
export class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
Expand Down