From c3c39af8ac324c73fd46b44596b1fc3e41199141 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Feb 2021 12:10:04 -0800 Subject: [PATCH 1/2] grpc-js-xds: Add XdsClusterManager LB policy --- packages/grpc-js-xds/src/index.ts | 2 + .../src/load-balancer-xds-cluster-manager.ts | 286 ++++++++++++++++++ 2 files changed, 288 insertions(+) create mode 100644 packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts diff --git a/packages/grpc-js-xds/src/index.ts b/packages/grpc-js-xds/src/index.ts index 06bea9904..1b24d25e6 100644 --- a/packages/grpc-js-xds/src/index.ts +++ b/packages/grpc-js-xds/src/index.ts @@ -21,6 +21,7 @@ import * as load_balancer_eds from './load-balancer-eds'; import * as load_balancer_lrs from './load-balancer-lrs'; import * as load_balancer_priority from './load-balancer-priority'; import * as load_balancer_weighted_target from './load-balancer-weighted-target'; +import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager'; /** * Register the "xds:" name scheme with the @grpc/grpc-js library. @@ -32,4 +33,5 @@ export function register() { load_balancer_lrs.setup(); load_balancer_priority.setup(); load_balancer_weighted_target.setup(); + load_balancer_xds_cluster_manager.setup(); } \ No newline at end of file diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts new file mode 100644 index 000000000..c4f53759a --- /dev/null +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -0,0 +1,286 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { connectivityState as ConnectivityState, status as Status, experimental, logVerbosity, Metadata, status } from "@grpc/grpc-js/"; + +import LoadBalancingConfig = experimental.LoadBalancingConfig; +import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig; +import LoadBalancer = experimental.LoadBalancer; +import Picker = experimental.Picker; +import PickResult = experimental.PickResult; +import PickArgs = experimental.PickArgs; +import PickResultType = experimental.PickResultType; +import UnavailablePicker = experimental.UnavailablePicker; +import QueuePicker = experimental.QueuePicker; +import SubchannelAddress = experimental.SubchannelAddress; +import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler; +import getFirstUsableConfig = experimental.getFirstUsableConfig; +import ChannelControlHelper = experimental.ChannelControlHelper; +import registerLoadBalancerType = experimental.registerLoadBalancerType; + +const TRACER_NAME = 'weighted_target'; + +function trace(text: string): void { + experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text); +} + +const TYPE_NAME = 'xds_cluster_manager'; + +interface ClusterManagerChild { + child_policy: LoadBalancingConfig[]; +} + +class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig { + getLoadBalancerName(): string { + return TYPE_NAME; + } + + constructor(private children: Map) {} + + getChildren() { + return this.children; + } + + toJsonObject(): object { + const childrenField: {[key: string]: object} = {}; + for (const [childName, childValue] of this.children.entries()) { + childrenField[childName] = { + child_policy: childValue.child_policy.map(policy => policy.toJsonObject()) + }; + } + return { + [TYPE_NAME]: { + children: childrenField + } + } + } + + static createFromJson(obj: any): XdsClusterManagerLoadBalancingConfig { + const childrenMap: Map = new Map(); + if (!('children' in obj && obj.children !== null && typeof obj.children === 'object')) { + throw new Error('xds_cluster_manager config must have a children map'); + } + for (const key of obj.children) { + const childObj = obj.children[key]; + if (!('child_policy' in childObj && Array.isArray(childObj.child_policy))) { + throw new Error(`xds_cluster_manager child ${key} must have a child_policy array`); + } + const validatedChild = { + child_policy: childObj.child_policy.map(validateLoadBalancingConfig) + }; + childrenMap.set(key, validatedChild); + } + return new XdsClusterManagerLoadBalancingConfig(childrenMap); + } +} + +class XdsClusterManagerPicker implements Picker { + constructor(private childPickers: Map) {} + + pick(pickArgs: PickArgs): PickResult { + /* extraPickInfo.cluster should be set for all calls by the config selector + * corresponding to the service config that specified the use of this LB + * policy. */ + const cluster = pickArgs.extraPickInfo.cluster ?? ''; + if (this.childPickers.has(cluster)) { + return this.childPickers.get(cluster)!.pick(pickArgs); + } else { + return { + pickResultType: PickResultType.TRANSIENT_FAILURE, + status: { + code: status.INTERNAL, + details: `Requested cluster ${cluster} not found`, + metadata: new Metadata(), + }, + subchannel: null, + extraFilterFactory: null, + onCallStarted: null + }; + } + } +} + +interface XdsClusterManagerChild { + updateAddressList(addressList: SubchannelAddress[], lbConfig: ClusterManagerChild, attributes: { [key: string]: unknown; }): void; + exitIdle(): void; + resetBackoff(): void; + destroy(): void; + getConnectivityState(): ConnectivityState; + getPicker(): Picker; + +} + +class XdsClusterManager implements LoadBalancer { + private XdsClusterManagerChildImpl = class implements XdsClusterManagerChild { + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private picker: Picker; + private childBalancer: ChildLoadBalancerHandler; + + constructor(private parent: XdsClusterManager, private name: string) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: (subchannelAddress, subchannelOptions) => { + return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions); + }, + updateState: (connectivityState, picker) => { + this.updateState(connectivityState, picker); + }, + requestReresolution: () => { + this.parent.channelControlHelper.requestReresolution(); + } + }); + + this.picker = new QueuePicker(this.childBalancer); + } + + private updateState(connectivityState: ConnectivityState, picker: Picker) { + trace('Child ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]); + this.connectivityState = connectivityState; + this.picker = picker; + this.parent.updateState(); + } + updateAddressList(addressList: SubchannelAddress[], lbConfig: ClusterManagerChild, attributes: { [key: string]: unknown; }): void { + const childConfig = getFirstUsableConfig(lbConfig.child_policy); + if (childConfig !== null) { + this.childBalancer.updateAddressList(addressList, childConfig, attributes); + } + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + this.childBalancer.destroy(); + } + getConnectivityState(): ConnectivityState { + return this.connectivityState; + } + getPicker(): Picker { + return this.picker; + } + } + // End of XdsClusterManagerChildImpl + + private children: Map = new Map(); + constructor(private channelControlHelper: ChannelControlHelper) {} + + private updateState() { + const pickerMap: Map = new Map(); + let anyReady = false; + let anyConnecting = false; + let anyIdle = false; + for (const [name, child] of this.children.entries()) { + pickerMap.set(name, child.getPicker()); + switch (child.getConnectivityState()) { + case ConnectivityState.READY: + anyReady = true; + break; + case ConnectivityState.CONNECTING: + anyConnecting = true; + break; + case ConnectivityState.IDLE: + anyIdle = true; + break; + } + } + let connectivityState: ConnectivityState; + if (anyReady) { + connectivityState = ConnectivityState.READY; + } else if (anyConnecting) { + connectivityState = ConnectivityState.CONNECTING; + } else if (anyIdle) { + connectivityState = ConnectivityState.IDLE; + } else { + connectivityState = ConnectivityState.TRANSIENT_FAILURE; + } + let picker: Picker; + + switch (connectivityState) { + case ConnectivityState.READY: + picker = new XdsClusterManagerPicker(pickerMap); + break; + case ConnectivityState.CONNECTING: + case ConnectivityState.IDLE: + picker = new QueuePicker(this); + break; + default: + picker = new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE', + metadata: new Metadata() + }); + } + trace( + 'Transitioning to ' + + ConnectivityState[connectivityState] + ); + this.channelControlHelper.updateState(connectivityState, picker); + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) { + // Reject a config of the wrong type + trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2)); + return; + } + const configChildren = lbConfig.getChildren(); + // Delete children that are not in the new config + const namesToRemove: string[] = []; + for (const name of this.children.keys()) { + if (!configChildren.has(name)) { + namesToRemove.push(name); + } + } + for (const name of namesToRemove) { + this.children.get(name)!.destroy(); + this.children.delete(name); + } + // Add new children that were not in the previous config + for (const [name, childConfig] of configChildren.entries()) { + if (!this.children.has(name)) { + const newChild = new this.XdsClusterManagerChildImpl(this, name); + newChild.updateAddressList(addressList, childConfig, attributes); + this.children.set(name, newChild); + } + } + this.updateState(); + } + exitIdle(): void { + for (const child of this.children.values()) { + child.exitIdle(); + } + } + resetBackoff(): void { + for (const child of this.children.values()) { + child.resetBackoff(); + } + } + destroy(): void { + for (const child of this.children.values()) { + child.destroy(); + } + this.children.clear(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, XdsClusterManager, XdsClusterManagerLoadBalancingConfig); +} \ No newline at end of file From d1aa9aa6fc02ad7152a7495f43d3769b0a18dfc7 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Feb 2021 12:20:01 -0800 Subject: [PATCH 2/2] Don't update identical states with identical pickers --- .../grpc-js-xds/src/load-balancer-xds-cluster-manager.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index c4f53759a..f8d2de258 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -177,6 +177,8 @@ class XdsClusterManager implements LoadBalancer { // End of XdsClusterManagerChildImpl private children: Map = new Map(); + // Shutdown is a placeholder value that will never appear in normal operation. + private currentState: ConnectivityState = ConnectivityState.SHUTDOWN; constructor(private channelControlHelper: ChannelControlHelper) {} private updateState() { @@ -208,6 +210,13 @@ class XdsClusterManager implements LoadBalancer { } else { connectivityState = ConnectivityState.TRANSIENT_FAILURE; } + /* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is + * exactly one corresponding picker, so if the state is one of those and + * that does not change, no new information is provided by passing the + * new state upward. */ + if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) { + return; + } let picker: Picker; switch (connectivityState) {