Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Add XdsClusterManager LB policy #1687

Merged
merged 2 commits into from Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/grpc-js-xds/src/index.ts
Expand Up @@ -21,6 +21,7 @@ import * as load_balancer_eds from './load-balancer-eds';
import * as load_balancer_lrs from './load-balancer-lrs';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
import * as load_balancer_xds_cluster_manager from './load-balancer-xds-cluster-manager';

/**
* Register the "xds:" name scheme with the @grpc/grpc-js library.
Expand All @@ -32,4 +33,5 @@ export function register() {
load_balancer_lrs.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
load_balancer_xds_cluster_manager.setup();
}
295 changes: 295 additions & 0 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts
@@ -0,0 +1,295 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { connectivityState as ConnectivityState, status as Status, experimental, logVerbosity, Metadata, status } from "@grpc/grpc-js/";

import LoadBalancingConfig = experimental.LoadBalancingConfig;
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer;
import Picker = experimental.Picker;
import PickResult = experimental.PickResult;
import PickArgs = experimental.PickArgs;
import PickResultType = experimental.PickResultType;
import UnavailablePicker = experimental.UnavailablePicker;
import QueuePicker = experimental.QueuePicker;
import SubchannelAddress = experimental.SubchannelAddress;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import getFirstUsableConfig = experimental.getFirstUsableConfig;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;

const TRACER_NAME = 'weighted_target';

function trace(text: string): void {
experimental.trace(logVerbosity.DEBUG, TRACER_NAME, text);
}

const TYPE_NAME = 'xds_cluster_manager';

interface ClusterManagerChild {
child_policy: LoadBalancingConfig[];
}

class XdsClusterManagerLoadBalancingConfig implements LoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}

constructor(private children: Map<string, ClusterManagerChild>) {}

getChildren() {
return this.children;
}

toJsonObject(): object {
const childrenField: {[key: string]: object} = {};
for (const [childName, childValue] of this.children.entries()) {
childrenField[childName] = {
child_policy: childValue.child_policy.map(policy => policy.toJsonObject())
};
}
return {
[TYPE_NAME]: {
children: childrenField
}
}
}

static createFromJson(obj: any): XdsClusterManagerLoadBalancingConfig {
const childrenMap: Map<string, ClusterManagerChild> = new Map<string, ClusterManagerChild>();
if (!('children' in obj && obj.children !== null && typeof obj.children === 'object')) {
throw new Error('xds_cluster_manager config must have a children map');
}
for (const key of obj.children) {
const childObj = obj.children[key];
if (!('child_policy' in childObj && Array.isArray(childObj.child_policy))) {
throw new Error(`xds_cluster_manager child ${key} must have a child_policy array`);
}
const validatedChild = {
child_policy: childObj.child_policy.map(validateLoadBalancingConfig)
};
childrenMap.set(key, validatedChild);
}
return new XdsClusterManagerLoadBalancingConfig(childrenMap);
}
}

class XdsClusterManagerPicker implements Picker {
constructor(private childPickers: Map<string, Picker>) {}

pick(pickArgs: PickArgs): PickResult {
/* extraPickInfo.cluster should be set for all calls by the config selector
* corresponding to the service config that specified the use of this LB
* policy. */
const cluster = pickArgs.extraPickInfo.cluster ?? '';
if (this.childPickers.has(cluster)) {
return this.childPickers.get(cluster)!.pick(pickArgs);
} else {
return {
pickResultType: PickResultType.TRANSIENT_FAILURE,
status: {
code: status.INTERNAL,
details: `Requested cluster ${cluster} not found`,
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
onCallStarted: null
};
}
}
}

interface XdsClusterManagerChild {
updateAddressList(addressList: SubchannelAddress[], lbConfig: ClusterManagerChild, attributes: { [key: string]: unknown; }): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
getConnectivityState(): ConnectivityState;
getPicker(): Picker;

}

class XdsClusterManager implements LoadBalancer {
private XdsClusterManagerChildImpl = class implements XdsClusterManagerChild {
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private picker: Picker;
private childBalancer: ChildLoadBalancerHandler;

constructor(private parent: XdsClusterManager, private name: string) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelOptions) => {
return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions);
},
updateState: (connectivityState, picker) => {
this.updateState(connectivityState, picker);
},
requestReresolution: () => {
this.parent.channelControlHelper.requestReresolution();
}
});

this.picker = new QueuePicker(this.childBalancer);
}

private updateState(connectivityState: ConnectivityState, picker: Picker) {
trace('Child ' + this.name + ' ' + ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[connectivityState]);
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.updateState();
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: ClusterManagerChild, attributes: { [key: string]: unknown; }): void {
const childConfig = getFirstUsableConfig(lbConfig.child_policy);
if (childConfig !== null) {
this.childBalancer.updateAddressList(addressList, childConfig, attributes);
}
}
exitIdle(): void {
this.childBalancer.exitIdle();
}
resetBackoff(): void {
this.childBalancer.resetBackoff();
}
destroy(): void {
this.childBalancer.destroy();
}
getConnectivityState(): ConnectivityState {
return this.connectivityState;
}
getPicker(): Picker {
return this.picker;
}
}
// End of XdsClusterManagerChildImpl

private children: Map<string, XdsClusterManagerChild> = new Map<string, XdsClusterManagerChild>();
// Shutdown is a placeholder value that will never appear in normal operation.
private currentState: ConnectivityState = ConnectivityState.SHUTDOWN;
constructor(private channelControlHelper: ChannelControlHelper) {}

private updateState() {
const pickerMap: Map<string, Picker> = new Map<string, Picker>();
let anyReady = false;
let anyConnecting = false;
let anyIdle = false;
for (const [name, child] of this.children.entries()) {
pickerMap.set(name, child.getPicker());
switch (child.getConnectivityState()) {
case ConnectivityState.READY:
anyReady = true;
break;
case ConnectivityState.CONNECTING:
anyConnecting = true;
break;
case ConnectivityState.IDLE:
anyIdle = true;
break;
}
}
let connectivityState: ConnectivityState;
if (anyReady) {
connectivityState = ConnectivityState.READY;
} else if (anyConnecting) {
connectivityState = ConnectivityState.CONNECTING;
} else if (anyIdle) {
connectivityState = ConnectivityState.IDLE;
} else {
connectivityState = ConnectivityState.TRANSIENT_FAILURE;
}
/* For each of the states CONNECTING, IDLE, and TRANSIENT_FAILURE, there is
* exactly one corresponding picker, so if the state is one of those and
* that does not change, no new information is provided by passing the
* new state upward. */
if (connectivityState === this.currentState && connectivityState !== ConnectivityState.READY) {
return;
}
let picker: Picker;

switch (connectivityState) {
case ConnectivityState.READY:
picker = new XdsClusterManagerPicker(pickerMap);
break;
case ConnectivityState.CONNECTING:
case ConnectivityState.IDLE:
picker = new QueuePicker(this);
break;
default:
picker = new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'xds_cluster_manager: all children report state TRANSIENT_FAILURE',
metadata: new Metadata()
});
}
trace(
'Transitioning to ' +
ConnectivityState[connectivityState]
);
this.channelControlHelper.updateState(connectivityState, picker);
}

updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
const configChildren = lbConfig.getChildren();
// Delete children that are not in the new config
const namesToRemove: string[] = [];
for (const name of this.children.keys()) {
if (!configChildren.has(name)) {
namesToRemove.push(name);
}
}
for (const name of namesToRemove) {
this.children.get(name)!.destroy();
this.children.delete(name);
}
// Add new children that were not in the previous config
for (const [name, childConfig] of configChildren.entries()) {
if (!this.children.has(name)) {
const newChild = new this.XdsClusterManagerChildImpl(this, name);
newChild.updateAddressList(addressList, childConfig, attributes);
this.children.set(name, newChild);
}
}
this.updateState();
}
exitIdle(): void {
for (const child of this.children.values()) {
child.exitIdle();
}
}
resetBackoff(): void {
for (const child of this.children.values()) {
child.resetBackoff();
}
}
destroy(): void {
for (const child of this.children.values()) {
child.destroy();
}
this.children.clear();
}
getTypeName(): string {
return TYPE_NAME;
}
}

export function setup() {
registerLoadBalancerType(TYPE_NAME, XdsClusterManager, XdsClusterManagerLoadBalancingConfig);
}