Skip to content

Commit

Permalink
Merge pull request #2117 from murgatroid99/grpc-js-xds_resource_timer
Browse files Browse the repository at this point in the history
grpc-js-xds: Refactor xDS stream state and add resource timer
  • Loading branch information
murgatroid99 committed Aug 8, 2022
2 parents 304a2b7 + 6ab1abc commit 9e34f3c
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 506 deletions.
10 changes: 9 additions & 1 deletion packages/grpc-js-xds/src/xds-client.ts
Expand Up @@ -309,7 +309,7 @@ export class XdsClient {
const edsState = new EdsState(() => {
this.updateNames('eds');
});
const cdsState = new CdsState(edsState, () => {
const cdsState = new CdsState(() => {
this.updateNames('cds');
});
const rdsState = new RdsState(() => {
Expand Down Expand Up @@ -630,6 +630,7 @@ export class XdsClient {
this.updateNames(service);
}
}
this.reportAdsStreamStarted();
}
}

Expand Down Expand Up @@ -778,6 +779,13 @@ export class XdsClient {
this.adsState.lds.reportStreamError(status);
}

private reportAdsStreamStarted() {
this.adsState.eds.reportAdsStreamStart();
this.adsState.cds.reportAdsStreamStart();
this.adsState.rds.reportAdsStreamStart();
this.adsState.lds.reportAdsStreamStart();
}

private handleLrsResponse(message: LoadStatsResponse__Output) {
trace('Received LRS response');
/* Once we get any response from the server, we assume that the stream is
Expand Down
156 changes: 9 additions & 147 deletions packages/grpc-js-xds/src/xds-stream-state/cds-state.ts
Expand Up @@ -15,94 +15,21 @@
*
*/

import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { Any__Output } from "../generated/google/protobuf/Any";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
import { EdsState } from "./eds-state";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";

const TRACER_NAME = 'xds_client';

function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}

export class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';

private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();

private latestResponses: Cluster__Output[] = [];
private latestIsV2 = false;

constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}

/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);

/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class CdsState extends BaseXdsStreamState<Cluster__Output> implements XdsStreamState<Cluster__Output> {
protected isStateOfTheWorld(): boolean {
return true;
}

removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getResourceName(resource: Cluster__Output): string {
return resource.name;
}

getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected getProtocolName(): string {
return 'CDS';
}

private validateNonnegativeDuration(duration: Duration__Output | null): boolean {
Expand All @@ -125,7 +52,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
return percentage.value >=0 && percentage.value <= 100;
}

private validateResponse(message: Cluster__Output): boolean {
public validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
Expand Down Expand Up @@ -167,69 +94,4 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}
return true;
}

/**
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>): string[] {
const missingNames: string[] = [];
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
missingNames.push(clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
return missingNames;
}

handleResponses(responses: ResourcePair<Cluster__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: Cluster__Output[] = [];
const result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource)) {
validResponses.push(resource);
result.accepted.push({
name: resource.name,
raw: raw});
} else {
trace('CDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.name,
raw: raw,
error: `Cluster validation failed for resource ${resource.name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of validResponses) {
allClusterNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']');
result.missing = this.handleMissingNames(allClusterNames);
return result;
}

reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
130 changes: 9 additions & 121 deletions packages/grpc-js-xds/src/xds-stream-state/eds-state.ts
Expand Up @@ -20,7 +20,7 @@ import { isIPv4, isIPv6 } from "net";
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { Any__Output } from "../generated/google/protobuf/Any";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";

const TRACER_NAME = 'xds_client';

Expand All @@ -32,91 +32,23 @@ function localitiesEqual(a: Locality__Output, b: Locality__Output) {
return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone;
}

export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';

private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();

private latestResponses: ClusterLoadAssignment__Output[] = [];
private latestIsV2 = false;

constructor(private updateResourceNames: () => void) {}

/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.push(watcher);

/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output> {
protected getResourceName(resource: ClusterLoadAssignment__Output): string {
return resource.cluster_name;
}

removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getProtocolName(): string {
return 'EDS';
}

getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected isStateOfTheWorld(): boolean {
return false;
}

/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
public validateResponse(message: ClusterLoadAssignment__Output) {
const seenLocalities: {locality: Locality__Output, priority: number}[] = [];
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
Expand All @@ -143,48 +75,4 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
return true;
}

handleResponses(responses: ResourcePair<ClusterLoadAssignment__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: ClusterLoadAssignment__Output[] = [];
let result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource)) {
validResponses.push(resource);
result.accepted.push({
name: resource.cluster_name,
raw: raw});
} else {
trace('EDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.cluster_name,
raw: raw,
error: `ClusterLoadAssignment validation failed for resource ${resource.cluster_name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of validResponses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']');
return result;
}

reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}

0 comments on commit 9e34f3c

Please sign in to comment.