Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Refactor xDS stream state and add resource timer #2117

Merged
merged 6 commits into from Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/grpc-js-xds/src/xds-client.ts
Expand Up @@ -309,7 +309,7 @@ export class XdsClient {
const edsState = new EdsState(() => {
this.updateNames('eds');
});
const cdsState = new CdsState(edsState, () => {
const cdsState = new CdsState(() => {
this.updateNames('cds');
});
const rdsState = new RdsState(() => {
Expand Down Expand Up @@ -630,6 +630,7 @@ export class XdsClient {
this.updateNames(service);
}
}
this.reportAdsStreamStarted();
}
}

Expand Down Expand Up @@ -778,6 +779,13 @@ export class XdsClient {
this.adsState.lds.reportStreamError(status);
}

private reportAdsStreamStarted() {
this.adsState.eds.reportAdsStreamStart();
this.adsState.cds.reportAdsStreamStart();
this.adsState.rds.reportAdsStreamStart();
this.adsState.lds.reportAdsStreamStart();
}

private handleLrsResponse(message: LoadStatsResponse__Output) {
trace('Received LRS response');
/* Once we get any response from the server, we assume that the stream is
Expand Down
156 changes: 9 additions & 147 deletions packages/grpc-js-xds/src/xds-stream-state/cds-state.ts
Expand Up @@ -15,94 +15,21 @@
*
*/

import { experimental, logVerbosity, StatusObject } from "@grpc/grpc-js";
import { EXPERIMENTAL_OUTLIER_DETECTION } from "../environment";
import { Cluster__Output } from "../generated/envoy/config/cluster/v3/Cluster";
import { Any__Output } from "../generated/google/protobuf/Any";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { UInt32Value__Output } from "../generated/google/protobuf/UInt32Value";
import { EdsState } from "./eds-state";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";

const TRACER_NAME = 'xds_client';

function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}

export class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';

private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();

private latestResponses: Cluster__Output[] = [];
private latestIsV2 = false;

constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}

/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Adding CDS watcher for clusterName ' + clusterName);
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);

/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing CDS update for new watcher for clusterName ' + clusterName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class CdsState extends BaseXdsStreamState<Cluster__Output> implements XdsStreamState<Cluster__Output> {
protected isStateOfTheWorld(): boolean {
return true;
}

removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
trace('Removing CDS watcher for clusterName ' + clusterName);
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getResourceName(resource: Cluster__Output): string {
return resource.name;
}

getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected getProtocolName(): string {
return 'CDS';
}

private validateNonnegativeDuration(duration: Duration__Output | null): boolean {
Expand All @@ -125,7 +52,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
return percentage.value >=0 && percentage.value <= 100;
}

private validateResponse(message: Cluster__Output): boolean {
public validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
Expand Down Expand Up @@ -167,69 +94,4 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}
return true;
}

/**
* Given a list of clusterNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>): string[] {
const missingNames: string[] = [];
for (const [clusterName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(clusterName)) {
trace('Reporting CDS resource does not exist for clusterName ' + clusterName);
missingNames.push(clusterName);
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
return missingNames;
}

handleResponses(responses: ResourcePair<Cluster__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: Cluster__Output[] = [];
const result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource)) {
validResponses.push(resource);
result.accepted.push({
name: resource.name,
raw: raw});
} else {
trace('CDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.name,
raw: raw,
error: `Cluster validation failed for resource ${resource.name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of validResponses) {
allClusterNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']');
result.missing = this.handleMissingNames(allClusterNames);
return result;
}

reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
130 changes: 9 additions & 121 deletions packages/grpc-js-xds/src/xds-stream-state/eds-state.ts
Expand Up @@ -20,7 +20,7 @@ import { isIPv4, isIPv6 } from "net";
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality";
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment";
import { Any__Output } from "../generated/google/protobuf/Any";
import { HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";
import { BaseXdsStreamState, HandleResponseResult, RejectedResourceEntry, ResourcePair, Watcher, XdsStreamState } from "./xds-stream-state";

const TRACER_NAME = 'xds_client';

Expand All @@ -32,91 +32,23 @@ function localitiesEqual(a: Locality__Output, b: Locality__Output) {
return a.region === b.region && a.sub_zone === b.sub_zone && a.zone === b.zone;
}

export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';

private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();

private latestResponses: ClusterLoadAssignment__Output[] = [];
private latestIsV2 = false;

constructor(private updateResourceNames: () => void) {}

/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
trace('Adding EDS watcher (' + watchersEntry.length + ' ->' + (watchersEntry.length + 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.push(watcher);

/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
const isV2 = this.latestIsV2;
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
trace('Reporting existing EDS update for new watcher for edsServiceName ' + edsServiceName);
watcher.onValidUpdate(message, isV2);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output> implements XdsStreamState<ClusterLoadAssignment__Output> {
protected getResourceName(resource: ClusterLoadAssignment__Output): string {
return resource.cluster_name;
}

removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
trace('Removing EDS watcher for edsServiceName ' + edsServiceName);
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
trace('Removed EDS watcher (' + watchersEntry.length + ' -> ' + (watchersEntry.length - 1) + ') for edsServiceName ' + edsServiceName);
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
protected getProtocolName(): string {
return 'EDS';
}

getResourceNames(): string[] {
return Array.from(this.watchers.keys());
protected isStateOfTheWorld(): boolean {
return false;
}

/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
public validateResponse(message: ClusterLoadAssignment__Output) {
const seenLocalities: {locality: Locality__Output, priority: number}[] = [];
for (const endpoint of message.endpoints) {
if (!endpoint.locality) {
Expand All @@ -143,48 +75,4 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
return true;
}

handleResponses(responses: ResourcePair<ClusterLoadAssignment__Output>[], isV2: boolean): HandleResponseResult {
const validResponses: ClusterLoadAssignment__Output[] = [];
let result: HandleResponseResult = {
accepted: [],
rejected: [],
missing: []
}
for (const {resource, raw} of responses) {
if (this.validateResponse(resource)) {
validResponses.push(resource);
result.accepted.push({
name: resource.cluster_name,
raw: raw});
} else {
trace('EDS validation failed for message ' + JSON.stringify(resource));
result.rejected.push({
name: resource.cluster_name,
raw: raw,
error: `ClusterLoadAssignment validation failed for resource ${resource.cluster_name}`
});
}
}
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of validResponses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']');
return result;
}

reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}