Skip to content

Commit

Permalink
Merge pull request #2409 from murgatroid99/v1.8.x_merge
Browse files Browse the repository at this point in the history
Merge v1.8.x into master
  • Loading branch information
murgatroid99 committed Apr 5, 2023
2 parents 167732a + 5942317 commit 90de58c
Show file tree
Hide file tree
Showing 24 changed files with 1,080 additions and 721 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/package.json
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.8.0",
"version": "1.8.1",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
Expand Down
2 changes: 2 additions & 0 deletions packages/grpc-js-xds/src/xds-client.ts
Expand Up @@ -342,11 +342,13 @@ export class XdsClient {
this.adsNode = {
...bootstrapInfo.node,
user_agent_name: userAgentName,
user_agent_version: clientVersion,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNode = {
...bootstrapInfo.node,
user_agent_name: userAgentName,
user_agent_version: clientVersion,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
setCsdsClientNode(this.adsNode);
Expand Down
7 changes: 3 additions & 4 deletions packages/grpc-js-xds/src/xds-stream-state/rds-state.ts
Expand Up @@ -32,6 +32,8 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];

const UINT32_MAX = 0xFFFFFFFF;

function durationToMs(duration: Duration__Output | null): number | null {
if (duration === null) {
return null;
Expand Down Expand Up @@ -130,14 +132,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
if (route.route.weighted_clusters!.total_weight?.value === 0) {
return false;
}
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
if (weightSum === 0 || weightSum > UINT32_MAX) {
return false;
}
if (EXPERIMENTAL_FAULT_INJECTION) {
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/README.md
Expand Up @@ -62,6 +62,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.enable_retries`
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc.service_config_disable_resolution`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.0",
"version": "1.8.13",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
2 changes: 2 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Expand Up @@ -55,6 +55,7 @@ export interface ChannelOptions {
'grpc.max_connection_age_ms'?: number;
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
'grpc.service_config_disable_resolution'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}
Expand Down Expand Up @@ -87,6 +88,7 @@ export const recognizedOptions = {
'grpc.max_connection_age_ms': true,
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
'grpc.service_config_disable_resolution': true,
};

export function channelOptionsEqual(
Expand Down
39 changes: 38 additions & 1 deletion packages/grpc-js/src/deadline.ts
Expand Up @@ -51,8 +51,45 @@ export function getDeadlineTimeoutString(deadline: Deadline) {
throw new Error('Deadline is too far in the future')
}

/**
* See https://nodejs.org/api/timers.html#settimeoutcallback-delay-args
* In particular, "When delay is larger than 2147483647 or less than 1, the
* delay will be set to 1. Non-integer delays are truncated to an integer."
* This number of milliseconds is almost 25 days.
*/
const MAX_TIMEOUT_TIME = 2147483647;

/**
* Get the timeout value that should be passed to setTimeout now for the timer
* to end at the deadline. For any deadline before now, the timer should end
* immediately, represented by a value of 0. For any deadline more than
* MAX_TIMEOUT_TIME milliseconds in the future, a timer cannot be set that will
* end at that time, so it is treated as infinitely far in the future.
* @param deadline
* @returns
*/
export function getRelativeTimeout(deadline: Deadline) {
const deadlineMs = deadline instanceof Date ? deadline.getTime() : deadline;
const now = new Date().getTime();
return deadlineMs - now;
const timeout = deadlineMs - now;
if (timeout < 0) {
return 0;
} else if (timeout > MAX_TIMEOUT_TIME) {
return Infinity
} else {
return timeout;
}
}

export function deadlineToString(deadline: Deadline): string {
if (deadline instanceof Date) {
return deadline.toISOString();
} else {
const dateDeadline = new Date(deadline);
if (Number.isNaN(dateDeadline.getTime())) {
return '' + deadline;
} else {
return dateDeadline.toISOString();
}
}
}
3 changes: 2 additions & 1 deletion packages/grpc-js/src/index.ts
Expand Up @@ -237,7 +237,7 @@ export const getClientChannel = (client: Client) => {

export { StatusBuilder };

export { Listener } from './call-interface';
export { Listener, InterceptingListener } from './call-interface';

export {
Requester,
Expand All @@ -248,6 +248,7 @@ export {
InterceptorProvider,
InterceptingCall,
InterceptorConfigurationError,
NextCall
} from './client-interceptors';

export {
Expand Down
56 changes: 51 additions & 5 deletions packages/grpc-js/src/internal-channel.ts
Expand Up @@ -46,11 +46,12 @@ import { LoadBalancingCall } from './load-balancing-call';
import { CallCredentials } from './call-credentials';
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from './call-interface';
import { SubchannelCall } from './subchannel-call';
import { Deadline, getDeadlineTimeoutString } from './deadline';
import { Deadline, deadlineToString, getDeadlineTimeoutString } from './deadline';
import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';

/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
Expand Down Expand Up @@ -84,6 +85,32 @@ const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB

class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private refCount = 0;
private subchannelStateListener: ConnectivityStateListener;
constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) {
super(childSubchannel);
this.subchannelStateListener = (subchannel, previousState, newState, keepaliveTime) => {
channel.throttleKeepalive(keepaliveTime);
};
childSubchannel.addConnectivityStateListener(this.subchannelStateListener);
}

ref(): void {
this.child.ref();
this.refCount += 1;
}

unref(): void {
this.child.unref();
this.refCount -= 1;
if (this.refCount <= 0) {
this.child.removeConnectivityStateListener(this.subchannelStateListener);
this.channel.removeWrappedSubchannel(this);
}
}
}

export class InternalChannel {

private resolvingLoadBalancer: ResolvingLoadBalancer;
Expand Down Expand Up @@ -116,8 +143,10 @@ export class InternalChannel {
* configSelector becomes set or the channel state becomes anything other
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();

// Channelz info
private readonly channelzEnabled: boolean = true;
Expand Down Expand Up @@ -190,6 +219,7 @@ export class InternalChannel {
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
Expand All @@ -201,10 +231,13 @@ export class InternalChannel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
Expand Down Expand Up @@ -369,6 +402,19 @@ export class InternalChannel {
}
}

throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
for (const wrappedSubchannel of this.wrappedSubchannels) {
wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
}
}
}

removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}

doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) {
return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo});
}
Expand Down Expand Up @@ -469,7 +515,7 @@ export class InternalChannel {
'] method="' +
method +
'", deadline=' +
deadline
deadlineToString(deadline)
);
const finalOptions: CallStreamOptions = {
deadline: deadline,
Expand Down
8 changes: 4 additions & 4 deletions packages/grpc-js/src/load-balancer-outlier-detection.ts
Expand Up @@ -205,11 +205,11 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState);
listener(this, previousState, newState, keepaliveTime);
}
}
});
Expand Down Expand Up @@ -265,14 +265,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE, -1);
}
}

uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1);
}
}

Expand Down
6 changes: 4 additions & 2 deletions packages/grpc-js/src/load-balancer-pick-first.ts
Expand Up @@ -33,6 +33,7 @@ import {
} from './picker';
import {
SubchannelAddress,
subchannelAddressEqual,
subchannelAddressToString,
} from './subchannel-address';
import * as logging from './logging';
Expand Down Expand Up @@ -168,7 +169,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* connecting to the next one instead of waiting for the connection
* delay timer. */
if (
subchannel === this.subchannels[this.currentSubchannelIndex] &&
subchannel.getRealSubchannel() === this.subchannels[this.currentSubchannelIndex].getRealSubchannel() &&
newState === ConnectivityState.TRANSIENT_FAILURE
) {
this.startNextSubchannelConnecting();
Expand Down Expand Up @@ -419,8 +420,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* address list is different from the existing one */
if (
this.subchannels.length === 0 ||
this.latestAddressList.length !== addressList.length ||
!this.latestAddressList.every(
(value, index) => addressList[index] === value
(value, index) => addressList[index] && subchannelAddressEqual(addressList[index], value)
)
) {
this.latestAddressList = addressList;
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/src/load-balancing-call.ts
Expand Up @@ -102,6 +102,7 @@ export class LoadBalancingCall implements Call {
if (!this.metadata) {
throw new Error('doPick called before start');
}
this.trace('Pick called')
const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation);
const subchannelString = pickResult.subchannel ?
'(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
Expand Down
7 changes: 6 additions & 1 deletion packages/grpc-js/src/resolver-dns.ts
Expand Up @@ -98,6 +98,7 @@ class DnsResolver implements Resolver {
private continueResolving = false;
private nextResolutionTimer: NodeJS.Timer;
private isNextResolutionTimerRunning = false;
private isServiceConfigEnabled = true;
constructor(
private target: GrpcUri,
private listener: ResolverListener,
Expand Down Expand Up @@ -127,6 +128,10 @@ class DnsResolver implements Resolver {
}
this.percentage = Math.random() * 100;

if (channelOptions['grpc.service_config_disable_resolution'] === 1) {
this.isServiceConfigEnabled = false;
}

this.defaultResolutionError = {
code: Status.UNAVAILABLE,
details: `Name resolution failed for target ${uriToString(this.target)}`,
Expand Down Expand Up @@ -255,7 +260,7 @@ class DnsResolver implements Resolver {
);
/* If there already is a still-pending TXT resolution, we can just use
* that result when it comes in */
if (this.pendingTxtPromise === null) {
if (this.isServiceConfigEnabled && this.pendingTxtPromise === null) {
/* We handle the TXT query promise differently than the others because
* the name resolution attempt as a whole is a success even if the TXT
* lookup fails */
Expand Down

0 comments on commit 90de58c

Please sign in to comment.