From 067bb13f275cf2bbcb2fbacf4a757a7a3c7d4035 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 12 May 2022 17:18:55 -0700 Subject: [PATCH 1/5] grpc-js-xds: Refactor xDS stream state and add resource timer --- packages/grpc-js-xds/src/xds-client.ts | 10 +- .../src/xds-stream-state/cds-state.ts | 156 +-------------- .../src/xds-stream-state/eds-state.ts | 130 +----------- .../src/xds-stream-state/lds-state.ts | 135 ++----------- .../src/xds-stream-state/rds-state.ts | 122 +---------- .../src/xds-stream-state/xds-stream-state.ts | 189 +++++++++++++++++- 6 files changed, 236 insertions(+), 506 deletions(-) diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 7a12af1f6..5f75a2cab 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -309,7 +309,7 @@ export class XdsClient { const edsState = new EdsState(() => { this.updateNames('eds'); }); - const cdsState = new CdsState(edsState, () => { + const cdsState = new CdsState(() => { this.updateNames('cds'); }); const rdsState = new RdsState(() => { @@ -630,6 +630,7 @@ export class XdsClient { this.updateNames(service); } } + this.reportAdsStreamStarted(); } } @@ -777,6 +778,13 @@ export class XdsClient { this.adsState.lds.reportStreamError(status); } + private reportAdsStreamStarted() { + this.adsState.eds.reportAdsStreamStart(); + this.adsState.cds.reportAdsStreamStart(); + this.adsState.rds.reportAdsStreamStart(); + this.adsState.lds.reportAdsStreamStart(); + } + private handleLrsResponse(message: LoadStatsResponse__Output) { trace('Received LRS response'); /* Once we get any response from the server, we assume that the stream is diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts index 8c3c4d739..9fd12d6ed 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -15,94 +15,21 @@ * */ -import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { EXPERIMENTAL_OUTLIER_DETECTION } from "../environment"; import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster"; -import { Any__Output } from "../generated/google/protobuf/Any"; import { Duration__Output } from "../generated/google/protobuf/Duration"; import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value"; -import { EdsState } from "./eds-state"; -import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; +import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state"; -const TRACER_NAME = 'xds_client'; - -function trace(text: string): void { - experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); -} - -export class CdsState implements XdsStreamState { - versionInfo = ''; - nonce = ''; - - private watchers: Map[]> = new Map< - string, - Watcher[] - >(); - - private latestResponses: Cluster__Output[] = []; - private latestIsV2 = false; - - constructor( - private edsState: EdsState, - private updateResourceNames: () => void - ) {} - - /** - * Add the watcher to the watcher list. Returns true if the list of resource - * names has changed, and false otherwise. - * @param clusterName - * @param watcher - */ - addWatcher(clusterName: string, watcher: Watcher): void { - trace('Adding CDS watcher for clusterName ' + clusterName); - let watchersEntry = this.watchers.get(clusterName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.watchers.set(clusterName, watchersEntry); - } - watchersEntry.push(watcher); - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - const isV2 = this.latestIsV2; - for (const message of this.latestResponses) { - if (message.name === clusterName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName); - watcher.onValidUpdate(message, isV2); - }); - } - } - if (addedServiceName) { - this.updateResourceNames(); - } +export class CdsState extends BaseXdsStreamState implements XdsStreamState { + protected isStateOfTheWorld(): boolean { + return true; } - - removeWatcher(clusterName: string, watcher: Watcher): void { - trace('Removing CDS watcher for clusterName ' + clusterName); - const watchersEntry = this.watchers.get(clusterName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.watchers.delete(clusterName); - } - } - if (removedServiceName) { - this.updateResourceNames(); - } + protected getResourceName(resource: Cluster__Output): string { + return resource.name; } - - getResourceNames(): string[] { - return Array.from(this.watchers.keys()); + protected getProtocolName(): string { + return 'CDS'; } private validateNonnegativeDuration(duration: Duration__Output | null): boolean { @@ -125,7 +52,7 @@ export class CdsState implements XdsStreamState { return percentage.value >=0 && percentage.value <= 100; } - private validateResponse(message: Cluster__Output): boolean { + public validateResponse(message: Cluster__Output): boolean { if (message.type !== 'EDS') { return false; } @@ -167,69 +94,4 @@ export class CdsState implements XdsStreamState { } return true; } - - /** - * Given a list of clusterNames (which may actually be the cluster name), - * for each watcher watching a name not on the list, call that watcher's - * onResourceDoesNotExist method. - * @param allClusterNames - */ - private handleMissingNames(allClusterNames: Set): string[] { - const missingNames: string[] = []; - for (const [clusterName, watcherList] of this.watchers.entries()) { - if (!allClusterNames.has(clusterName)) { - trace('Reporting CDS resource does not exist for clusterName ' + clusterName); - missingNames.push(clusterName); - for (const watcher of watcherList) { - watcher.onResourceDoesNotExist(); - } - } - } - return missingNames; - } - - handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { - const validResponses: Cluster__Output[] = []; - const result: HandleResponseResult = { - accepted: [], - rejected: [], - missing: [] - } - for (const {resource, raw} of responses) { - if (this.validateResponse(resource)) { - validResponses.push(resource); - result.accepted.push({ - name: resource.name, - raw: raw}); - } else { - trace('CDS validation failed for message ' + JSON.stringify(resource)); - result.rejected.push({ - name: resource.name, - raw: raw, - error: `Cluster validation failed for resource ${resource.name}` - }); - } - } - this.latestResponses = validResponses; - this.latestIsV2 = isV2; - const allClusterNames: Set = new Set(); - for (const message of validResponses) { - allClusterNames.add(message.name); - const watchers = this.watchers.get(message.name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message, isV2); - } - } - trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']'); - result.missing = this.handleMissingNames(allClusterNames); - return result; - } - - reportStreamError(status: StatusObject): void { - for (const watcherList of this.watchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - } } \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index fe9f3c624..fb2a99485 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -19,7 +19,7 @@ import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { isIPv4, isIPv6 } from "net"; import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment"; import { Any__Output } from "../generated/google/protobuf/Any"; -import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; +import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; const TRACER_NAME = 'xds_client'; @@ -27,83 +27,15 @@ function trace(text: string): void { experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); } -export class EdsState implements XdsStreamState { - public versionInfo = ''; - public nonce = ''; - - private watchers: Map< - string, - Watcher[] - > = new Map[]>(); - - private latestResponses: ClusterLoadAssignment__Output[] = []; - private latestIsV2 = false; - - constructor(private updateResourceNames: () => void) {} - - /** - * Add the watcher to the watcher list. Returns true if the list of resource - * names has changed, and false otherwise. - * @param edsServiceName - * @param watcher - */ - addWatcher( - edsServiceName: string, - watcher: Watcher - ): void { - let watchersEntry = this.watchers.get(edsServiceName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.watchers.set(edsServiceName, watchersEntry); - } - trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName); - watchersEntry.push(watcher); - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - const isV2 = this.latestIsV2; - for (const message of this.latestResponses) { - if (message.cluster_name === edsServiceName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName); - watcher.onValidUpdate(message, isV2); - }); - } - } - if (addedServiceName) { - this.updateResourceNames(); - } +export class EdsState extends BaseXdsStreamState implements XdsStreamState { + protected getResourceName(resource: ClusterLoadAssignment__Output): string { + return resource.cluster_name; } - - removeWatcher( - edsServiceName: string, - watcher: Watcher - ): void { - trace('Removing EDS watcher for edsServiceName ' + edsServiceName); - const watchersEntry = this.watchers.get(edsServiceName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName); - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.watchers.delete(edsServiceName); - } - } - if (removedServiceName) { - this.updateResourceNames(); - } + protected getProtocolName(): string { + return 'EDS'; } - - getResourceNames(): string[] { - return Array.from(this.watchers.keys()); + protected isStateOfTheWorld(): boolean { + return false; } /** @@ -111,7 +43,7 @@ export class EdsState implements XdsStreamState { * https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto * @param message */ - private validateResponse(message: ClusterLoadAssignment__Output) { + public validateResponse(message: ClusterLoadAssignment__Output) { for (const endpoint of message.endpoints) { for (const lb of endpoint.lb_endpoints) { const socketAddress = lb.endpoint?.address?.socket_address; @@ -128,48 +60,4 @@ export class EdsState implements XdsStreamState { } return true; } - - handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { - const validResponses: ClusterLoadAssignment__Output[] = []; - let result: HandleResponseResult = { - accepted: [], - rejected: [], - missing: [] - } - for (const {resource, raw} of responses) { - if (this.validateResponse(resource)) { - validResponses.push(resource); - result.accepted.push({ - name: resource.cluster_name, - raw: raw}); - } else { - trace('EDS validation failed for message ' + JSON.stringify(resource)); - result.rejected.push({ - name: resource.cluster_name, - raw: raw, - error: `ClusterLoadAssignment validation failed for resource ${resource.cluster_name}` - }); - } - } - this.latestResponses = validResponses; - this.latestIsV2 = isV2; - const allClusterNames: Set = new Set(); - for (const message of validResponses) { - allClusterNames.add(message.cluster_name); - const watchers = this.watchers.get(message.cluster_name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message, isV2); - } - } - trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']'); - return result; - } - - reportStreamError(status: StatusObject): void { - for (const watcherList of this.watchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - } } \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 7c27c948f..bd5b6423f 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -15,16 +15,13 @@ * */ -import * as protoLoader from '@grpc/proto-loader'; -import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; +import { experimental, logVerbosity } from "@grpc/grpc-js"; import { Listener__Output } from '../generated/envoy/config/listener/v3/Listener'; import { RdsState } from "./rds-state"; -import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; -import { HttpConnectionManager__Output } from '../generated/envoy/extensions/filters/network/http_connection_manager/v3/HttpConnectionManager'; +import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state"; import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL_V2, HTTP_CONNECTION_MANGER_TYPE_URL_V3 } from '../resources'; import { getTopLevelFilterUrl, validateTopLevelFilter } from '../http-filter'; import { EXPERIMENTAL_FAULT_INJECTION } from '../environment'; -import { Any__Output } from '../generated/google/protobuf/Any'; const TRACER_NAME = 'xds_client'; @@ -34,69 +31,22 @@ function trace(text: string): void { const ROUTER_FILTER_URL = 'type.googleapis.com/envoy.extensions.filters.http.router.v3.Router'; -export class LdsState implements XdsStreamState { - versionInfo = ''; - nonce = ''; - - private watchers: Map[]> = new Map[]>(); - private latestResponses: Listener__Output[] = []; - private latestIsV2 = false; - - constructor(private rdsState: RdsState, private updateResourceNames: () => void) {} - - addWatcher(targetName: string, watcher: Watcher) { - trace('Adding RDS watcher for targetName ' + targetName); - let watchersEntry = this.watchers.get(targetName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.watchers.set(targetName, watchersEntry); - } - watchersEntry.push(watcher); - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - const isV2 = this.latestIsV2; - for (const message of this.latestResponses) { - if (message.name === targetName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - trace('Reporting existing RDS update for new watcher for targetName ' + targetName); - watcher.onValidUpdate(message, isV2); - }); - } - } - if (addedServiceName) { - this.updateResourceNames(); - } +export class LdsState extends BaseXdsStreamState implements XdsStreamState { + protected getResourceName(resource: Listener__Output): string { + return resource.name; } - - removeWatcher(targetName: string, watcher: Watcher): void { - trace('Removing RDS watcher for targetName ' + targetName); - const watchersEntry = this.watchers.get(targetName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.watchers.delete(targetName); - } - } - if (removedServiceName) { - this.updateResourceNames(); - } + protected getProtocolName(): string { + return 'LDS'; + } + protected isStateOfTheWorld(): boolean { + return true; } - getResourceNames(): string[] { - return Array.from(this.watchers.keys()); + constructor(private rdsState: RdsState, updateResourceNames: () => void) { + super(updateResourceNames); } - private validateResponse(message: Listener__Output, isV2: boolean): boolean { + public validateResponse(message: Listener__Output, isV2: boolean): boolean { if ( !( message.api_listener?.api_listener && @@ -143,63 +93,4 @@ export class LdsState implements XdsStreamState { } return false; } - - private handleMissingNames(allTargetNames: Set): string[] { - const missingNames: string[] = []; - for (const [targetName, watcherList] of this.watchers.entries()) { - if (!allTargetNames.has(targetName)) { - missingNames.push(targetName); - for (const watcher of watcherList) { - watcher.onResourceDoesNotExist(); - } - } - } - return missingNames; - } - - handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { - const validResponses: Listener__Output[] = []; - let result: HandleResponseResult = { - accepted: [], - rejected: [], - missing: [] - } - for (const {resource, raw} of responses) { - if (this.validateResponse(resource, isV2)) { - validResponses.push(resource); - result.accepted.push({ - name: resource.name, - raw: raw - }); - } else { - trace('LDS validation failed for message ' + JSON.stringify(resource)); - result.rejected.push({ - name: resource.name, - raw: raw, - error: `Listener validation failed for resource ${resource.name}` - }); - } - } - this.latestResponses = validResponses; - this.latestIsV2 = isV2; - const allTargetNames = new Set(); - for (const message of validResponses) { - allTargetNames.add(message.name); - const watchers = this.watchers.get(message.name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message, isV2); - } - } - trace('Received LDS response with listener names [' + Array.from(allTargetNames) + ']'); - result.missing = this.handleMissingNames(allTargetNames); - return result; - } - - reportStreamError(status: StatusObject): void { - for (const watcherList of this.watchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - } } \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index bc1c4a818..77a84469b 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -15,20 +15,10 @@ * */ -import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { EXPERIMENTAL_FAULT_INJECTION } from "../environment"; import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration"; -import { Any__Output } from "../generated/google/protobuf/Any"; import { validateOverrideFilter } from "../http-filter"; -import { CdsLoadBalancingConfig } from "../load-balancer-cds"; -import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state"; -import ServiceConfig = experimental.ServiceConfig; - -const TRACER_NAME = 'xds_client'; - -function trace(text: string): void { - experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); -} +import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state"; const SUPPORTED_PATH_SPECIFIERS = ['prefix', 'path', 'safe_regex']; const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [ @@ -40,68 +30,16 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [ 'suffix_match']; const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header']; -export class RdsState implements XdsStreamState { - versionInfo = ''; - nonce = ''; - - private watchers: Map[]> = new Map[]>(); - private latestResponses: RouteConfiguration__Output[] = []; - private latestIsV2 = false; - - constructor(private updateResourceNames: () => void) {} - - addWatcher(routeConfigName: string, watcher: Watcher) { - trace('Adding RDS watcher for routeConfigName ' + routeConfigName); - let watchersEntry = this.watchers.get(routeConfigName); - let addedServiceName = false; - if (watchersEntry === undefined) { - addedServiceName = true; - watchersEntry = []; - this.watchers.set(routeConfigName, watchersEntry); - } - watchersEntry.push(watcher); - - /* If we have already received an update for the requested edsServiceName, - * immediately pass that update along to the watcher */ - const isV2 = this.latestIsV2; - for (const message of this.latestResponses) { - if (message.name === routeConfigName) { - /* These updates normally occur asynchronously, so we ensure that - * the same happens here */ - process.nextTick(() => { - trace('Reporting existing RDS update for new watcher for routeConfigName ' + routeConfigName); - watcher.onValidUpdate(message, isV2); - }); - } - } - if (addedServiceName) { - this.updateResourceNames(); - } +export class RdsState extends BaseXdsStreamState implements XdsStreamState { + protected isStateOfTheWorld(): boolean { + return false; } - - removeWatcher(routeConfigName: string, watcher: Watcher): void { - trace('Removing RDS watcher for routeConfigName ' + routeConfigName); - const watchersEntry = this.watchers.get(routeConfigName); - let removedServiceName = false; - if (watchersEntry !== undefined) { - const entryIndex = watchersEntry.indexOf(watcher); - if (entryIndex >= 0) { - watchersEntry.splice(entryIndex, 1); - } - if (watchersEntry.length === 0) { - removedServiceName = true; - this.watchers.delete(routeConfigName); - } - } - if (removedServiceName) { - this.updateResourceNames(); - } + protected getResourceName(resource: RouteConfiguration__Output): string { + return resource.name; } - - getResourceNames(): string[] { - return Array.from(this.watchers.keys()); + protected getProtocolName(): string { + return 'RDS'; } - validateResponse(message: RouteConfiguration__Output, isV2: boolean): boolean { // https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation for (const virtualHost of message.virtual_hosts) { @@ -172,48 +110,4 @@ export class RdsState implements XdsStreamState { } return true; } - - handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { - const validResponses: RouteConfiguration__Output[] = []; - let result: HandleResponseResult = { - accepted: [], - rejected: [], - missing: [] - } - for (const {resource, raw} of responses) { - if (this.validateResponse(resource, isV2)) { - validResponses.push(resource); - result.accepted.push({ - name: resource.name, - raw: raw}); - } else { - trace('RDS validation failed for message ' + JSON.stringify(resource)); - result.rejected.push({ - name: resource.name, - raw: raw, - error: `Route validation failed for resource ${resource.name}` - }); - } - } - this.latestResponses = validResponses; - this.latestIsV2 = isV2; - const allRouteConfigNames = new Set(); - for (const message of validResponses) { - allRouteConfigNames.add(message.name); - const watchers = this.watchers.get(message.name) ?? []; - for (const watcher of watchers) { - watcher.onValidUpdate(message, isV2); - } - } - trace('Received RDS response with route config names [' + Array.from(allRouteConfigNames) + ']'); - return result; - } - - reportStreamError(status: StatusObject): void { - for (const watcherList of this.watchers.values()) { - for (const watcher of watcherList) { - watcher.onTransientError(status); - } - } - } } \ No newline at end of file diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index c8cbc41cf..2ff5130c4 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -15,9 +15,11 @@ * */ -import { StatusObject } from "@grpc/grpc-js"; +import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js"; import { Any__Output } from "../generated/google/protobuf/Any"; +const TRACER_NAME = 'xds_client'; + export interface Watcher { /* Including the isV2 flag here is a bit of a kludge. It would probably be * better for XdsStreamState#handleResponses to transform the protobuf @@ -63,4 +65,189 @@ export interface XdsStreamState { handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult; reportStreamError(status: StatusObject): void; + reportAdsStreamStart(): void; + + addWatcher(name: string, watcher: Watcher): void; + removeWatcher(resourceName: string, watcher: Watcher): void; +} + +interface SubscriptionEntry { + watchers: Watcher[]; + cachedResponse: ResponseType | null; + resourceTimer: NodeJS.Timer; +} + +const RESOURCE_TIMEOUT_MS = 15_000; + +export abstract class BaseXdsStreamState implements XdsStreamState { + versionInfo = ''; + nonce = ''; + + private subscriptions: Map> = new Map>(); + private latestIsV2 = false; + private isAdsStreamRunning = false; + + constructor(private updateResourceNames: () => void) {} + + protected trace(text: string) { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, this.getProtocolName() + ' | ' + text); + } + + private startResourceTimer(subscriptionEntry: SubscriptionEntry) { + clearTimeout(subscriptionEntry.resourceTimer); + subscriptionEntry.resourceTimer = setTimeout(() => { + for (const watcher of subscriptionEntry.watchers) { + watcher.onResourceDoesNotExist(); + } + }, RESOURCE_TIMEOUT_MS); + } + + addWatcher(name: string, watcher: Watcher): void { + this.trace('Adding watcher for name ' + name); + let subscriptionEntry = this.subscriptions.get(name); + let addedName = false; + if (subscriptionEntry === undefined) { + addedName = true; + subscriptionEntry = { + watchers: [], + cachedResponse: null, + resourceTimer: setTimeout(() => {}, 0) + }; + this.startResourceTimer(subscriptionEntry); + this.subscriptions.set(name, subscriptionEntry); + } + subscriptionEntry.watchers.push(watcher); + if (subscriptionEntry.cachedResponse !== null) { + const cachedResponse = subscriptionEntry.cachedResponse; + /* These updates normally occur asynchronously, so we ensure that + * the same happens here */ + process.nextTick(() => { + this.trace('Reporting existing update for new watcher for name ' + name); + watcher.onValidUpdate(cachedResponse, this.latestIsV2); + }); + } + if (addedName) { + this.updateResourceNames(); + } + } + removeWatcher(resourceName: string, watcher: Watcher): void { + this.trace('Removing watcher for name ' + resourceName); + const subscriptionEntry = this.subscriptions.get(resourceName); + if (subscriptionEntry !== undefined) { + const entryIndex = subscriptionEntry.watchers.indexOf(watcher); + if (entryIndex >= 0) { + subscriptionEntry.watchers.splice(entryIndex, 1); + } + if (subscriptionEntry.watchers.length === 0) { + clearTimeout(subscriptionEntry.resourceTimer); + this.subscriptions.delete(resourceName); + this.updateResourceNames(); + } + } + } + + getResourceNames(): string[] { + return Array.from(this.subscriptions.keys()); + } + handleResponses(responses: ResourcePair[], isV2: boolean): HandleResponseResult { + const validResponses: ResponseType[] = []; + let result: HandleResponseResult = { + accepted: [], + rejected: [], + missing: [] + } + for (const {resource, raw} of responses) { + const resourceName = this.getResourceName(resource); + if (this.validateResponse(resource, isV2)) { + validResponses.push(resource); + result.accepted.push({ + name: resourceName, + raw: raw}); + } else { + this.trace('Validation failed for message ' + JSON.stringify(resource)); + result.rejected.push({ + name: resourceName, + raw: raw, + error: `Validation failed for resource ${resourceName}` + }); + } + } + this.latestIsV2 = isV2; + const allResourceNames = new Set(); + for (const resource of validResponses) { + const resourceName = this.getResourceName(resource); + allResourceNames.add(resourceName); + const subscriptionEntry = this.subscriptions.get(resourceName); + if (subscriptionEntry) { + const watchers = subscriptionEntry.watchers; + for (const watcher of watchers) { + watcher.onValidUpdate(resource, isV2); + } + clearTimeout(subscriptionEntry.resourceTimer); + subscriptionEntry.cachedResponse = resource; + } + } + result.missing = this.handleMissingNames(allResourceNames); + this.trace('Received response with resource names [' + Array.from(allResourceNames) + ']'); + return result; + } + reportStreamError(status: StatusObject): void { + for (const subscriptionEntry of this.subscriptions.values()) { + for (const watcher of subscriptionEntry.watchers) { + watcher.onTransientError(status); + } + clearTimeout(subscriptionEntry.resourceTimer); + } + this.isAdsStreamRunning = false; + } + + reportAdsStreamStart() { + this.isAdsStreamRunning = true; + for (const subscriptionEntry of this.subscriptions.values()) { + if (subscriptionEntry.cachedResponse === null) { + this.startResourceTimer(subscriptionEntry); + } + } + } + + private handleMissingNames(allResponseNames: Set): string[] { + if (this.isStateOfTheWorld()) { + const missingNames: string[] = []; + for (const [resourceName, subscriptionEntry] of this.subscriptions.entries()) { + if (!allResponseNames.has(resourceName) && subscriptionEntry.cachedResponse !== null) { + this.trace('Reporting resource does not exist named ' + resourceName); + missingNames.push(resourceName); + for (const watcher of subscriptionEntry.watchers) { + watcher.onResourceDoesNotExist(); + } + } + } + return missingNames; + } else { + return []; + } + } + + /** + * Apply the validation rules for this resource type to this resource + * instance. + * This function is public so that the LDS validateResponse can call into + * the RDS validateResponse. + * @param resource The resource object sent by the xDS server + * @param isV2 If true, the resource is an xDS V2 resource instead of xDS V3 + */ + public abstract validateResponse(resource: ResponseType, isV2: boolean): boolean; + /** + * Get the name of a resource object. The name is some field of the object, so + * getting it depends on the specific type. + * @param resource + */ + protected abstract getResourceName(resource: ResponseType): string; + protected abstract getProtocolName(): string; + /** + * Indicates whether responses are "state of the world", i.e. that they + * contain all resources and that omitted previously-seen resources should + * be treated as removed. + */ + protected abstract isStateOfTheWorld(): boolean; } \ No newline at end of file From a041056e712975e8ded07d032512ee70063c8cb1 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 12 May 2022 17:19:46 -0700 Subject: [PATCH 2/5] Borrow Linux test job for xDS tests --- test/kokoro/linux.cfg | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/kokoro/linux.cfg b/test/kokoro/linux.cfg index 63f88d399..c13d03f35 100644 --- a/test/kokoro/linux.cfg +++ b/test/kokoro/linux.cfg @@ -14,10 +14,10 @@ # Config file for Kokoro (in protobuf text format) # Location of the continuous shell script in repository. -build_file: "grpc-node/test/kokoro.sh" -timeout_mins: 60 +build_file: "grpc-node/packages/grpc-js-xds/scripts/xds.sh" +timeout_mins: 360 action { define_artifacts { - regex: "github/grpc-node/reports/**/sponge_log.xml" + regex: "github/grpc/reports/**" } -} \ No newline at end of file +} From 65075e50a742c04d4544252f55a2f24471e45e31 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 12 May 2022 17:34:25 -0700 Subject: [PATCH 3/5] Only start the timer if the ADS stream is running --- packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index 2ff5130c4..1d96728a2 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -113,7 +113,9 @@ export abstract class BaseXdsStreamState implements XdsStreamState cachedResponse: null, resourceTimer: setTimeout(() => {}, 0) }; - this.startResourceTimer(subscriptionEntry); + if (this.isAdsStreamRunning) { + this.startResourceTimer(subscriptionEntry); + } this.subscriptions.set(name, subscriptionEntry); } subscriptionEntry.watchers.push(watcher); From 9035327af12c989933946047f7125e4d636b6611 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 12 May 2022 17:38:28 -0700 Subject: [PATCH 4/5] Clear the nonce when the stream ends --- packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts index 1d96728a2..0b806f843 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts @@ -201,6 +201,7 @@ export abstract class BaseXdsStreamState implements XdsStreamState clearTimeout(subscriptionEntry.resourceTimer); } this.isAdsStreamRunning = false; + this.nonce = ''; } reportAdsStreamStart() { From 9502f5265de4c1ebc5aeb4bd4eb8c9c6ca829ded Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 13 May 2022 09:48:36 -0700 Subject: [PATCH 5/5] Revert "Borrow Linux test job for xDS tests" This reverts commit a041056e712975e8ded07d032512ee70063c8cb1. --- test/kokoro/linux.cfg | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/kokoro/linux.cfg b/test/kokoro/linux.cfg index c13d03f35..63f88d399 100644 --- a/test/kokoro/linux.cfg +++ b/test/kokoro/linux.cfg @@ -14,10 +14,10 @@ # Config file for Kokoro (in protobuf text format) # Location of the continuous shell script in repository. -build_file: "grpc-node/packages/grpc-js-xds/scripts/xds.sh" -timeout_mins: 360 +build_file: "grpc-node/test/kokoro.sh" +timeout_mins: 60 action { define_artifacts { - regex: "github/grpc/reports/**" + regex: "github/grpc-node/reports/**/sponge_log.xml" } -} +} \ No newline at end of file