diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index f9e5264a4..1588f3b11 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -59,7 +59,7 @@ class XdsResolver implements Resolver { onValidUpdate: (update: ServiceConfig) => { trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update)); this.hasReportedSuccess = true; - this.listener.onSuccessfulResolution([], update, null, { + this.listener.onSuccessfulResolution([], update, null, null, { xdsClient: this.xdsClient, }); }, diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index e1a76c092..dad8a5324 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -33,7 +33,7 @@ import { FilterStackFactory } from './filter-stack'; import { CallCredentialsFilterFactory } from './call-credentials-filter'; import { DeadlineFilterFactory } from './deadline-filter'; import { CompressionFilterFactory } from './compression-filter'; -import { getDefaultAuthority, mapUriDefaultScheme } from './resolver'; +import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver'; import { trace, log } from './logging'; import { SubchannelAddress } from './subchannel'; import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; @@ -136,9 +136,18 @@ export class ChannelImplementation implements Channel { private subchannelPool: SubchannelPool; private connectivityState: ConnectivityState = ConnectivityState.IDLE; private currentPicker: Picker = new UnavailablePicker(); + /** + * Calls queued up to get a call config. Should only be populated before the + * first time the resolver returns a result, which includes the ConfigSelector. + */ + private configSelectionQueue: Array<{ + callStream: Http2CallStream; + callMetadata: Metadata; + }> = []; private pickQueue: Array<{ callStream: Http2CallStream; callMetadata: Metadata; + callConfig: CallConfig; }> = []; private connectivityStateWatchers: ConnectivityStateWatcher[] = []; private defaultAuthority: string; @@ -152,6 +161,7 @@ export class ChannelImplementation implements Channel { * is non-empty. */ private callRefTimer: NodeJS.Timer; + private configSelector: ConfigSelector | null = null; constructor( target: string, private readonly credentials: ChannelCredentials, @@ -225,10 +235,10 @@ export class ChannelImplementation implements Channel { updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); - this.callRefTimer.unref?.(); this.pickQueue = []; - for (const { callStream, callMetadata } of queueCopy) { - this.tryPick(callStream, callMetadata); + this.callRefTimerUnref(); + for (const { callStream, callMetadata, callConfig } of queueCopy) { + this.tryPick(callStream, callMetadata, callConfig); } this.updateState(connectivityState); }, @@ -242,7 +252,37 @@ export class ChannelImplementation implements Channel { this.resolvingLoadBalancer = new ResolvingLoadBalancer( this.target, channelControlHelper, - options + options, + (configSelector) => { + this.configSelector = configSelector; + /* We process the queue asynchronously to ensure that the corresponding + * load balancer update has completed. */ + process.nextTick(() => { + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + this.callRefTimerUnref() + for (const {callStream, callMetadata} of localQueue) { + this.tryGetConfig(callStream, callMetadata); + } + this.configSelectionQueue = []; + }); + }, + (status) => { + if (this.configSelectionQueue.length > 0) { + trace(LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uriToString(this.target) + ' with calls queued for config selection'); + } + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + this.callRefTimerUnref(); + for (const {callStream, callMetadata} of localQueue) { + if (callMetadata.getOptions().waitForReady) { + this.callRefTimerRef(); + this.configSelectionQueue.push({callStream, callMetadata}); + } else { + callStream.cancelWithStatus(status.code, status.details); + } + } + } ); this.filterStackFactory = new FilterStackFactory([ new CallCredentialsFilterFactory(this), @@ -252,9 +292,25 @@ export class ChannelImplementation implements Channel { ]); } - private pushPick(callStream: Http2CallStream, callMetadata: Metadata) { - this.callRefTimer.ref?.(); - this.pickQueue.push({ callStream, callMetadata }); + private callRefTimerRef() { + // If the hasRef function does not exist, always run the code + if (!this.callRefTimer.hasRef?.()) { + trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); + this.callRefTimer.ref?.(); + } + } + + private callRefTimerUnref() { + // If the hasRef function does not exist, always run the code + if ((!this.callRefTimer.hasRef) || (this.callRefTimer.hasRef())) { + trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); + this.callRefTimer.unref?.(); + } + } + + private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { + this.pickQueue.push({ callStream, callMetadata, callConfig }); + this.callRefTimerRef(); } /** @@ -264,8 +320,8 @@ export class ChannelImplementation implements Channel { * @param callStream * @param callMetadata */ - private tryPick(callStream: Http2CallStream, callMetadata: Metadata) { - const pickResult = this.currentPicker.pick({ metadata: callMetadata }); + private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { + const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation }); trace( LogVerbosity.DEBUG, 'channel', @@ -301,7 +357,7 @@ export class ChannelImplementation implements Channel { ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); - this.pushPick(callStream, callMetadata); + this.pushPick(callStream, callMetadata, callConfig); break; } /* We need to clone the callMetadata here because the transparent @@ -321,6 +377,7 @@ export class ChannelImplementation implements Channel { ); /* If we reach this point, the call stream has started * successfully */ + callConfig.onCommitted?.(); pickResult.onCallStarted?.(); } catch (error) { if ( @@ -349,7 +406,7 @@ export class ChannelImplementation implements Channel { (error as Error).message + '. Retrying pick' ); - this.tryPick(callStream, callMetadata); + this.tryPick(callStream, callMetadata, callConfig); } else { trace( LogVerbosity.INFO, @@ -378,7 +435,7 @@ export class ChannelImplementation implements Channel { ConnectivityState[subchannelState] + ' after metadata filters. Retrying pick' ); - this.tryPick(callStream, callMetadata); + this.tryPick(callStream, callMetadata, callConfig); } }, (error: Error & { code: number }) => { @@ -392,11 +449,11 @@ export class ChannelImplementation implements Channel { } break; case PickResultType.QUEUE: - this.pushPick(callStream, callMetadata); + this.pushPick(callStream, callMetadata, callConfig); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { - this.pushPick(callStream, callMetadata); + this.pushPick(callStream, callMetadata, callConfig); } else { callStream.cancelWithStatus( pickResult.status!.code, @@ -451,8 +508,30 @@ export class ChannelImplementation implements Channel { } } + private tryGetConfig(stream: Http2CallStream, metadata: Metadata) { + if (this.configSelector === null) { + /* This branch will only be taken at the beginning of the channel's life, + * before the resolver ever returns a result. So, the + * ResolvingLoadBalancer may be idle and if so it needs to be kicked + * because it now has a pending request. */ + this.resolvingLoadBalancer.exitIdle(); + this.configSelectionQueue.push({ + callStream: stream, + callMetadata: metadata + }); + this.callRefTimerRef(); + } else { + const callConfig = this.configSelector(stream.getMethod(), metadata); + if (callConfig.status === Status.OK) { + this.tryPick(stream, metadata, callConfig); + } else { + stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); + } + } + } + _startCallStream(stream: Http2CallStream, metadata: Metadata) { - this.tryPick(stream, metadata.clone()); + this.tryGetConfig(stream, metadata.clone()); } close() { diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 184047b23..6df61b59a 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -85,6 +85,7 @@ export interface DropCallPickResult extends PickResult { export interface PickArgs { metadata: Metadata; + extraPickInfo: {[key: string]: string}; } /** diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index f19318d33..67f1f8c45 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -129,7 +129,7 @@ class DnsResolver implements Resolver { if (this.ipResult !== null) { trace('Returning IP address for target ' + uriToString(this.target)); setImmediate(() => { - this.listener.onSuccessfulResolution(this.ipResult!, null, null, {}); + this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {}); }); return; } @@ -192,6 +192,7 @@ class DnsResolver implements Resolver { this.latestLookupResult, this.latestServiceConfig, this.latestServiceConfigError, + null, {} ); }, @@ -237,6 +238,7 @@ class DnsResolver implements Resolver { this.latestLookupResult, this.latestServiceConfig, this.latestServiceConfigError, + null, {} ); } diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index ed25177cc..40502f113 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -40,6 +40,7 @@ class UdsResolver implements Resolver { this.addresses, null, null, + null, {} ); } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 74f3e5143..147ace30d 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -15,13 +15,30 @@ * */ -import { ServiceConfig } from './service-config'; +import { MethodConfig, ServiceConfig } from './service-config'; import * as resolver_dns from './resolver-dns'; import * as resolver_uds from './resolver-uds'; import { StatusObject } from './call-stream'; import { SubchannelAddress } from './subchannel'; import { GrpcUri, uriToString } from './uri-parser'; import { ChannelOptions } from './channel-options'; +import { Metadata } from './metadata'; +import { Status } from './constants'; + +export interface CallConfig { + methodConfig: MethodConfig; + onCommitted?: () => void; + pickInformation: {[key: string]: string}; + status: Status; +} + +/** + * Selects a configuration for a method given the name and metadata. Defined in + * https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc + */ +export interface ConfigSelector { + (methodName: string, metadata: Metadata): CallConfig; +} /** * A listener object passed to the resolver's constructor that provides name @@ -41,6 +58,7 @@ export interface ResolverListener { addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null, + configSelector: ConfigSelector | null, attributes: { [key: string]: unknown } ): void; /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 5a4c62f57..84fe4ae1d 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -23,7 +23,7 @@ import { } from './load-balancer'; import { ServiceConfig, validateServiceConfig } from './service-config'; import { ConnectivityState } from './channel'; -import { createResolver, Resolver } from './resolver'; +import { ConfigSelector, createResolver, Resolver } from './resolver'; import { ServiceError } from './call'; import { Picker, UnavailablePicker, QueuePicker } from './picker'; import { BackoffTimeout } from './backoff-timeout'; @@ -46,6 +46,40 @@ function trace(text: string): void { const DEFAULT_LOAD_BALANCER_NAME = 'pick_first'; +function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector { + return function defaultConfigSelector(methodName: string, metadata: Metadata) { + const splitName = methodName.split('/').filter(x => x.length > 0); + const service = splitName[0] ?? ''; + const method = splitName[1] ?? ''; + if (serviceConfig && serviceConfig.methodConfig) { + for (const methodConfig of serviceConfig.methodConfig) { + for (const name of methodConfig.name) { + if (name.service === service && (name.method === undefined || name.method === method)) { + return { + methodConfig: methodConfig, + pickInformation: {}, + status: Status.OK + }; + } + } + } + } + return { + methodConfig: {name: []}, + pickInformation: {}, + status: Status.OK + }; + } +} + +export interface ResolutionCallback { + (configSelector: ConfigSelector): void; +} + +export interface ResolutionFailureCallback { + (status: StatusObject): void; +} + export class ResolvingLoadBalancer implements LoadBalancer { /** * The resolver class constructed for the target address. @@ -93,7 +127,9 @@ export class ResolvingLoadBalancer implements LoadBalancer { constructor( private readonly target: GrpcUri, private readonly channelControlHelper: ChannelControlHelper, - private readonly channelOptions: ChannelOptions + private readonly channelOptions: ChannelOptions, + private readonly onSuccessfulResolution: ResolutionCallback, + private readonly onFailedResolution: ResolutionFailureCallback ) { if (channelOptions['grpc.service_config']) { this.defaultServiceConfig = validateServiceConfig( @@ -134,6 +170,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null, + configSelector: ConfigSelector | null, attributes: { [key: string]: unknown } ) => { let workingServiceConfig: ServiceConfig | null = null; @@ -180,6 +217,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { loadBalancingConfig, attributes ); + const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig; + this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig)); }, onError: (error: StatusObject) => { this.handleResolutionFailure(error); @@ -227,6 +266,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker(error) ); + this.onFailedResolution(error); } this.backoffTimeout.runOnce(); }