Skip to content

Commit

Permalink
Merge pull request #1724 from murgatroid99/grpc-js-xds_a28_interop_at…
Browse files Browse the repository at this point in the history
…tempt_2

Enable path_matching and header_matching xDS interop tests
  • Loading branch information
murgatroid99 committed Apr 2, 2021
2 parents bc05778 + a907086 commit 3468e58
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 124 deletions.
85 changes: 58 additions & 27 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Expand Up @@ -23,7 +23,7 @@ import { ProtoGrpcType } from './generated/test';

import * as protoLoader from '@grpc/proto-loader';
import { TestServiceClient } from './generated/grpc/testing/TestService';
import { LoadBalancerStatsResponse } from './generated/grpc/testing/LoadBalancerStatsResponse';
import { LoadBalancerStatsResponse, _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output } from './generated/grpc/testing/LoadBalancerStatsResponse';
import * as yargs from 'yargs';
import { LoadBalancerStatsServiceHandlers } from './generated/grpc/testing/LoadBalancerStatsService';
import { XdsUpdateClientConfigureServiceHandlers } from './generated/grpc/testing/XdsUpdateClientConfigureService';
Expand All @@ -49,13 +49,14 @@ const REQUEST_TIMEOUT_SEC = 20;
const VERBOSITY = Number.parseInt(process.env.NODE_XDS_INTEROP_VERBOSITY ?? '0');

interface CallEndNotifier {
onCallSucceeded(peerName: string): void;
onCallSucceeded(methodName: string, peerName: string): void;
onCallFailed(message: string): void;
}

class CallSubscriber {
private callsStarted = 0;
private callsSucceededByPeer: {[key: string]: number} = {};
private callsSucceededByPeer: {[peer: string]: number} = {};
private callsSucceededByMethod: {[method: string]: _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output} = {}
private callsSucceeded = 0;
private callsFinished = 0;
private failureMessageCount: Map<string, number> = new Map<string, number>();
Expand All @@ -75,9 +76,18 @@ class CallSubscriber {
}
}

addCallSucceeded(peerName: string): void {
addCallSucceeded(methodName: string, peerName: string): void {
if (VERBOSITY >= 2) {
console.log(`Call to ${peerName} succeeded`);
console.log(`Call ${methodName} to ${peerName} succeeded`);
}
if (methodName in this.callsSucceededByMethod) {
if (peerName in this.callsSucceededByMethod[methodName].rpcs_by_peer) {
this.callsSucceededByMethod[methodName].rpcs_by_peer[peerName] += 1;
} else {
this.callsSucceededByMethod[methodName].rpcs_by_peer[peerName] = 1;
}
} else {
this.callsSucceededByMethod[methodName] = {rpcs_by_peer: {[peerName]: 1}};
}
if (peerName in this.callsSucceededByPeer) {
this.callsSucceededByPeer[peerName] += 1;
Expand Down Expand Up @@ -110,7 +120,8 @@ class CallSubscriber {
}
return {
rpcs_by_peer: this.callsSucceededByPeer,
num_failures: this.callsStarted - this.callsSucceeded
num_failures: this.callsStarted - this.callsSucceeded,
rpcs_by_method: this.callsSucceededByMethod
};
}
}
Expand Down Expand Up @@ -148,9 +159,9 @@ class CallStatsTracker {
}
}
return {
onCallSucceeded: (peerName: string) => {
onCallSucceeded: (methodName: string, peerName: string) => {
for (const subscriber of callSubscribers) {
subscriber.addCallSucceeded(peerName);
subscriber.addCallSucceeded(methodName, peerName);
}
},
onCallFailed: (message: string) => {
Expand All @@ -162,22 +173,22 @@ class CallStatsTracker {
}
}

type CallType = 'EMPTY_CALL' | 'UNARY_CALL';
type CallType = 'EmptyCall' | 'UnaryCall';

interface ClientConfiguration {
callTypes: (CallType)[];
metadata: {
EMPTY_CALL: grpc.Metadata,
UNARY_CALL: grpc.Metadata
EmptyCall: grpc.Metadata,
UnaryCall: grpc.Metadata
},
timeoutSec: number
}

const currentConfig: ClientConfiguration = {
callTypes: ['EMPTY_CALL'],
callTypes: ['UnaryCall'],
metadata: {
EMPTY_CALL: new grpc.Metadata(),
UNARY_CALL: new grpc.Metadata()
EmptyCall: new grpc.Metadata(),
UnaryCall: new grpc.Metadata()
},
timeoutSec: REQUEST_TIMEOUT_SEC
};
Expand All @@ -186,11 +197,11 @@ let anyCallSucceeded = false;

const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
stats_per_method: {
'EMPTY_CALL': {
EmptyCall: {
rpcs_started: 0,
result: {}
},
'UNARY_CALL': {
UnaryCall: {
rpcs_started: 0,
result: {}
}
Expand All @@ -200,7 +211,6 @@ const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
const callTypeStats = accumulatedStats.stats_per_method![type];
callTypeStats.rpcs_started! += 1;

const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
Expand All @@ -225,12 +235,12 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
notifier.onCallSucceeded(type, hostname);
}
}
}
};
const method = (type === 'EMPTY_CALL' ? client.emptyCall : client.unaryCall).bind(client);
const method = (type === 'EmptyCall' ? client.emptyCall : client.unaryCall).bind(client);
const call = method({}, currentConfig.metadata[type], {deadline}, callback);
call.on('metadata', (metadata) => {
hostname = (metadata.get('hostname') as string[])[0] ?? null;
Expand All @@ -239,7 +249,7 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
notifier.onCallSucceeded(type, hostname);
}
}
});
Expand All @@ -254,16 +264,35 @@ function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpc
}, 1000/qps);
}


const callTypeEnumMap = {
'EMPTY_CALL': 'EmptyCall' as CallType,
'UNARY_CALL': 'UnaryCall' as CallType
};

function main() {
const argv = yargs
.string(['fail_on_failed_rpcs', 'server', 'stats_port'])
.string(['fail_on_failed_rpcs', 'server', 'stats_port', 'rpc', 'metadata'])
.number(['num_channels', 'qps'])
.require(['qps', 'server', 'stats_port'])
.demandOption(['server', 'stats_port'])
.default('num_channels', 1)
.default('qps', 1)
.default('rpc', 'UnaryCall')
.default('metadata', '')
.argv;
console.log('Starting xDS interop client. Args: ', argv);
currentConfig.callTypes = argv.rpc.split(',').filter(value => value === 'EmptyCall' || value === 'UnaryCall') as CallType[];
for (const item of argv.metadata.split(',')) {
const [method, key, value] = item.split(':');
if (value === undefined) {
continue;
}
if (method !== 'EmptyCall' && method !== 'UnaryCall') {
continue;
}
currentConfig.metadata[method].add(key, value);
}
console.log('EmptyCall metadata: ' + JSON.stringify(currentConfig.metadata.EmptyCall.getMap()));
console.log('UnaryCall metadata: ' + JSON.stringify(currentConfig.metadata.UnaryCall.getMap()));
const callStatsTracker = new CallStatsTracker();
for (let i = 0; i < argv.num_channels; i++) {
/* The 'unique' channel argument is there solely to ensure that the
Expand Down Expand Up @@ -294,15 +323,17 @@ function main() {
const xdsUpdateClientConfigureServiceImpl: XdsUpdateClientConfigureServiceHandlers = {
Configure: (call, callback) => {
const callMetadata = {
EMPTY_CALL: new grpc.Metadata(),
UNARY_CALL: new grpc.Metadata()
EmptyCall: new grpc.Metadata(),
UnaryCall: new grpc.Metadata()
}
for (const metadataItem of call.request.metadata) {
callMetadata[metadataItem.type].add(metadataItem.key, metadataItem.value);
callMetadata[callTypeEnumMap[metadataItem.type]].add(metadataItem.key, metadataItem.value);
}
currentConfig.callTypes = call.request.types;
currentConfig.callTypes = call.request.types.map(value => callTypeEnumMap[value]);
currentConfig.metadata = callMetadata;
currentConfig.timeoutSec = call.request.timeout_sec
console.log('Received new client configuration: ' + JSON.stringify(currentConfig, undefined, 2));
callback(null, {});
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/scripts/xds.sh
Expand Up @@ -53,7 +53,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
NODE_XDS_INTEROP_VERBOSITY=1 \
GRPC_XDS_EXPERIMENTAL_ROUTING=true \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="all" \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/load-balancer-eds.ts
Expand Up @@ -191,7 +191,7 @@ export class EdsLoadBalancer implements LoadBalancer {
});
this.watcher = {
onValidUpdate: (update) => {
trace('Received EDS update for ' + this.edsServiceName + ': ' + JSON.stringify(update));
trace('Received EDS update for ' + this.edsServiceName + ': ' + JSON.stringify(update, undefined, 2));
this.latestEdsUpdate = update;
this.updateChild();
},
Expand Down
Expand Up @@ -32,7 +32,7 @@ import getFirstUsableConfig = experimental.getFirstUsableConfig;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;

const TRACER_NAME = 'weighted_target';
const TRACER_NAME = 'xds_cluster_manager';

function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
Expand Down Expand Up @@ -247,6 +247,7 @@ class XdsClusterManager implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const configChildren = lbConfig.getChildren();
// Delete children that are not in the new config
const namesToRemove: string[] = [];
Expand Down

0 comments on commit 3468e58

Please sign in to comment.