Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Add ConfigSelector to Resolver API and plumb it through the channel #1681

Merged
merged 5 commits into from Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/resolver-xds.ts
Expand Up @@ -59,7 +59,7 @@ class XdsResolver implements Resolver {
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, {
this.listener.onSuccessfulResolution([], update, null, null, {
xdsClient: this.xdsClient,
});
},
Expand Down
72 changes: 58 additions & 14 deletions packages/grpc-js/src/channel.ts
Expand Up @@ -33,7 +33,7 @@ import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import { getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
Expand Down Expand Up @@ -136,9 +136,18 @@ export class ChannelImplementation implements Channel {
private subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
* Calls queued up to get a call config. Should only be populated before the
* first time the resolver returns a result, which includes the ConfigSelector.
*/
private configSelectionQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
}> = [];
private pickQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
callConfig: CallConfig;
}> = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
Expand All @@ -152,6 +161,7 @@ export class ChannelImplementation implements Channel {
* is non-empty.
*/
private callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
constructor(
target: string,
private readonly credentials: ChannelCredentials,
Expand Down Expand Up @@ -227,8 +237,8 @@ export class ChannelImplementation implements Channel {
const queueCopy = this.pickQueue.slice();
this.callRefTimer.unref?.();
this.pickQueue = [];
for (const { callStream, callMetadata } of queueCopy) {
this.tryPick(callStream, callMetadata);
for (const { callStream, callMetadata, callConfig } of queueCopy) {
this.tryPick(callStream, callMetadata, callConfig);
}
this.updateState(connectivityState);
},
Expand All @@ -242,7 +252,18 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
options
options,
(configSelector) => {
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
process.nextTick(() => {
for (const {callStream, callMetadata} of this.configSelectionQueue) {
this.tryGetConfig(callStream, callMetadata);
}
this.configSelectionQueue = [];
});
}
);
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
Expand All @@ -252,9 +273,9 @@ export class ChannelImplementation implements Channel {
]);
}

private pushPick(callStream: Http2CallStream, callMetadata: Metadata) {
private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
this.callRefTimer.ref?.();
this.pickQueue.push({ callStream, callMetadata });
this.pickQueue.push({ callStream, callMetadata, callConfig });
}

/**
Expand All @@ -264,8 +285,8 @@ export class ChannelImplementation implements Channel {
* @param callStream
* @param callMetadata
*/
private tryPick(callStream: Http2CallStream, callMetadata: Metadata) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata });
private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation });
trace(
LogVerbosity.DEBUG,
'channel',
Expand Down Expand Up @@ -301,7 +322,7 @@ export class ChannelImplementation implements Channel {
' has state ' +
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
);
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
}
/* We need to clone the callMetadata here because the transparent
Expand All @@ -321,6 +342,7 @@ export class ChannelImplementation implements Channel {
);
/* If we reach this point, the call stream has started
* successfully */
callConfig.onCommitted?.();
pickResult.onCallStarted?.();
} catch (error) {
if (
Expand Down Expand Up @@ -349,7 +371,7 @@ export class ChannelImplementation implements Channel {
(error as Error).message +
'. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
} else {
trace(
LogVerbosity.INFO,
Expand Down Expand Up @@ -378,7 +400,7 @@ export class ChannelImplementation implements Channel {
ConnectivityState[subchannelState] +
' after metadata filters. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
}
},
(error: Error & { code: number }) => {
Expand All @@ -392,11 +414,11 @@ export class ChannelImplementation implements Channel {
}
break;
case PickResultType.QUEUE:
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
case PickResultType.TRANSIENT_FAILURE:
if (callMetadata.getOptions().waitForReady) {
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
} else {
callStream.cancelWithStatus(
pickResult.status!.code,
Expand Down Expand Up @@ -451,8 +473,30 @@ export class ChannelImplementation implements Channel {
}
}

private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
if (this.configSelector === null) {
/* This branch will only be taken at the beginning of the channel's life,
* before the resolver ever returns a result. So, the
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
* because it now has a pending request. */
this.resolvingLoadBalancer.exitIdle();
this.callRefTimer.ref?.();
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata
});
} else {
const callConfig = this.configSelector(stream.getMethod(), metadata);
if (callConfig.status === Status.OK) {
this.tryPick(stream, metadata, callConfig);
} else {
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
}
}
}

_startCallStream(stream: Http2CallStream, metadata: Metadata) {
this.tryPick(stream, metadata.clone());
this.tryGetConfig(stream, metadata.clone());
}

close() {
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/src/picker.ts
Expand Up @@ -85,6 +85,7 @@ export interface DropCallPickResult extends PickResult {

export interface PickArgs {
metadata: Metadata;
extraPickInfo: {[key: string]: string};
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/grpc-js/src/resolver-dns.ts
Expand Up @@ -129,7 +129,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(this.ipResult!, null, null, {});
this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {});
});
return;
}
Expand Down Expand Up @@ -192,6 +192,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
},
Expand Down Expand Up @@ -237,6 +238,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
}
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/src/resolver-uds.ts
Expand Up @@ -40,6 +40,7 @@ class UdsResolver implements Resolver {
this.addresses,
null,
null,
null,
{}
);
}
Expand Down
20 changes: 19 additions & 1 deletion packages/grpc-js/src/resolver.ts
Expand Up @@ -15,13 +15,30 @@
*
*/

import { ServiceConfig } from './service-config';
import { MethodConfig, ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { Metadata } from './metadata';
import { Status } from './constants';

export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
pickInformation: {[key: string]: string};
status: Status;
}

/**
* Selects a configuration for a method given the name and metadata. Defined in
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
}

/**
* A listener object passed to the resolver's constructor that provides name
Expand All @@ -41,6 +58,7 @@ export interface ResolverListener {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
): void;
/**
Expand Down
38 changes: 36 additions & 2 deletions packages/grpc-js/src/resolving-load-balancer.ts
Expand Up @@ -23,7 +23,7 @@ import {
} from './load-balancer';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { createResolver, Resolver } from './resolver';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { BackoffTimeout } from './backoff-timeout';
Expand All @@ -46,6 +46,36 @@ function trace(text: string): void {

const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';

function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector {
return function defaultConfigSelector(methodName: string, metadata: Metadata) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
for (const methodConfig of serviceConfig.methodConfig) {
for (const name of methodConfig.name) {
if (name.service === service && (name.method === undefined || name.method === method)) {
return {
methodConfig: methodConfig,
pickInformation: {},
status: Status.OK
};
}
}
}
}
return {
methodConfig: {name: []},
pickInformation: {},
status: Status.OK
};
}
}

export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
}

export class ResolvingLoadBalancer implements LoadBalancer {
/**
* The resolver class constructed for the target address.
Expand Down Expand Up @@ -93,7 +123,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
constructor(
private readonly target: GrpcUri,
private readonly channelControlHelper: ChannelControlHelper,
private readonly channelOptions: ChannelOptions
private readonly channelOptions: ChannelOptions,
private readonly onSuccessfulResolution: ResolutionCallback
) {
if (channelOptions['grpc.service_config']) {
this.defaultServiceConfig = validateServiceConfig(
Expand Down Expand Up @@ -134,6 +165,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
Expand Down Expand Up @@ -180,6 +212,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
loadBalancingConfig,
attributes
);
const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig));
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
Expand Down