Skip to content

Commit

Permalink
Merge pull request #2561 from murgatroid99/grpc-js_pick_first_leaf
Browse files Browse the repository at this point in the history
grpc-js: Make pick_first the universal leaf policy, plus related changes
  • Loading branch information
murgatroid99 committed Aug 31, 2023
2 parents 8d532f9 + 266af4c commit 092d1e9
Show file tree
Hide file tree
Showing 28 changed files with 816 additions and 593 deletions.
6 changes: 3 additions & 3 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Expand Up @@ -34,7 +34,7 @@ import TypedLoadBalancingConfig = grpc.experimental.TypedLoadBalancingConfig;
import LoadBalancer = grpc.experimental.LoadBalancer;
import ChannelControlHelper = grpc.experimental.ChannelControlHelper;
import ChildLoadBalancerHandler = grpc.experimental.ChildLoadBalancerHandler;
import SubchannelAddress = grpc.experimental.SubchannelAddress;
import Endpoint = grpc.experimental.Endpoint;
import Picker = grpc.experimental.Picker;
import PickArgs = grpc.experimental.PickArgs;
import PickResult = grpc.experimental.PickResult;
Expand Down Expand Up @@ -99,12 +99,12 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
}
exitIdle(): void {
this.child.exitIdle();
Expand Down
11 changes: 2 additions & 9 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Expand Up @@ -18,23 +18,16 @@
import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js';
import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
import Endpoint = experimental.Endpoint;
import UnavailablePicker = experimental.UnavailablePicker;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
import QueuePicker = experimental.QueuePicker;
import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
import { Duration__Output } from './generated/google/protobuf/Duration';
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler } from './load-balancer-xds-cluster-resolver';
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';
import { CdsUpdate, ClusterResourceType } from './xds-resource-type/cluster-resource-type';

const TRACER_NAME = 'cds_balancer';
Expand Down Expand Up @@ -258,7 +251,7 @@ export class CdsLoadBalancer implements LoadBalancer {
}

updateAddressList(
addressList: SubchannelAddress[],
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
Expand Down
44 changes: 22 additions & 22 deletions packages/grpc-js-xds/src/load-balancer-priority.ts
Expand Up @@ -19,8 +19,8 @@ import { connectivityState as ConnectivityState, status as Status, Metadata, log
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import SubchannelAddress = experimental.SubchannelAddress;
import subchannelAddressToString = experimental.subchannelAddressToString;
import Endpoint = experimental.Endpoint;
import endpointToString = experimental.endpointToString;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import Picker = experimental.Picker;
import QueuePicker = experimental.QueuePicker;
Expand All @@ -40,16 +40,16 @@ const TYPE_NAME = 'priority';
const DEFAULT_FAILOVER_TIME_MS = 10_000;
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;

export type LocalitySubchannelAddress = SubchannelAddress & {
export interface LocalityEndpoint extends Endpoint {
localityPath: string[];
locality: Locality__Output;
weight: number;
};

export function isLocalitySubchannelAddress(
address: SubchannelAddress
): address is LocalitySubchannelAddress {
return Array.isArray((address as LocalitySubchannelAddress).localityPath);
export function isLocalityEndpoint(
address: Endpoint
): address is LocalityEndpoint {
return Array.isArray((address as LocalityEndpoint).localityPath);
}

/**
Expand Down Expand Up @@ -138,7 +138,7 @@ class PriorityLoadBalancingConfig implements TypedLoadBalancingConfig {

interface PriorityChildBalancer {
updateAddressList(
addressList: SubchannelAddress[],
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void;
Expand All @@ -154,7 +154,7 @@ interface PriorityChildBalancer {
}

interface UpdateArgs {
subchannelAddress: SubchannelAddress[];
subchannelAddress: Endpoint[];
lbConfig: TypedLoadBalancingConfig;
ignoreReresolutionRequests: boolean;
}
Expand Down Expand Up @@ -218,11 +218,11 @@ export class PriorityLoadBalancer implements LoadBalancer {
}

updateAddressList(
addressList: SubchannelAddress[],
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
this.childBalancer.updateAddressList(addressList, lbConfig, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig, attributes);
}

exitIdle() {
Expand Down Expand Up @@ -412,7 +412,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
}

updateAddressList(
addressList: SubchannelAddress[],
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
Expand All @@ -425,23 +425,23 @@ export class PriorityLoadBalancer implements LoadBalancer {
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childAddressMap: Map<string, LocalitySubchannelAddress[]> = new Map<
const childAddressMap: Map<string, LocalityEndpoint[]> = new Map<
string,
LocalitySubchannelAddress[]
LocalityEndpoint[]
>();
for (const address of addressList) {
if (!isLocalitySubchannelAddress(address)) {
for (const endpoint of endpointList) {
if (!isLocalityEndpoint(endpoint)) {
// Reject address that cannot be prioritized
return;
}
if (address.localityPath.length < 1) {
if (endpoint.localityPath.length < 1) {
// Reject address that cannot be prioritized
return;
}
const childName = address.localityPath[0];
const childAddress: LocalitySubchannelAddress = {
...address,
localityPath: address.localityPath.slice(1),
const childName = endpoint.localityPath[0];
const childAddress: LocalityEndpoint = {
...endpoint,
localityPath: endpoint.localityPath.slice(1),
};
let childAddressList = childAddressMap.get(childName);
if (childAddressList === undefined) {
Expand All @@ -458,7 +458,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
* update all existing children with their new configs */
for (const [childName, childConfig] of lbConfig.getChildren()) {
const childAddresses = childAddressMap.get(childName) ?? [];
trace('Assigning child ' + childName + ' address list ' + childAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'))
trace('Assigning child ' + childName + ' endpoint list ' + childAddresses.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'))
this.latestUpdates.set(childName, {
subchannelAddress: childAddresses,
lbConfig: childConfig.config,
Expand Down
30 changes: 15 additions & 15 deletions packages/grpc-js-xds/src/load-balancer-weighted-target.ts
Expand Up @@ -16,7 +16,7 @@
*/

import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from "@grpc/grpc-js";
import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority";
import { isLocalityEndpoint, LocalityEndpoint } from "./load-balancer-priority";
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
Expand All @@ -27,8 +27,8 @@ import PickResult = experimental.PickResult;
import PickArgs = experimental.PickArgs;
import QueuePicker = experimental.QueuePicker;
import UnavailablePicker = experimental.UnavailablePicker;
import SubchannelAddress = experimental.SubchannelAddress;
import subchannelAddressToString = experimental.subchannelAddressToString;
import Endpoint = experimental.Endpoint;
import endpointToString = experimental.endpointToString;
import selectLbConfigFromList = experimental.selectLbConfigFromList;

const TRACER_NAME = 'weighted_target';
Expand Down Expand Up @@ -154,7 +154,7 @@ class WeightedTargetPicker implements Picker {
}

interface WeightedChild {
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
Expand Down Expand Up @@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.parent.maybeUpdateState();
}

updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
this.weight = lbConfig.weight;
this.childBalancer.updateAddressList(addressList, lbConfig.child_policy, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down Expand Up @@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, picker);
}

updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand All @@ -330,9 +330,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childAddressMap = new Map<string, LocalitySubchannelAddress[]>();
const childEndpointMap = new Map<string, LocalityEndpoint[]>();
for (const address of addressList) {
if (!isLocalitySubchannelAddress(address)) {
if (!isLocalityEndpoint(address)) {
// Reject address that cannot be associated with targets
return;
}
Expand All @@ -341,14 +341,14 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
return;
}
const childName = address.localityPath[0];
const childAddress: LocalitySubchannelAddress = {
const childAddress: LocalityEndpoint = {
...address,
localityPath: address.localityPath.slice(1),
};
let childAddressList = childAddressMap.get(childName);
let childAddressList = childEndpointMap.get(childName);
if (childAddressList === undefined) {
childAddressList = [];
childAddressMap.set(childName, childAddressList);
childEndpointMap.set(childName, childAddressList);
}
childAddressList.push(childAddress);
}
Expand All @@ -363,9 +363,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
} else {
target.maybeReactivate();
}
const targetAddresses = childAddressMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetAddresses.map(address => '(' + subchannelAddressToString(address) + ' path=' + address.localityPath + ')'));
target.updateAddressList(targetAddresses, targetConfig, attributes);
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
target.updateAddressList(targetEndpoints, targetConfig, attributes);
}

// Deactivate targets that are not in the new config
Expand Down
27 changes: 20 additions & 7 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts
Expand Up @@ -18,11 +18,13 @@
import { experimental, logVerbosity, status as Status, Metadata, connectivityState } from "@grpc/grpc-js";
import { validateXdsServerConfig, XdsServerConfig } from "./xds-bootstrap";
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats, XdsClusterLocalityStats } from "./xds-client";
import { LocalitySubchannelAddress } from "./load-balancer-priority";
import { LocalityEndpoint } from "./load-balancer-priority";

import LoadBalancer = experimental.LoadBalancer;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import SubchannelAddress = experimental.SubchannelAddress;
import Endpoint = experimental.Endpoint;
import endpointHasAddress = experimental.endpointHasAddress;
import subchannelAddressToString = experimental.subchannelAddressToString;
import Picker = experimental.Picker;
import PickArgs = experimental.PickArgs;
import PickResult = experimental.PickResult;
Expand All @@ -34,6 +36,7 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import SubchannelInterface = experimental.SubchannelInterface;
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";

const TRACER_NAME = 'xds_cluster_impl';

Expand Down Expand Up @@ -245,18 +248,28 @@ function getCallCounterMapKey(cluster: string, edsServiceName?: string): string

class XdsClusterImplBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private lastestEndpointList: Endpoint[] | null = null;
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsClient | null = null;

constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig) {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
}
const locality = (subchannelAddress as LocalitySubchannelAddress).locality ?? '';
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
let locality: Locality__Output | null = null;
for (const endpoint of this.lastestEndpointList) {
if (endpointHasAddress(endpoint, subchannelAddress)) {
locality = (endpoint as LocalityEndpoint).locality;
}
}
if (locality === null) {
trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.');
return wrapperChild;
}
const lrsServer = this.latestConfig.getLrsLoadReportingServer();
let statsObj: XdsClusterLocalityStats | null = null;
if (lrsServer) {
Expand All @@ -279,15 +292,15 @@ class XdsClusterImplBalancer implements LoadBalancer {
}
}));
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
this.lastestEndpointList = endpointList;
this.latestConfig = lbConfig;
this.xdsClient = attributes.xdsClient as XdsClient;

if (lbConfig.getLrsLoadReportingServer()) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.getLrsLoadReportingServer()!,
Expand All @@ -296,7 +309,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
);
}

this.childBalancer.updateAddressList(addressList, lbConfig.getChildPolicy(), attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down

0 comments on commit 092d1e9

Please sign in to comment.