Skip to content

Commit

Permalink
Merge pull request #1681 from murgatroid99/grpc-js_config_selector
Browse files Browse the repository at this point in the history
grpc-js: Add ConfigSelector to Resolver API and plumb it through the channel
  • Loading branch information
murgatroid99 committed Feb 8, 2021
2 parents b570200 + 9e084bc commit 43eb668
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 21 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/src/resolver-xds.ts
Expand Up @@ -59,7 +59,7 @@ class XdsResolver implements Resolver {
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, {
this.listener.onSuccessfulResolution([], update, null, null, {
xdsClient: this.xdsClient,
});
},
Expand Down
111 changes: 95 additions & 16 deletions packages/grpc-js/src/channel.ts
Expand Up @@ -33,7 +33,7 @@ import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import { getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
Expand Down Expand Up @@ -136,9 +136,18 @@ export class ChannelImplementation implements Channel {
private subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
* Calls queued up to get a call config. Should only be populated before the
* first time the resolver returns a result, which includes the ConfigSelector.
*/
private configSelectionQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
}> = [];
private pickQueue: Array<{
callStream: Http2CallStream;
callMetadata: Metadata;
callConfig: CallConfig;
}> = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
Expand All @@ -152,6 +161,7 @@ export class ChannelImplementation implements Channel {
* is non-empty.
*/
private callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
constructor(
target: string,
private readonly credentials: ChannelCredentials,
Expand Down Expand Up @@ -225,10 +235,10 @@ export class ChannelImplementation implements Channel {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
const queueCopy = this.pickQueue.slice();
this.callRefTimer.unref?.();
this.pickQueue = [];
for (const { callStream, callMetadata } of queueCopy) {
this.tryPick(callStream, callMetadata);
this.callRefTimerUnref();
for (const { callStream, callMetadata, callConfig } of queueCopy) {
this.tryPick(callStream, callMetadata, callConfig);
}
this.updateState(connectivityState);
},
Expand All @@ -242,7 +252,37 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
options
options,
(configSelector) => {
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
process.nextTick(() => {
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref()
for (const {callStream, callMetadata} of localQueue) {
this.tryGetConfig(callStream, callMetadata);
}
this.configSelectionQueue = [];
});
},
(status) => {
if (this.configSelectionQueue.length > 0) {
trace(LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uriToString(this.target) + ' with calls queued for config selection');
}
const localQueue = this.configSelectionQueue;
this.configSelectionQueue = [];
this.callRefTimerUnref();
for (const {callStream, callMetadata} of localQueue) {
if (callMetadata.getOptions().waitForReady) {
this.callRefTimerRef();
this.configSelectionQueue.push({callStream, callMetadata});
} else {
callStream.cancelWithStatus(status.code, status.details);
}
}
}
);
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
Expand All @@ -252,9 +292,25 @@ export class ChannelImplementation implements Channel {
]);
}

private pushPick(callStream: Http2CallStream, callMetadata: Metadata) {
this.callRefTimer.ref?.();
this.pickQueue.push({ callStream, callMetadata });
private callRefTimerRef() {
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef?.()) {
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length);
this.callRefTimer.ref?.();
}
}

private callRefTimerUnref() {
// If the hasRef function does not exist, always run the code
if ((!this.callRefTimer.hasRef) || (this.callRefTimer.hasRef())) {
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length);
this.callRefTimer.unref?.();
}
}

private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
this.pickQueue.push({ callStream, callMetadata, callConfig });
this.callRefTimerRef();
}

/**
Expand All @@ -264,8 +320,8 @@ export class ChannelImplementation implements Channel {
* @param callStream
* @param callMetadata
*/
private tryPick(callStream: Http2CallStream, callMetadata: Metadata) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata });
private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) {
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation });
trace(
LogVerbosity.DEBUG,
'channel',
Expand Down Expand Up @@ -301,7 +357,7 @@ export class ChannelImplementation implements Channel {
' has state ' +
ConnectivityState[pickResult.subchannel!.getConnectivityState()]
);
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
}
/* We need to clone the callMetadata here because the transparent
Expand All @@ -321,6 +377,7 @@ export class ChannelImplementation implements Channel {
);
/* If we reach this point, the call stream has started
* successfully */
callConfig.onCommitted?.();
pickResult.onCallStarted?.();
} catch (error) {
if (
Expand Down Expand Up @@ -349,7 +406,7 @@ export class ChannelImplementation implements Channel {
(error as Error).message +
'. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
} else {
trace(
LogVerbosity.INFO,
Expand Down Expand Up @@ -378,7 +435,7 @@ export class ChannelImplementation implements Channel {
ConnectivityState[subchannelState] +
' after metadata filters. Retrying pick'
);
this.tryPick(callStream, callMetadata);
this.tryPick(callStream, callMetadata, callConfig);
}
},
(error: Error & { code: number }) => {
Expand All @@ -392,11 +449,11 @@ export class ChannelImplementation implements Channel {
}
break;
case PickResultType.QUEUE:
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
break;
case PickResultType.TRANSIENT_FAILURE:
if (callMetadata.getOptions().waitForReady) {
this.pushPick(callStream, callMetadata);
this.pushPick(callStream, callMetadata, callConfig);
} else {
callStream.cancelWithStatus(
pickResult.status!.code,
Expand Down Expand Up @@ -451,8 +508,30 @@ export class ChannelImplementation implements Channel {
}
}

private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
if (this.configSelector === null) {
/* This branch will only be taken at the beginning of the channel's life,
* before the resolver ever returns a result. So, the
* ResolvingLoadBalancer may be idle and if so it needs to be kicked
* because it now has a pending request. */
this.resolvingLoadBalancer.exitIdle();
this.configSelectionQueue.push({
callStream: stream,
callMetadata: metadata
});
this.callRefTimerRef();
} else {
const callConfig = this.configSelector(stream.getMethod(), metadata);
if (callConfig.status === Status.OK) {
this.tryPick(stream, metadata, callConfig);
} else {
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod());
}
}
}

_startCallStream(stream: Http2CallStream, metadata: Metadata) {
this.tryPick(stream, metadata.clone());
this.tryGetConfig(stream, metadata.clone());
}

close() {
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/src/picker.ts
Expand Up @@ -85,6 +85,7 @@ export interface DropCallPickResult extends PickResult {

export interface PickArgs {
metadata: Metadata;
extraPickInfo: {[key: string]: string};
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/grpc-js/src/resolver-dns.ts
Expand Up @@ -129,7 +129,7 @@ class DnsResolver implements Resolver {
if (this.ipResult !== null) {
trace('Returning IP address for target ' + uriToString(this.target));
setImmediate(() => {
this.listener.onSuccessfulResolution(this.ipResult!, null, null, {});
this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {});
});
return;
}
Expand Down Expand Up @@ -192,6 +192,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
},
Expand Down Expand Up @@ -237,6 +238,7 @@ class DnsResolver implements Resolver {
this.latestLookupResult,
this.latestServiceConfig,
this.latestServiceConfigError,
null,
{}
);
}
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/src/resolver-uds.ts
Expand Up @@ -40,6 +40,7 @@ class UdsResolver implements Resolver {
this.addresses,
null,
null,
null,
{}
);
}
Expand Down
20 changes: 19 additions & 1 deletion packages/grpc-js/src/resolver.ts
Expand Up @@ -15,13 +15,30 @@
*
*/

import { ServiceConfig } from './service-config';
import { MethodConfig, ServiceConfig } from './service-config';
import * as resolver_dns from './resolver-dns';
import * as resolver_uds from './resolver-uds';
import { StatusObject } from './call-stream';
import { SubchannelAddress } from './subchannel';
import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';
import { Metadata } from './metadata';
import { Status } from './constants';

export interface CallConfig {
methodConfig: MethodConfig;
onCommitted?: () => void;
pickInformation: {[key: string]: string};
status: Status;
}

/**
* Selects a configuration for a method given the name and metadata. Defined in
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
}

/**
* A listener object passed to the resolver's constructor that provides name
Expand All @@ -41,6 +58,7 @@ export interface ResolverListener {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
): void;
/**
Expand Down
44 changes: 42 additions & 2 deletions packages/grpc-js/src/resolving-load-balancer.ts
Expand Up @@ -23,7 +23,7 @@ import {
} from './load-balancer';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { ConnectivityState } from './channel';
import { createResolver, Resolver } from './resolver';
import { ConfigSelector, createResolver, Resolver } from './resolver';
import { ServiceError } from './call';
import { Picker, UnavailablePicker, QueuePicker } from './picker';
import { BackoffTimeout } from './backoff-timeout';
Expand All @@ -46,6 +46,40 @@ function trace(text: string): void {

const DEFAULT_LOAD_BALANCER_NAME = 'pick_first';

function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector {
return function defaultConfigSelector(methodName: string, metadata: Metadata) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
for (const methodConfig of serviceConfig.methodConfig) {
for (const name of methodConfig.name) {
if (name.service === service && (name.method === undefined || name.method === method)) {
return {
methodConfig: methodConfig,
pickInformation: {},
status: Status.OK
};
}
}
}
}
return {
methodConfig: {name: []},
pickInformation: {},
status: Status.OK
};
}
}

export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
}

export interface ResolutionFailureCallback {
(status: StatusObject): void;
}

export class ResolvingLoadBalancer implements LoadBalancer {
/**
* The resolver class constructed for the target address.
Expand Down Expand Up @@ -93,7 +127,9 @@ export class ResolvingLoadBalancer implements LoadBalancer {
constructor(
private readonly target: GrpcUri,
private readonly channelControlHelper: ChannelControlHelper,
private readonly channelOptions: ChannelOptions
private readonly channelOptions: ChannelOptions,
private readonly onSuccessfulResolution: ResolutionCallback,
private readonly onFailedResolution: ResolutionFailureCallback
) {
if (channelOptions['grpc.service_config']) {
this.defaultServiceConfig = validateServiceConfig(
Expand Down Expand Up @@ -134,6 +170,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
configSelector: ConfigSelector | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
Expand Down Expand Up @@ -180,6 +217,8 @@ export class ResolvingLoadBalancer implements LoadBalancer {
loadBalancingConfig,
attributes
);
const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig));
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
Expand Down Expand Up @@ -227,6 +266,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker(error)
);
this.onFailedResolution(error);
}
this.backoffTimeout.runOnce();
}
Expand Down

0 comments on commit 43eb668

Please sign in to comment.