Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable path_matching and header_matching xDS interop tests #1724

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 58 additions & 27 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Expand Up @@ -23,7 +23,7 @@ import { ProtoGrpcType } from './generated/test';

import * as protoLoader from '@grpc/proto-loader';
import { TestServiceClient } from './generated/grpc/testing/TestService';
import { LoadBalancerStatsResponse } from './generated/grpc/testing/LoadBalancerStatsResponse';
import { LoadBalancerStatsResponse, _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output } from './generated/grpc/testing/LoadBalancerStatsResponse';
import * as yargs from 'yargs';
import { LoadBalancerStatsServiceHandlers } from './generated/grpc/testing/LoadBalancerStatsService';
import { XdsUpdateClientConfigureServiceHandlers } from './generated/grpc/testing/XdsUpdateClientConfigureService';
Expand All @@ -49,13 +49,14 @@ const REQUEST_TIMEOUT_SEC = 20;
const VERBOSITY = Number.parseInt(process.env.NODE_XDS_INTEROP_VERBOSITY ?? '0');

interface CallEndNotifier {
onCallSucceeded(peerName: string): void;
onCallSucceeded(methodName: string, peerName: string): void;
onCallFailed(message: string): void;
}

class CallSubscriber {
private callsStarted = 0;
private callsSucceededByPeer: {[key: string]: number} = {};
private callsSucceededByPeer: {[peer: string]: number} = {};
private callsSucceededByMethod: {[method: string]: _grpc_testing_LoadBalancerStatsResponse_RpcsByPeer__Output} = {}
private callsSucceeded = 0;
private callsFinished = 0;
private failureMessageCount: Map<string, number> = new Map<string, number>();
Expand All @@ -75,9 +76,18 @@ class CallSubscriber {
}
}

addCallSucceeded(peerName: string): void {
addCallSucceeded(methodName: string, peerName: string): void {
if (VERBOSITY >= 2) {
console.log(`Call to ${peerName} succeeded`);
console.log(`Call ${methodName} to ${peerName} succeeded`);
}
if (methodName in this.callsSucceededByMethod) {
if (peerName in this.callsSucceededByMethod[methodName].rpcs_by_peer) {
this.callsSucceededByMethod[methodName].rpcs_by_peer[peerName] += 1;
} else {
this.callsSucceededByMethod[methodName].rpcs_by_peer[peerName] = 1;
}
} else {
this.callsSucceededByMethod[methodName] = {rpcs_by_peer: {[peerName]: 1}};
}
if (peerName in this.callsSucceededByPeer) {
this.callsSucceededByPeer[peerName] += 1;
Expand Down Expand Up @@ -110,7 +120,8 @@ class CallSubscriber {
}
return {
rpcs_by_peer: this.callsSucceededByPeer,
num_failures: this.callsStarted - this.callsSucceeded
num_failures: this.callsStarted - this.callsSucceeded,
rpcs_by_method: this.callsSucceededByMethod
};
}
}
Expand Down Expand Up @@ -148,9 +159,9 @@ class CallStatsTracker {
}
}
return {
onCallSucceeded: (peerName: string) => {
onCallSucceeded: (methodName: string, peerName: string) => {
for (const subscriber of callSubscribers) {
subscriber.addCallSucceeded(peerName);
subscriber.addCallSucceeded(methodName, peerName);
}
},
onCallFailed: (message: string) => {
Expand All @@ -162,22 +173,22 @@ class CallStatsTracker {
}
}

type CallType = 'EMPTY_CALL' | 'UNARY_CALL';
type CallType = 'EmptyCall' | 'UnaryCall';

interface ClientConfiguration {
callTypes: (CallType)[];
metadata: {
EMPTY_CALL: grpc.Metadata,
UNARY_CALL: grpc.Metadata
EmptyCall: grpc.Metadata,
UnaryCall: grpc.Metadata
},
timeoutSec: number
}

const currentConfig: ClientConfiguration = {
callTypes: ['EMPTY_CALL'],
callTypes: ['UnaryCall'],
metadata: {
EMPTY_CALL: new grpc.Metadata(),
UNARY_CALL: new grpc.Metadata()
EmptyCall: new grpc.Metadata(),
UnaryCall: new grpc.Metadata()
},
timeoutSec: REQUEST_TIMEOUT_SEC
};
Expand All @@ -186,11 +197,11 @@ let anyCallSucceeded = false;

const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
stats_per_method: {
'EMPTY_CALL': {
EmptyCall: {
rpcs_started: 0,
result: {}
},
'UNARY_CALL': {
UnaryCall: {
rpcs_started: 0,
result: {}
}
Expand All @@ -200,7 +211,6 @@ const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
const callTypeStats = accumulatedStats.stats_per_method![type];
callTypeStats.rpcs_started! += 1;

const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
Expand All @@ -225,12 +235,12 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
notifier.onCallSucceeded(type, hostname);
}
}
}
};
const method = (type === 'EMPTY_CALL' ? client.emptyCall : client.unaryCall).bind(client);
const method = (type === 'EmptyCall' ? client.emptyCall : client.unaryCall).bind(client);
const call = method({}, currentConfig.metadata[type], {deadline}, callback);
call.on('metadata', (metadata) => {
hostname = (metadata.get('hostname') as string[])[0] ?? null;
Expand All @@ -239,7 +249,7 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
if (hostname === null) {
notifier.onCallFailed('Hostname omitted from call metadata');
} else {
notifier.onCallSucceeded(hostname);
notifier.onCallSucceeded(type, hostname);
}
}
});
Expand All @@ -254,16 +264,35 @@ function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpc
}, 1000/qps);
}


const callTypeEnumMap = {
'EMPTY_CALL': 'EmptyCall' as CallType,
'UNARY_CALL': 'UnaryCall' as CallType
};

function main() {
const argv = yargs
.string(['fail_on_failed_rpcs', 'server', 'stats_port'])
.string(['fail_on_failed_rpcs', 'server', 'stats_port', 'rpc', 'metadata'])
.number(['num_channels', 'qps'])
.require(['qps', 'server', 'stats_port'])
.demandOption(['server', 'stats_port'])
.default('num_channels', 1)
.default('qps', 1)
.default('rpc', 'UnaryCall')
.default('metadata', '')
.argv;
console.log('Starting xDS interop client. Args: ', argv);
currentConfig.callTypes = argv.rpc.split(',').filter(value => value === 'EmptyCall' || value === 'UnaryCall') as CallType[];
for (const item of argv.metadata.split(',')) {
const [method, key, value] = item.split(':');
if (value === undefined) {
continue;
}
if (method !== 'EmptyCall' && method !== 'UnaryCall') {
continue;
}
currentConfig.metadata[method].add(key, value);
}
console.log('EmptyCall metadata: ' + JSON.stringify(currentConfig.metadata.EmptyCall.getMap()));
console.log('UnaryCall metadata: ' + JSON.stringify(currentConfig.metadata.UnaryCall.getMap()));
const callStatsTracker = new CallStatsTracker();
for (let i = 0; i < argv.num_channels; i++) {
/* The 'unique' channel argument is there solely to ensure that the
Expand Down Expand Up @@ -294,15 +323,17 @@ function main() {
const xdsUpdateClientConfigureServiceImpl: XdsUpdateClientConfigureServiceHandlers = {
Configure: (call, callback) => {
const callMetadata = {
EMPTY_CALL: new grpc.Metadata(),
UNARY_CALL: new grpc.Metadata()
EmptyCall: new grpc.Metadata(),
UnaryCall: new grpc.Metadata()
}
for (const metadataItem of call.request.metadata) {
callMetadata[metadataItem.type].add(metadataItem.key, metadataItem.value);
callMetadata[callTypeEnumMap[metadataItem.type]].add(metadataItem.key, metadataItem.value);
}
currentConfig.callTypes = call.request.types;
currentConfig.callTypes = call.request.types.map(value => callTypeEnumMap[value]);
currentConfig.metadata = callMetadata;
currentConfig.timeoutSec = call.request.timeout_sec
console.log('Received new client configuration: ' + JSON.stringify(currentConfig, undefined, 2));
callback(null, {});
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/scripts/xds.sh
Expand Up @@ -53,7 +53,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
NODE_XDS_INTEROP_VERBOSITY=1 \
GRPC_XDS_EXPERIMENTAL_ROUTING=true \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="all" \
--test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-2 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/load-balancer-eds.ts
Expand Up @@ -191,7 +191,7 @@ export class EdsLoadBalancer implements LoadBalancer {
});
this.watcher = {
onValidUpdate: (update) => {
trace('Received EDS update for ' + this.edsServiceName + ': ' + JSON.stringify(update));
trace('Received EDS update for ' + this.edsServiceName + ': ' + JSON.stringify(update, undefined, 2));
this.latestEdsUpdate = update;
this.updateChild();
},
Expand Down
Expand Up @@ -32,7 +32,7 @@ import getFirstUsableConfig = experimental.getFirstUsableConfig;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;

const TRACER_NAME = 'weighted_target';
const TRACER_NAME = 'xds_cluster_manager';

function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
Expand Down Expand Up @@ -247,6 +247,7 @@ class XdsClusterManager implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const configChildren = lbConfig.getChildren();
// Delete children that are not in the new config
const namesToRemove: string[] = [];
Expand Down