diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index a26e03ea3..db021fe35 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -43,7 +43,8 @@ }, "dependencies": { "@grpc/proto-loader": "^0.6.0-pre14", - "google-auth-library": "^7.0.2" + "google-auth-library": "^7.0.2", + "re2-wasm": "^1.0.1" }, "peerDependencies": { "@grpc/grpc-js": "~1.2.2" diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index bbfc30562..d76eaf4f3 100644 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -51,8 +51,9 @@ grpc/tools/run_tests/helper_scripts/prep_xds.sh GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver \ GRPC_NODE_VERBOSITY=DEBUG \ NODE_XDS_INTEROP_VERBOSITY=1 \ + GRPC_XDS_EXPERIMENTAL_ROUTING=true \ python3 grpc/tools/run_tests/run_xds_tests.py \ - --test_case="backends_restart,change_backend_service,gentle_failover,ping_pong,remove_instance_group,round_robin,secondary_locality_gets_no_requests_on_partial_primary_failure,secondary_locality_gets_requests_on_primary_failure" \ + --test_case="all" \ --project_id=grpc-testing \ --source_image=projects/grpc-testing/global/images/xds-test-server-2 \ --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ diff --git a/packages/grpc-js-xds/src/environment.ts b/packages/grpc-js-xds/src/environment.ts new file mode 100644 index 000000000..56d233ddf --- /dev/null +++ b/packages/grpc-js-xds/src/environment.ts @@ -0,0 +1,22 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Environment variable protection for traffic splitting and routing + * https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#xds-resolver-and-xds-client + */ +export const GRPC_XDS_EXPERIMENTAL_ROUTING = (process.env.GRPC_XDS_EXPERIMENTAL_ROUTING === 'true'); \ No newline at end of file diff --git a/packages/grpc-js-xds/src/load-balancer-cds.ts b/packages/grpc-js-xds/src/load-balancer-cds.ts index a2961927c..d0fe2338a 100644 --- a/packages/grpc-js-xds/src/load-balancer-cds.ts +++ b/packages/grpc-js-xds/src/load-balancer-cds.ts @@ -16,7 +16,7 @@ */ import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js'; -import { XdsClient, Watcher } from './xds-client'; +import { getSingletonXdsClient, XdsClient } from './xds-client'; import { Cluster__Output } from './generated/envoy/api/v2/Cluster'; import SubchannelAddress = experimental.SubchannelAddress; import UnavailablePicker = experimental.UnavailablePicker; @@ -26,6 +26,7 @@ import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; import LoadBalancingConfig = experimental.LoadBalancingConfig; import { EdsLoadBalancingConfig } from './load-balancer-eds'; +import { Watcher } from './xds-stream-state/xds-stream-state'; const TRACER_NAME = 'cds_balancer'; @@ -65,7 +66,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig { export class CdsLoadBalancer implements LoadBalancer { private childBalancer: ChildLoadBalancerHandler; - private xdsClient: XdsClient | null = null; private watcher: Watcher; private isWatcherActive = false; @@ -121,12 +121,7 @@ export class CdsLoadBalancer implements LoadBalancer { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2)); return; } - if (!(attributes.xdsClient instanceof XdsClient)) { - trace('Discarding address list update missing xdsClient attribute'); - return; - } trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2)); - this.xdsClient = attributes.xdsClient; this.latestAttributes = attributes; /* If the cluster is changing, disable the old watcher before adding the new @@ -136,7 +131,7 @@ export class CdsLoadBalancer implements LoadBalancer { this.latestConfig?.getCluster() !== lbConfig.getCluster() ) { trace('Removing old cluster watcher for cluster name ' + this.latestConfig!.getCluster()); - this.xdsClient.removeClusterWatcher( + getSingletonXdsClient().removeClusterWatcher( this.latestConfig!.getCluster(), this.watcher ); @@ -152,7 +147,7 @@ export class CdsLoadBalancer implements LoadBalancer { if (!this.isWatcherActive) { trace('Adding new cluster watcher for cluster name ' + lbConfig.getCluster()); - this.xdsClient.addClusterWatcher(lbConfig.getCluster(), this.watcher); + getSingletonXdsClient().addClusterWatcher(lbConfig.getCluster(), this.watcher); this.isWatcherActive = true; } } @@ -166,7 +161,7 @@ export class CdsLoadBalancer implements LoadBalancer { trace('Destroying load balancer with cluster name ' + this.latestConfig?.getCluster()); this.childBalancer.destroy(); if (this.isWatcherActive) { - this.xdsClient?.removeClusterWatcher( + getSingletonXdsClient().removeClusterWatcher( this.latestConfig!.getCluster(), this.watcher ); diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index 8919f3174..01e40f5ac 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -16,7 +16,7 @@ */ import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js'; -import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client'; +import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client'; import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment'; import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; import { LocalitySubchannelAddress, PriorityChild, PriorityLoadBalancingConfig } from './load-balancer-priority'; @@ -33,6 +33,7 @@ import PickResultType = experimental.PickResultType; import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimental'; import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target'; import { LrsLoadBalancingConfig } from './load-balancer-lrs'; +import { Watcher } from './xds-stream-state/xds-stream-state'; const TRACER_NAME = 'eds_balancer'; @@ -122,11 +123,10 @@ export class EdsLoadBalancer implements LoadBalancer { * requests. */ private childBalancer: ChildLoadBalancerHandler; - private xdsClient: XdsClient | null = null; private edsServiceName: string | null = null; private watcher: Watcher; /** - * Indicates whether the watcher has already been passed to this.xdsClient + * Indicates whether the watcher has already been passed to the xdsClient * and is getting updates. */ private isWatcherActive = false; @@ -377,7 +377,7 @@ export class EdsLoadBalancer implements LoadBalancer { validateLoadBalancingConfig({ round_robin: {} }), ]; let childPolicy: LoadBalancingConfig[]; - if (this.lastestConfig.getLrsLoadReportingServerName()) { + if (this.lastestConfig.getLrsLoadReportingServerName() !== undefined) { childPolicy = [new LrsLoadBalancingConfig(this.lastestConfig.getCluster(), this.lastestConfig.getEdsServiceName() ?? '', this.lastestConfig.getLrsLoadReportingServerName()!, localityObj.locality, endpointPickingPolicy)]; } else { childPolicy = endpointPickingPolicy; @@ -427,21 +427,16 @@ export class EdsLoadBalancer implements LoadBalancer { trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); return; } - if (!(attributes.xdsClient instanceof XdsClient)) { - trace('Discarding address list update missing xdsClient attribute'); - return; - } trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2)); this.lastestConfig = lbConfig; this.latestAttributes = attributes; - this.xdsClient = attributes.xdsClient; const newEdsServiceName = lbConfig.getEdsServiceName() ?? lbConfig.getCluster(); /* If the name is changing, disable the old watcher before adding the new * one */ if (this.isWatcherActive && this.edsServiceName !== newEdsServiceName) { trace('Removing old endpoint watcher for edsServiceName ' + this.edsServiceName) - this.xdsClient.removeEndpointWatcher(this.edsServiceName!, this.watcher); + getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName!, this.watcher); /* Setting isWatcherActive to false here lets us have one code path for * calling addEndpointWatcher */ this.isWatcherActive = false; @@ -454,12 +449,12 @@ export class EdsLoadBalancer implements LoadBalancer { if (!this.isWatcherActive) { trace('Adding new endpoint watcher for edsServiceName ' + this.edsServiceName); - this.xdsClient.addEndpointWatcher(this.edsServiceName, this.watcher); + getSingletonXdsClient().addEndpointWatcher(this.edsServiceName, this.watcher); this.isWatcherActive = true; } if (lbConfig.getLrsLoadReportingServerName()) { - this.clusterDropStats = this.xdsClient.addClusterDropStats( + this.clusterDropStats = getSingletonXdsClient().addClusterDropStats( lbConfig.getLrsLoadReportingServerName()!, lbConfig.getCluster(), lbConfig.getEdsServiceName() ?? '' @@ -480,7 +475,7 @@ export class EdsLoadBalancer implements LoadBalancer { destroy(): void { trace('Destroying load balancer with edsServiceName ' + this.edsServiceName); if (this.edsServiceName) { - this.xdsClient?.removeEndpointWatcher(this.edsServiceName, this.watcher); + getSingletonXdsClient().removeEndpointWatcher(this.edsServiceName, this.watcher); } this.childBalancer.destroy(); } diff --git a/packages/grpc-js-xds/src/load-balancer-lrs.ts b/packages/grpc-js-xds/src/load-balancer-lrs.ts index b6fa68091..0792b11c2 100644 --- a/packages/grpc-js-xds/src/load-balancer-lrs.ts +++ b/packages/grpc-js-xds/src/load-balancer-lrs.ts @@ -16,9 +16,8 @@ */ import { connectivityState as ConnectivityState, StatusObject, status as Status, experimental } from '@grpc/grpc-js'; -import { type } from 'os'; import { Locality__Output } from './generated/envoy/api/v2/core/Locality'; -import { XdsClusterLocalityStats, XdsClient } from './xds-client'; +import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds-client'; import LoadBalancer = experimental.LoadBalancer; import ChannelControlHelper = experimental.ChannelControlHelper; import registerLoadBalancerType = experimental.registerLoadBalancerType; @@ -208,10 +207,7 @@ export class LrsLoadBalancer implements LoadBalancer { if (!(lbConfig instanceof LrsLoadBalancingConfig)) { return; } - if (!(attributes.xdsClient instanceof XdsClient)) { - return; - } - this.localityStatsReporter = attributes.xdsClient.addClusterLocalityStats( + this.localityStatsReporter = getSingletonXdsClient().addClusterLocalityStats( lbConfig.getLrsLoadReportingServerName(), lbConfig.getClusterName(), lbConfig.getEdsServiceName(), diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index f8d2de258..6faa0505b 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -44,7 +44,7 @@ interface ClusterManagerChild { child_policy: LoadBalancingConfig[]; } -class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig { +export class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig { getLoadBalancerName(): string { return TYPE_NAME; } diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 1588f3b11..99bbd58ce 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -14,7 +14,11 @@ * limitations under the License. */ -import { XdsClient } from './xds-client'; +import * as protoLoader from '@grpc/proto-loader'; + +import { RE2 } from 're2-wasm'; + +import { getSingletonXdsClient, XdsClient } from './xds-client'; import { StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions } from '@grpc/grpc-js'; import Resolver = experimental.Resolver; import GrpcUri = experimental.GrpcUri; @@ -22,6 +26,18 @@ import ResolverListener = experimental.ResolverListener; import uriToString = experimental.uriToString; import ServiceConfig = experimental.ServiceConfig; import registerResolver = experimental.registerResolver; +import { Listener__Output } from './generated/envoy/api/v2/Listener'; +import { Watcher } from './xds-stream-state/xds-stream-state'; +import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration'; +import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager'; +import { GRPC_XDS_EXPERIMENTAL_ROUTING } from './environment'; +import { CdsLoadBalancingConfig } from './load-balancer-cds'; +import { VirtualHost__Output } from './generated/envoy/api/v2/route/VirtualHost'; +import { RouteMatch__Output } from './generated/envoy/api/v2/route/RouteMatch'; +import { HeaderMatcher__Output } from './generated/envoy/api/v2/route/HeaderMatcher'; +import ConfigSelector = experimental.ConfigSelector; +import LoadBalancingConfig = experimental.LoadBalancingConfig; +import { XdsClusterManagerLoadBalancingConfig } from './load-balancer-xds-cluster-manager'; const TRACER_NAME = 'xds_resolver'; @@ -29,15 +45,412 @@ function trace(text: string): void { experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); } +// Better match type has smaller value. +enum MatchType { + EXACT_MATCH, + SUFFIX_MATCH, + PREFIX_MATCH, + UNIVERSE_MATCH, + INVALID_MATCH, +}; + +function domainPatternMatchType(domainPattern: string): MatchType { + if (domainPattern.length === 0) { + return MatchType.INVALID_MATCH; + } + if (domainPattern.indexOf('*') < 0) { + return MatchType.EXACT_MATCH; + } + if (domainPattern === '*') { + return MatchType.UNIVERSE_MATCH; + } + if (domainPattern.startsWith('*')) { + return MatchType.SUFFIX_MATCH; + } + if (domainPattern.endsWith('*')) { + return MatchType.PREFIX_MATCH; + } + return MatchType.INVALID_MATCH; +} + +function domainMatch(matchType: MatchType, domainPattern: string, expectedHostName: string) { + switch (matchType) { + case MatchType.EXACT_MATCH: + return expectedHostName === domainPattern; + case MatchType.SUFFIX_MATCH: + return expectedHostName.endsWith(domainPattern.substring(1)); + case MatchType.PREFIX_MATCH: + return expectedHostName.startsWith(domainPattern.substring(0, domainPattern.length - 1)); + case MatchType.UNIVERSE_MATCH: + return true; + case MatchType.INVALID_MATCH: + return false; + } +} + +function findVirtualHostForDomain(virutalHostList: VirtualHost__Output[], domain: string): VirtualHost__Output | null { + let targetVhost: VirtualHost__Output | null = null; + let bestMatchType: MatchType = MatchType.INVALID_MATCH; + let longestMatch = 0; + for (const virtualHost of virutalHostList) { + for (const domainPattern of virtualHost.domains) { + const matchType = domainPatternMatchType(domainPattern); + // If we already have a match of a better type, skip this one + if (matchType > bestMatchType) { + continue; + } + // If we already have a longer match of the same type, skip this one + if (matchType === bestMatchType && domainPattern.length <= longestMatch) { + continue; + } + if (domainMatch(matchType, domainPattern, domain)) { + targetVhost = virtualHost; + bestMatchType = matchType; + longestMatch = domainPattern.length; + } + if (bestMatchType === MatchType.EXACT_MATCH) { + break; + } + } + if (bestMatchType === MatchType.EXACT_MATCH) { + break; + } + } + return targetVhost; +} + +interface Matcher { + (methodName: string, metadata: Metadata): boolean; +} + +const numberRegex = new RE2(/^-?\d+$/u); + +function getPredicateForHeaderMatcher(headerMatch: HeaderMatcher__Output): Matcher { + let valueChecker: (value: string) => boolean; + switch (headerMatch.header_match_specifier) { + case 'exact_match': + valueChecker = value => value === headerMatch.exact_match; + break; + case 'safe_regex_match': + const regex = new RE2(`^${headerMatch.safe_regex_match}$`, 'u'); + valueChecker = value => regex.test(value); + break; + case 'range_match': + const start = BigInt(headerMatch.range_match!.start); + const end = BigInt(headerMatch.range_match!.end); + valueChecker = value => { + if (!numberRegex.test(value)) { + return false; + } + const numberValue = BigInt(value); + return start <= numberValue && numberValue < end; + } + break; + case 'present_match': + valueChecker = value => true; + break; + case 'prefix_match': + valueChecker = value => value.startsWith(headerMatch.prefix_match!); + break; + case 'suffix_match': + valueChecker = value => value.endsWith(headerMatch.suffix_match!); + break; + default: + // Should be prevented by validation rules + return (methodName, metadata) => false; + } + const headerMatcher: Matcher = (methodName, metadata) => { + if (headerMatch.name.endsWith('-bin')) { + return false; + } + let value: string; + if (headerMatch.name === 'content-type') { + value = 'application/grpc'; + } else { + const valueArray = metadata.get(headerMatch.name); + if (valueArray.length === 0) { + return false; + } else { + value = valueArray.join(','); + } + } + return valueChecker(value); + } + if (headerMatch.invert_match) { + return (methodName, metadata) => !headerMatcher(methodName, metadata); + } else { + return headerMatcher; + } +} + +const RUNTIME_FRACTION_DENOMINATOR_VALUES = { + HUNDRED: 100, + TEN_THOUSAND: 10_000, + MILLION: 1_000_000 +} + +function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher { + let pathMatcher: Matcher; + switch (routeMatch.path_specifier) { + case 'prefix': + if (routeMatch.case_sensitive?.value === false) { + const prefix = routeMatch.prefix!.toLowerCase(); + pathMatcher = (methodName, metadata) => (methodName.toLowerCase().startsWith(prefix)); + } else { + const prefix = routeMatch.prefix!; + pathMatcher = (methodName, metadata) => (methodName.startsWith(prefix)); + } + break; + case 'path': + if (routeMatch.case_sensitive?.value === false) { + const path = routeMatch.path!.toLowerCase(); + pathMatcher = (methodName, metadata) => (methodName.toLowerCase() === path); + } else { + const path = routeMatch.path!; + pathMatcher = (methodName, metadata) => (methodName === path); + } + break; + case 'safe_regex': + const flags = routeMatch.case_sensitive?.value === false ? 'ui' : 'u'; + const regex = new RE2(`^${routeMatch.safe_regex!.regex!}$`, flags); + pathMatcher = (methodName, metadata) => (regex.test(methodName)); + break; + default: + // Should be prevented by validation rules + return (methodName, metadata) => false; + } + const headerMatchers: Matcher[] = routeMatch.headers.map(getPredicateForHeaderMatcher); + let runtimeFractionHandler: () => boolean; + if (!routeMatch.runtime_fraction?.default_value) { + runtimeFractionHandler = () => true; + } else { + const numerator = routeMatch.runtime_fraction.default_value.numerator; + const denominator = RUNTIME_FRACTION_DENOMINATOR_VALUES[routeMatch.runtime_fraction.default_value.denominator]; + runtimeFractionHandler = () => { + const randomNumber = Math.random() * denominator; + return randomNumber < numerator; + } + } + return (methodName, metadata) => pathMatcher(methodName, metadata) && headerMatchers.every(matcher => matcher(methodName, metadata)) && runtimeFractionHandler(); +} + class XdsResolver implements Resolver { private hasReportedSuccess = false; - private xdsClient: XdsClient | null = null; + + private ldsWatcher: Watcher; + private rdsWatcher: Watcher + private isLdsWatcherActive = false; + /** + * The latest route config name from an LDS response. The RDS watcher is + * actively watching that name if and only if this is not null. + */ + private latestRouteConfigName: string | null = null; + + private latestRouteConfig: RouteConfiguration__Output | null = null; + + private clusterRefcounts = new Map(); constructor( private target: GrpcUri, private listener: ResolverListener, private channelOptions: ChannelOptions - ) {} + ) { + this.ldsWatcher = { + onValidUpdate: (update: Listener__Output) => { + const httpConnectionManager = update.api_listener! + .api_listener as protoLoader.AnyExtension & + HttpConnectionManager__Output; + switch (httpConnectionManager.route_specifier) { + case 'rds': { + const routeConfigName = httpConnectionManager.rds!.route_config_name; + if (this.latestRouteConfigName !== routeConfigName) { + if (this.latestRouteConfigName !== null) { + getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); + } + getSingletonXdsClient().addRouteWatcher(httpConnectionManager.rds!.route_config_name, this.rdsWatcher); + this.latestRouteConfigName = routeConfigName; + } + break; + } + case 'route_config': + if (this.latestRouteConfigName) { + getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); + } + this.handleRouteConfig(httpConnectionManager.route_config!); + break; + default: + // This is prevented by the validation rules + } + }, + onTransientError: (error: StatusObject) => { + /* A transient error only needs to bubble up as a failure if we have + * not already provided a ServiceConfig for the upper layer to use */ + if (!this.hasReportedSuccess) { + trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details); + this.reportResolutionError(error.details); + } + }, + onResourceDoesNotExist: () => { + trace('Resolution error for target ' + uriToString(this.target) + ': LDS resource does not exist'); + this.reportResolutionError(`Listener ${this.target} does not exist`); + } + }; + this.rdsWatcher = { + onValidUpdate: (update: RouteConfiguration__Output) => { + this.handleRouteConfig(update); + }, + onTransientError: (error: StatusObject) => { + /* A transient error only needs to bubble up as a failure if we have + * not already provided a ServiceConfig for the upper layer to use */ + if (!this.hasReportedSuccess) { + trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details); + this.reportResolutionError(error.details); + } + }, + onResourceDoesNotExist: () => { + trace('Resolution error for target ' + uriToString(this.target) + ' and route config ' + this.latestRouteConfigName + ': RDS resource does not exist'); + this.reportResolutionError(`Route config ${this.latestRouteConfigName} does not exist`); + } + } + } + + private refCluster(clusterName: string) { + const refCount = this.clusterRefcounts.get(clusterName); + if (refCount) { + refCount.refCount += 1; + } + } + + private unrefCluster(clusterName: string) { + const refCount = this.clusterRefcounts.get(clusterName); + if (refCount) { + refCount.refCount -= 1; + if (!refCount.inLastConfig && refCount.refCount === 0) { + this.clusterRefcounts.delete(clusterName); + this.handleRouteConfig(this.latestRouteConfig!); + } + } + } + + private handleRouteConfig(routeConfig: RouteConfiguration__Output) { + this.latestRouteConfig = routeConfig; + if (GRPC_XDS_EXPERIMENTAL_ROUTING) { + const virtualHost = findVirtualHostForDomain(routeConfig.virtual_hosts, this.target.path); + if (virtualHost === null) { + this.reportResolutionError('No matching route found'); + return; + } + const allConfigClusters = new Set(); + const matchList: {matcher: Matcher, action: () => string}[] = []; + for (const route of virtualHost.routes) { + let routeAction: () => string; + switch (route.route!.cluster_specifier) { + case 'cluster_header': + continue; + case 'cluster':{ + const cluster = route.route!.cluster!; + allConfigClusters.add(cluster); + routeAction = () => cluster; + break; + } + case 'weighted_clusters': { + let lastNumerator = 0; + // clusterChoices is essentially the weighted choices represented as a CDF + const clusterChoices: {cluster: string, numerator: number}[] = []; + for (const clusterWeight of route.route!.weighted_clusters!.clusters) { + allConfigClusters.add(clusterWeight.name); + lastNumerator = lastNumerator + (clusterWeight.weight?.value ?? 0); + clusterChoices.push({cluster: clusterWeight.name, numerator: lastNumerator}); + } + routeAction = () => { + const randomNumber = Math.random() * (route.route!.weighted_clusters!.total_weight?.value ?? 100); + for (const choice of clusterChoices) { + if (randomNumber < choice.numerator) { + return choice.cluster; + } + } + // This should be prevented by the validation rules + return ''; + } + } + } + const routeMatcher = getPredicateForMatcher(route.match!); + matchList.push({matcher: routeMatcher, action: routeAction}); + } + /* Mark clusters that are not in this route config, and remove ones with + * no references */ + for (const [name, refCount] of Array.from(this.clusterRefcounts.entries())) { + if (!allConfigClusters.has(name)) { + refCount.inLastConfig = false; + if (refCount.refCount === 0) { + this.clusterRefcounts.delete(name); + } + } + } + // Add any new clusters from this route config + for (const name of allConfigClusters) { + if (this.clusterRefcounts.has(name)) { + this.clusterRefcounts.get(name)!.inLastConfig = true; + } else { + this.clusterRefcounts.set(name, {inLastConfig: true, refCount: 0}); + } + } + const configSelector: ConfigSelector = (methodName, metadata) => { + for (const {matcher, action} of matchList) { + if (matcher(methodName, metadata)) { + const clusterName = action(); + this.refCluster(clusterName); + const onCommitted = () => { + this.unrefCluster(clusterName); + } + return { + methodConfig: {name: []}, + onCommitted: onCommitted, + pickInformation: {cluster: clusterName}, + status: status.OK + }; + } + } + return { + methodConfig: {name: []}, + // cluster won't be used here, but it's set because of some TypeScript weirdness + pickInformation: {cluster: ''}, + status: status.UNAVAILABLE + }; + }; + const clusterConfigMap = new Map(); + for (const clusterName of this.clusterRefcounts.keys()) { + clusterConfigMap.set(clusterName, {child_policy: [new CdsLoadBalancingConfig(clusterName)]}); + } + const lbPolicyConfig = new XdsClusterManagerLoadBalancingConfig(clusterConfigMap); + const serviceConfig: ServiceConfig = { + methodConfig: [], + loadBalancingConfig: [lbPolicyConfig] + } + this.listener.onSuccessfulResolution([], serviceConfig, null, configSelector, {}); + } else { + // !GRPC_XDS_EXPERIMENTAL_ROUTING + for (const virtualHost of routeConfig.virtual_hosts) { + if (virtualHost.domains.indexOf(this.target.path) >= 0) { + const route = virtualHost.routes[virtualHost.routes.length - 1]; + if (route.match?.prefix === '' && route.route?.cluster) { + trace('Reporting RDS update for host ' + uriToString(this.target) + ' with cluster ' + route.route.cluster); + this.listener.onSuccessfulResolution([], { + methodConfig: [], + loadBalancingConfig: [ + new CdsLoadBalancingConfig(route.route.cluster) + ], + }, null, null, {}); + this.hasReportedSuccess = true; + return; + } else { + trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster); + } + } + } + this.reportResolutionError('No matching route found'); + } + } private reportResolutionError(reason: string) { this.listener.onError({ @@ -51,38 +464,18 @@ class XdsResolver implements Resolver { updateResolution(): void { // Wait until updateResolution is called once to start the xDS requests - if (this.xdsClient === null) { + if (!this.isLdsWatcherActive) { trace('Starting resolution for target ' + uriToString(this.target)); - this.xdsClient = new XdsClient( - this.target.path, - { - onValidUpdate: (update: ServiceConfig) => { - trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update)); - this.hasReportedSuccess = true; - this.listener.onSuccessfulResolution([], update, null, null, { - xdsClient: this.xdsClient, - }); - }, - onTransientError: (error: StatusObject) => { - /* A transient error only needs to bubble up as a failure if we have - * not already provided a ServiceConfig for the upper layer to use */ - if (!this.hasReportedSuccess) { - trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details); - this.reportResolutionError(error.details); - } - }, - onResourceDoesNotExist: () => { - trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist'); - this.reportResolutionError("Resource does not exist"); - }, - }, - this.channelOptions - ); + getSingletonXdsClient().addListenerWatcher(this.target.path, this.ldsWatcher); + this.isLdsWatcherActive = true; } } destroy() { - this.xdsClient?.shutdown(); + getSingletonXdsClient().removeListenerWatcher(this.target.path, this.ldsWatcher); + if (this.latestRouteConfigName) { + getSingletonXdsClient().removeRouteWatcher(this.latestRouteConfigName, this.rdsWatcher); + } } static getDefaultAuthority(target: GrpcUri) { diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index bedafda9a..8ecd5bb75 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -50,6 +50,11 @@ import BackoffTimeout = experimental.BackoffTimeout; import ServiceConfig = experimental.ServiceConfig; import { createGoogleDefaultCredentials } from './google-default-credentials'; import { CdsLoadBalancingConfig } from './load-balancer-cds'; +import { EdsState } from './xds-stream-state/eds-state'; +import { CdsState } from './xds-stream-state/cds-state'; +import { RdsState } from './xds-stream-state/rds-state'; +import { LdsState } from './xds-stream-state/lds-state'; +import { Watcher } from './xds-stream-state/xds-stream-state'; const TRACER_NAME = 'xds_client'; @@ -131,12 +136,6 @@ function localityEqual( ); } -export interface Watcher { - onValidUpdate(update: UpdateType): void; - onTransientError(error: StatusObject): void; - onResourceDoesNotExist(): void; -} - export interface XdsClusterDropStats { addCallDropped(category: string): void; } @@ -219,450 +218,6 @@ class ClusterLoadReportMap { } } -interface XdsStreamState { - versionInfo: string; - nonce: string; - getResourceNames(): string[]; - /** - * Returns a string containing the error details if the message should be nacked, - * or null if it should be acked. - * @param responses - */ - handleResponses(responses: ResponseType[]): string | null; - - reportStreamError(status: StatusObject): void; -} - -class EdsState implements XdsStreamState { - public versionInfo = ''; - public nonce = ''; - - private watchers: Map< - string, - Watcher[] - > = new Map[]>(); - - private latestResponses: ClusterLoadAssignment__Output[] = []; - - constructor(private updateResourceNames: () => void) {} - - /** - * Add the watcher to the watcher list. Returns true if the list of resource - * names has changed, and false otherwise. - * @param edsServiceName - * @param watcher - */ - addWatcher( - edsServiceName: string, - watcher: Watcher - ): void { - let watchersEntry = this.watchers.get(edsServiceName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.watchers.set(edsServiceName, watchersEntry); - } - trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName); - watchersEntry.push(watcher); - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - for (const message of this.latestResponses) { - if (message.cluster_name === edsServiceName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); - watcher.onValidUpdate(message); - }); - } - } - if (addedServiceName) { - this.updateResourceNames(); - } - } - - removeWatcher( - edsServiceName: string, - watcher: Watcher - ): void { - trace('Removing EDS watcher for edsServiceName ' + edsServiceName); - const watchersEntry = this.watchers.get(edsServiceName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName); - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.watchers.delete(edsServiceName); - } - } - if (removedServiceName) { - this.updateResourceNames(); - } - } - - getResourceNames(): string[] { - return Array.from(this.watchers.keys()); - } - - /** - * Validate the ClusterLoadAssignment object by these rules: - * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto - * @param message - */ - private validateResponse(message: ClusterLoadAssignment__Output) { - for (const endpoint of message.endpoints) { - for (const lb of endpoint.lb_endpoints) { - const socketAddress = lb.endpoint?.address?.socket_address; - if (!socketAddress) { - return false; - } - if (socketAddress.port_specifier !== 'port_value') { - return false; - } - if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { - return false; - } - } - } - return true; - } - - /** - * Given a list of edsServiceNames (which may actually be the cluster name), - * for each watcher watching a name not on the list, call that watcher's - * onResourceDoesNotExist method. - * @param allClusterNames - */ - handleMissingNames(allEdsServiceNames: Set) { - for (const [edsServiceName, watcherList] of this.watchers.entries()) { - if (!allEdsServiceNames.has(edsServiceName)) { - trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName); - for (const watcher of watcherList) { - watcher.onResourceDoesNotExist(); - } - } - } - } - - handleResponses(responses: ClusterLoadAssignment__Output[]) { - for (const message of responses) { - if (!this.validateResponse(message)) { - trace('EDS validation failed for message ' + JSON.stringify(message)); - return 'EDS Error: ClusterLoadAssignment validation failed'; - } - } - this.latestResponses = responses; - const allClusterNames: Set = new Set(); - for (const message of responses) { - allClusterNames.add(message.cluster_name); - const watchers = this.watchers.get(message.cluster_name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message); - } - } - trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); - this.handleMissingNames(allClusterNames); - return null; - } - - reportStreamError(status: StatusObject): void { - for (const watcherList of this.watchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - } -} - -class CdsState implements XdsStreamState { - versionInfo = ''; - nonce = ''; - - private watchers: Map[]> = new Map< - string, - Watcher[] - >(); - - private latestResponses: Cluster__Output[] = []; - - constructor( - private edsState: EdsState, - private updateResourceNames: () => void - ) {} - - /** - * Add the watcher to the watcher list. Returns true if the list of resource - * names has changed, and false otherwise. - * @param clusterName - * @param watcher - */ - addWatcher(clusterName: string, watcher: Watcher): void { - trace('Adding CDS watcher for clusterName ' + clusterName); - let watchersEntry = this.watchers.get(clusterName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.watchers.set(clusterName, watchersEntry); - } - watchersEntry.push(watcher); - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - for (const message of this.latestResponses) { - if (message.name === clusterName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); - watcher.onValidUpdate(message); - }); - } - } - if (addedServiceName) { - this.updateResourceNames(); - } - } - - removeWatcher(clusterName: string, watcher: Watcher): void { - trace('Removing CDS watcher for clusterName ' + clusterName); - const watchersEntry = this.watchers.get(clusterName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.watchers.delete(clusterName); - } - } - if (removedServiceName) { - this.updateResourceNames(); - } - } - - getResourceNames(): string[] { - return Array.from(this.watchers.keys()); - } - - private validateResponse(message: Cluster__Output): boolean { - if (message.type !== 'EDS') { - return false; - } - if (!message.eds_cluster_config?.eds_config?.ads) { - return false; - } - if (message.lb_policy !== 'ROUND_ROBIN') { - return false; - } - if (message.lrs_server) { - if (!message.lrs_server.self) { - return false; - } - } - return true; - } - - /** - * Given a list of clusterNames (which may actually be the cluster name), - * for each watcher watching a name not on the list, call that watcher's - * onResourceDoesNotExist method. - * @param allClusterNames - */ - private handleMissingNames(allClusterNames: Set) { - for (const [clusterName, watcherList] of this.watchers.entries()) { - if (!allClusterNames.has(clusterName)) { - trace('Reporting CDS resource does not exist for clusterName ' + clusterName); - for (const watcher of watcherList) { - watcher.onResourceDoesNotExist(); - } - } - } - } - - handleResponses(responses: Cluster__Output[]): string | null { - for (const message of responses) { - if (!this.validateResponse(message)) { - trace('CDS validation failed for message ' + JSON.stringify(message)); - return 'CDS Error: Cluster validation failed'; - } - } - this.latestResponses = responses; - const allEdsServiceNames: Set = new Set(); - const allClusterNames: Set = new Set(); - for (const message of responses) { - allClusterNames.add(message.name); - const edsServiceName = message.eds_cluster_config?.service_name ?? ''; - allEdsServiceNames.add( - edsServiceName === '' ? message.name : edsServiceName - ); - const watchers = this.watchers.get(message.name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message); - } - } - trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); - this.handleMissingNames(allClusterNames); - this.edsState.handleMissingNames(allEdsServiceNames); - return null; - } - - reportStreamError(status: StatusObject): void { - for (const watcherList of this.watchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - } -} - -class RdsState implements XdsStreamState { - versionInfo = ''; - nonce = ''; - - private routeConfigName: string | null = null; - - constructor( - private targetName: string, - private watcher: Watcher, - private updateResouceNames: () => void - ) {} - - getResourceNames(): string[] { - return this.routeConfigName ? [this.routeConfigName] : []; - } - - handleSingleMessage(message: RouteConfiguration__Output) { - for (const virtualHost of message.virtual_hosts) { - if (virtualHost.domains.indexOf(this.targetName) >= 0) { - const route = virtualHost.routes[virtualHost.routes.length - 1]; - if (route.match?.prefix === '' && route.route?.cluster) { - trace('Reporting RDS update for host ' + this.targetName + ' with cluster ' + route.route.cluster); - this.watcher.onValidUpdate({ - methodConfig: [], - loadBalancingConfig: [ - new CdsLoadBalancingConfig(route.route.cluster) - ], - }); - return; - } else { - trace('Discarded matching route with prefix ' + route.match?.prefix + ' and cluster ' + route.route?.cluster); - } - } - } - trace('Reporting RDS resource does not exist from domain lists ' + message.virtual_hosts.map(virtualHost => virtualHost.domains)); - /* If none of the routes match the one we are looking for, bubble up an - * error. */ - this.watcher.onResourceDoesNotExist(); - } - - handleResponses(responses: RouteConfiguration__Output[]): string | null { - trace('Received RDS response with route config names ' + responses.map(message => message.name)); - if (this.routeConfigName !== null) { - for (const message of responses) { - if (message.name === this.routeConfigName) { - this.handleSingleMessage(message); - return null; - } - } - } - return null; - } - - setRouteConfigName(name: string | null) { - const oldName = this.routeConfigName; - this.routeConfigName = name; - if (name !== oldName) { - this.updateResouceNames(); - } - } - - reportStreamError(status: StatusObject): void { - this.watcher.onTransientError(status); - } -} - -class LdsState implements XdsStreamState { - versionInfo = ''; - nonce = ''; - - constructor(private targetName: string, private rdsState: RdsState) {} - - getResourceNames(): string[] { - return [this.targetName]; - } - - private validateResponse(message: Listener__Output): boolean { - if ( - !( - message.api_listener?.api_listener && - protoLoader.isAnyExtension(message.api_listener.api_listener) && - message.api_listener?.api_listener['@type'] === - HTTP_CONNECTION_MANGER_TYPE_URL - ) - ) { - return false; - } - const httpConnectionManager = message.api_listener - ?.api_listener as protoLoader.AnyExtension & - HttpConnectionManager__Output; - switch (httpConnectionManager.route_specifier) { - case 'rds': - return !!httpConnectionManager.rds?.config_source?.ads; - case 'route_config': - return true; - } - return false; - } - - handleResponses(responses: Listener__Output[]): string | null { - trace('Received LDS update with names ' + responses.map(message => message.name)); - for (const message of responses) { - if (message.name === this.targetName) { - if (this.validateResponse(message)) { - // The validation step ensures that this is correct - const httpConnectionManager = message.api_listener! - .api_listener as protoLoader.AnyExtension & - HttpConnectionManager__Output; - switch (httpConnectionManager.route_specifier) { - case 'rds': - trace('Received LDS update with RDS route config name ' + httpConnectionManager.rds!.route_config_name); - this.rdsState.setRouteConfigName( - httpConnectionManager.rds!.route_config_name - ); - break; - case 'route_config': - trace('Received LDS update with route configuration'); - this.rdsState.setRouteConfigName(null); - this.rdsState.handleSingleMessage( - httpConnectionManager.route_config! - ); - break; - default: - // The validation rules should prevent this - } - } else { - trace('LRS validation error for message ' + JSON.stringify(message)); - return 'LRS Error: Listener validation failed'; - } - } - } - return null; - } - - reportStreamError(status: StatusObject): void { - // Nothing to do here - } -} - interface AdsState { [EDS_TYPE_URL]: EdsState; [CDS_TYPE_URL]: CdsState; @@ -728,21 +283,19 @@ export class XdsClient { private adsBackoff: BackoffTimeout; private lrsBackoff: BackoffTimeout; - constructor( - targetName: string, - serviceConfigWatcher: Watcher, - channelOptions: ChannelOptions - ) { + constructor() { const edsState = new EdsState(() => { this.updateNames(EDS_TYPE_URL); }); const cdsState = new CdsState(edsState, () => { this.updateNames(CDS_TYPE_URL); }); - const rdsState = new RdsState(targetName, serviceConfigWatcher, () => { + const rdsState = new RdsState(() => { this.updateNames(RDS_TYPE_URL); }); - const ldsState = new LdsState(targetName, rdsState); + const ldsState = new LdsState(rdsState, () => { + this.updateNames(LDS_TYPE_URL); + }); this.adsState = { [EDS_TYPE_URL]: edsState, [CDS_TYPE_URL]: cdsState, @@ -750,26 +303,10 @@ export class XdsClient { [LDS_TYPE_URL]: ldsState, }; - const channelArgs = { ...channelOptions }; - const channelArgsToRemove = [ - /* The SSL target name override corresponds to the target, and this - * client has its own target */ - 'grpc.ssl_target_name_override', - /* The default authority also corresponds to the target */ - 'grpc.default_authority', - /* This client will have its own specific keepalive time setting */ - 'grpc.keepalive_time_ms', - /* The service config specifies the load balancing policy. This channel - * needs its own separate load balancing policy setting. In particular, - * recursively using an xDS load balancer for the xDS client would be - * bad */ - 'grpc.service_config', - ]; - for (const arg of channelArgsToRemove) { - delete channelArgs[arg]; + const channelArgs = { + // 5 minutes + 'grpc.keepalive_time_ms': 5 * 60 * 1000 } - // 5 minutes - channelArgs['grpc.keepalive_time_ms'] = 5 * 60 * 1000; this.adsBackoff = new BackoffTimeout(() => { this.maybeStartAdsStream(); @@ -904,6 +441,12 @@ export class XdsClient { if (this.hasShutdown) { return; } + if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) { + return; + } trace('Starting ADS stream'); // Backoff relative to when we start the request this.adsBackoff.runOnce(); @@ -986,6 +529,16 @@ export class XdsClient { } private updateNames(typeUrl: AdsTypeUrl) { + if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) { + this.adsCall?.end(); + this.lrsCall?.end(); + return; + } + this.maybeStartAdsStream(); + this.maybeStartLrsStream(); trace('Sending update for type URL ' + typeUrl + ' with names ' + this.adsState[typeUrl].getResourceNames()); this.adsCall?.write({ node: this.adsNode!, @@ -1013,6 +566,12 @@ export class XdsClient { if (this.hasShutdown) { return; } + if (this.adsState[EDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[CDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[RDS_TYPE_URL].getResourceNames().length === 0 && + this.adsState[LDS_TYPE_URL].getResourceNames().length === 0) { + return; + } trace('Starting LRS stream'); @@ -1066,7 +625,6 @@ export class XdsClient { if (!this.lrsCall) { return; } - trace('Sending LRS stats'); const clusterStats: ClusterStats[] = []; for (const [ { clusterName, edsServiceName }, @@ -1127,6 +685,7 @@ export class XdsClient { } } } + trace('Sending LRS stats ' + JSON.stringify(clusterStats, undefined, 2)); this.lrsCall.write({ node: this.lrsNode!, cluster_stats: clusterStats, @@ -1159,6 +718,26 @@ export class XdsClient { this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher); } + addRouteWatcher(routeConfigName: string, watcher: Watcher) { + trace('Watcher added for route ' + routeConfigName); + this.adsState[RDS_TYPE_URL].addWatcher(routeConfigName, watcher); + } + + removeRouteWatcher(routeConfigName: string, watcher: Watcher) { + trace('Watcher removed for route ' + routeConfigName); + this.adsState[RDS_TYPE_URL].removeWatcher(routeConfigName, watcher); + } + + addListenerWatcher(targetName: string, watcher: Watcher) { + trace('Watcher added for listener ' + targetName); + this.adsState[LDS_TYPE_URL].addWatcher(targetName, watcher); + } + + removeListenerWatcher(targetName: string, watcher: Watcher) { + trace('Watcher removed for listener ' + targetName); + this.adsState[LDS_TYPE_URL].removeWatcher(targetName, watcher); + } + /** * * @param lrsServer The target name of the server to send stats to. An empty @@ -1172,6 +751,7 @@ export class XdsClient { clusterName: string, edsServiceName: string ): XdsClusterDropStats { + trace('addClusterDropStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')'); if (lrsServer !== '') { return { addCallDropped: (category) => {}, @@ -1195,6 +775,7 @@ export class XdsClient { edsServiceName: string, locality: Locality__Output ): XdsClusterLocalityStats { + trace('addClusterLocalityStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ', locality=' + JSON.stringify(locality) + ')'); if (lrsServer !== '') { return { addCallStarted: () => {}, @@ -1241,7 +822,7 @@ export class XdsClient { }; } - shutdown(): void { + private shutdown(): void { this.adsCall?.cancel(); this.adsClient?.close(); this.lrsCall?.cancel(); @@ -1249,3 +830,12 @@ export class XdsClient { this.hasShutdown = true; } } + +let singletonXdsClient: XdsClient | null = null; + +export function getSingletonXdsClient(): XdsClient { + if (singletonXdsClient === null) { + singletonXdsClient = new XdsClient(); + } + return singletonXdsClient; +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts new file mode 100644 index 000000000..343089958 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -0,0 +1,171 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { Cluster__Output } from "../generated/envoy/api/v2/Cluster"; +import { EdsState } from "./eds-state"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +export class CdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private watchers: Map[]> = new Map< + string, + Watcher[] + >(); + + private latestResponses: Cluster__Output[] = []; + + constructor( + private edsState: EdsState, + private updateResourceNames: () => void + ) {} + + /** + * Add the watcher to the watcher list. Returns true if the list of resource + * names has changed, and false otherwise. + * @param clusterName + * @param watcher + */ + addWatcher(clusterName: string, watcher: Watcher): void { + trace('Adding CDS watcher for clusterName ' + clusterName); + let watchersEntry = this.watchers.get(clusterName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(clusterName, watchersEntry); + } + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.name === clusterName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher(clusterName: string, watcher: Watcher): void { + trace('Removing CDS watcher for clusterName ' + clusterName); + const watchersEntry = this.watchers.get(clusterName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(clusterName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + private validateResponse(message: Cluster__Output): boolean { + if (message.type !== 'EDS') { + return false; + } + if (!message.eds_cluster_config?.eds_config?.ads) { + return false; + } + if (message.lb_policy !== 'ROUND_ROBIN') { + return false; + } + if (message.lrs_server) { + if (!message.lrs_server.self) { + return false; + } + } + return true; + } + + /** + * Given a list of clusterNames (which may actually be the cluster name), + * for each watcher watching a name not on the list, call that watcher's + * onResourceDoesNotExist method. + * @param allClusterNames + */ + private handleMissingNames(allClusterNames: Set) { + for (const [clusterName, watcherList] of this.watchers.entries()) { + if (!allClusterNames.has(clusterName)) { + trace('Reporting CDS resource does not exist for clusterName ' + clusterName); + for (const watcher of watcherList) { + watcher.onResourceDoesNotExist(); + } + } + } + } + + handleResponses(responses: Cluster__Output[]): string | null { + for (const message of responses) { + if (!this.validateResponse(message)) { + trace('CDS validation failed for message ' + JSON.stringify(message)); + return 'CDS Error: Cluster validation failed'; + } + } + this.latestResponses = responses; + const allEdsServiceNames: Set = new Set(); + const allClusterNames: Set = new Set(); + for (const message of responses) { + allClusterNames.add(message.name); + const edsServiceName = message.eds_cluster_config?.service_name ?? ''; + allEdsServiceNames.add( + edsServiceName === '' ? message.name : edsServiceName + ); + const watchers = this.watchers.get(message.name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); + this.handleMissingNames(allClusterNames); + this.edsState.handleMissingNames(allEdsServiceNames); + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts new file mode 100644 index 000000000..c9beef292 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -0,0 +1,174 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { isIPv4, isIPv6 } from "net"; +import { ClusterLoadAssignment__Output } from "../generated/envoy/api/v2/ClusterLoadAssignment"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +export class EdsState implements XdsStreamState { + public versionInfo = ''; + public nonce = ''; + + private watchers: Map< + string, + Watcher[] + > = new Map[]>(); + + private latestResponses: ClusterLoadAssignment__Output[] = []; + + constructor(private updateResourceNames: () => void) {} + + /** + * Add the watcher to the watcher list. Returns true if the list of resource + * names has changed, and false otherwise. + * @param edsServiceName + * @param watcher + */ + addWatcher( + edsServiceName: string, + watcher: Watcher + ): void { + let watchersEntry = this.watchers.get(edsServiceName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(edsServiceName, watchersEntry); + } + trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName); + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.cluster_name === edsServiceName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher( + edsServiceName: string, + watcher: Watcher + ): void { + trace('Removing EDS watcher for edsServiceName ' + edsServiceName); + const watchersEntry = this.watchers.get(edsServiceName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName); + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(edsServiceName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + /** + * Validate the ClusterLoadAssignment object by these rules: + * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto + * @param message + */ + private validateResponse(message: ClusterLoadAssignment__Output) { + for (const endpoint of message.endpoints) { + for (const lb of endpoint.lb_endpoints) { + const socketAddress = lb.endpoint?.address?.socket_address; + if (!socketAddress) { + return false; + } + if (socketAddress.port_specifier !== 'port_value') { + return false; + } + if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) { + return false; + } + } + } + return true; + } + + /** + * Given a list of edsServiceNames (which may actually be the cluster name), + * for each watcher watching a name not on the list, call that watcher's + * onResourceDoesNotExist method. + * @param allClusterNames + */ + handleMissingNames(allEdsServiceNames: Set) { + for (const [edsServiceName, watcherList] of this.watchers.entries()) { + if (!allEdsServiceNames.has(edsServiceName)) { + trace('Reporting EDS resource does not exist for edsServiceName ' + edsServiceName); + for (const watcher of watcherList) { + watcher.onResourceDoesNotExist(); + } + } + } + } + + handleResponses(responses: ClusterLoadAssignment__Output[]) { + for (const message of responses) { + if (!this.validateResponse(message)) { + trace('EDS validation failed for message ' + JSON.stringify(message)); + return 'EDS Error: ClusterLoadAssignment validation failed'; + } + } + this.latestResponses = responses; + const allClusterNames: Set = new Set(); + for (const message of responses) { + allClusterNames.add(message.cluster_name); + const watchers = this.watchers.get(message.cluster_name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); + this.handleMissingNames(allClusterNames); + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts new file mode 100644 index 000000000..554712727 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -0,0 +1,155 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as protoLoader from '@grpc/proto-loader'; +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { Listener__Output } from "../generated/envoy/api/v2/Listener"; +import { RdsState } from "./rds-state"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; +import { HttpConnectionManager__Output } from '../generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager'; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +const HTTP_CONNECTION_MANGER_TYPE_URL = + 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager'; + +export class LdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private watchers: Map[]> = new Map[]>(); + private latestResponses: Listener__Output[] = []; + + constructor(private rdsState: RdsState, private updateResourceNames: () => void) {} + + addWatcher(targetName: string, watcher: Watcher) { + trace('Adding RDS watcher for targetName ' + targetName); + let watchersEntry = this.watchers.get(targetName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(targetName, watchersEntry); + } + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.name === targetName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + trace('Reporting existing RDS update for new watcher for targetName ' + targetName); + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher(targetName: string, watcher: Watcher): void { + trace('Removing RDS watcher for targetName ' + targetName); + const watchersEntry = this.watchers.get(targetName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(targetName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + private validateResponse(message: Listener__Output): boolean { + if ( + !( + message.api_listener?.api_listener && + protoLoader.isAnyExtension(message.api_listener.api_listener) && + message.api_listener?.api_listener['@type'] === + HTTP_CONNECTION_MANGER_TYPE_URL + ) + ) { + return false; + } + const httpConnectionManager = message.api_listener + ?.api_listener as protoLoader.AnyExtension & + HttpConnectionManager__Output; + switch (httpConnectionManager.route_specifier) { + case 'rds': + return !!httpConnectionManager.rds?.config_source?.ads; + case 'route_config': + return this.rdsState.validateResponse(httpConnectionManager.route_config!); + } + return false; + } + + private handleMissingNames(allTargetNames: Set) { + for (const [targetName, watcherList] of this.watchers.entries()) { + if (!allTargetNames.has(targetName)) { + for (const watcher of watcherList) { + watcher.onResourceDoesNotExist(); + } + } + } + } + + handleResponses(responses: Listener__Output[]): string | null { + for (const message of responses) { + if (!this.validateResponse(message)) { + trace('LDS validation failed for message ' + JSON.stringify(message)); + return 'LDS Error: Route validation failed'; + } + } + this.latestResponses = responses; + const allTargetNames = new Set(); + for (const message of responses) { + allTargetNames.add(message.name); + const watchers = this.watchers.get(message.name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + trace('Received RDS response with route config names ' + Array.from(allTargetNames)); + this.handleMissingNames(allTargetNames); + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts new file mode 100644 index 000000000..2ac924d98 --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -0,0 +1,191 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { GRPC_XDS_EXPERIMENTAL_ROUTING } from "../environment"; +import { RouteConfiguration__Output } from "../generated/envoy/api/v2/RouteConfiguration"; +import { CdsLoadBalancingConfig } from "../load-balancer-cds"; +import { Watcher, XdsStreamState } from "./xds-stream-state"; +import ServiceConfig = experimental.ServiceConfig; + +const TRACER_NAME = 'xds_client'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex']; +const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [ + 'exact_match', + 'safe_regex_match', + 'range_match', + 'present_match', + 'prefix_match', + 'suffix_match']; +const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header']; + +export class RdsState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private watchers: Map[]> = new Map[]>(); + private latestResponses: RouteConfiguration__Output[] = []; + + constructor(private updateResourceNames: () => void) {} + + addWatcher(routeConfigName: string, watcher: Watcher) { + trace('Adding RDS watcher for routeConfigName ' + routeConfigName); + let watchersEntry = this.watchers.get(routeConfigName); + let addedServiceName = false; + if (watchersEntry === undefined) { + addedServiceName = true; + watchersEntry = []; + this.watchers.set(routeConfigName, watchersEntry); + } + watchersEntry.push(watcher); + + /* If we have already received an update for the requested edsServiceName, + * immediately pass that update along to the watcher */ + for (const message of this.latestResponses) { + if (message.name === routeConfigName) { + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName); + watcher.onValidUpdate(message); + }); + } + } + if (addedServiceName) { + this.updateResourceNames(); + } + } + + removeWatcher(routeConfigName: string, watcher: Watcher): void { + trace('Removing RDS watcher for routeConfigName ' + routeConfigName); + const watchersEntry = this.watchers.get(routeConfigName); + let removedServiceName = false; + if (watchersEntry !== undefined) { + const entryIndex = watchersEntry.indexOf(watcher); + if (entryIndex >= 0) { + watchersEntry.splice(entryIndex, 1); + } + if (watchersEntry.length === 0) { + removedServiceName = true; + this.watchers.delete(routeConfigName); + } + } + if (removedServiceName) { + this.updateResourceNames(); + } + } + + getResourceNames(): string[] { + return Array.from(this.watchers.keys()); + } + + validateResponse(message: RouteConfiguration__Output): boolean { + if (GRPC_XDS_EXPERIMENTAL_ROUTING) { + // https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation + for (const virtualHost of message.virtual_hosts) { + for (const domainPattern of virtualHost.domains) { + const starIndex = domainPattern.indexOf('*'); + const lastStarIndex = domainPattern.lastIndexOf('*'); + // A domain pattern can have at most one wildcard * + if (starIndex !== lastStarIndex) { + return false; + } + // A wildcard * can either be absent or at the beginning or end of the pattern + if (!(starIndex === -1 || starIndex === 0 || starIndex === domainPattern.length - 1)) { + return false; + } + } + for (const route of virtualHost.routes) { + const match = route.match; + if (!match) { + return false; + } + if (SUPPORTED_PATH_SPECIFIERS.indexOf(match.path_specifier) < 0) { + return false; + } + for (const headers of match.headers) { + if (SUPPPORTED_HEADER_MATCH_SPECIFIERS.indexOf(headers.header_match_specifier) < 0) { + return false; + } + } + if (route.action !== 'route') { + return false; + } + if ((route.route === undefined) || SUPPORTED_CLUSTER_SPECIFIERS.indexOf(route.route.cluster_specifier) < 0) { + return false; + } + if (route.route!.cluster_specifier === 'weighted_clusters') { + let weightSum = 0; + for (const clusterWeight of route.route.weighted_clusters!.clusters) { + weightSum += clusterWeight.weight?.value ?? 0; + } + if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) { + return false; + } + } + } + } + return true; + } else { + return true; + } + } + + private handleMissingNames(allRouteConfigNames: Set) { + for (const [routeConfigName, watcherList] of this.watchers.entries()) { + if (!allRouteConfigNames.has(routeConfigName)) { + for (const watcher of watcherList) { + watcher.onResourceDoesNotExist(); + } + } + } + } + + handleResponses(responses: RouteConfiguration__Output[]): string | null { + for (const message of responses) { + if (!this.validateResponse(message)) { + trace('RDS validation failed for message ' + JSON.stringify(message)); + return 'RDS Error: Route validation failed'; + } + } + this.latestResponses = responses; + const allRouteConfigNames = new Set(); + for (const message of responses) { + allRouteConfigNames.add(message.name); + const watchers = this.watchers.get(message.name) ?? []; + for (const watcher of watchers) { + watcher.onValidUpdate(message); + } + } + trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames)); + this.handleMissingNames(allRouteConfigNames); + return null; + } + + reportStreamError(status: StatusObject): void { + for (const watcherList of this.watchers.values()) { + for (const watcher of watcherList) { + watcher.onTransientError(status); + } + } + } +} \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts new file mode 100644 index 000000000..83db1781e --- /dev/null +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -0,0 +1,38 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { StatusObject } from "@grpc/grpc-js"; + +export interface Watcher { + onValidUpdate(update: UpdateType): void; + onTransientError(error: StatusObject): void; + onResourceDoesNotExist(): void; +} + +export interface XdsStreamState { + versionInfo: string; + nonce: string; + getResourceNames(): string[]; + /** + * Returns a string containing the error details if the message should be nacked, + * or null if it should be acked. + * @param responses + */ + handleResponses(responses: ResponseType[]): string | null; + + reportStreamError(status: StatusObject): void; +} \ No newline at end of file diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 9c27aeb1f..7d0abb943 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -415,21 +415,8 @@ export class Http2CallStream implements Call { ); } const status: StatusObject = { code, details, metadata }; - let finalStatus; - try { - // Attempt to assign final status. - finalStatus = this.filterStack.receiveTrailers(status); - } catch (error) { - // This is a no-op if the call was already ended when handling headers. - this.endCall({ - code: Status.INTERNAL, - details: 'Failed to process received status', - metadata: new Metadata(), - }); - return; - } // This is a no-op if the call was already ended when handling headers. - this.endCall(finalStatus); + this.endCall(status); } attachHttp2Stream( diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index c27f9810a..f62838c12 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -1,5 +1,5 @@ export { trace } from './logging'; -export { Resolver, ResolverListener, registerResolver } from './resolver'; +export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver'; export { GrpcUri, uriToString } from './uri-parser'; export { ServiceConfig } from './service-config'; export { BackoffTimeout } from './backoff-timeout';