Skip to content

Commit

Permalink
Merge pull request #1678 from grpc/@grpc/grpc-js@1.2.x
Browse files Browse the repository at this point in the history
@grpc/grpc js@1.2.x upmerge
  • Loading branch information
murgatroid99 committed Jan 29, 2021
2 parents 1427ebd + 98376f0 commit b570200
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 89 deletions.
24 changes: 19 additions & 5 deletions packages/grpc-js-xds/package.json
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.0.0",
"version": "1.2.1",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {
Expand Down Expand Up @@ -32,22 +32,36 @@
"homepage": "https://github.com/grpc/grpc-node#readme",
"devDependencies": {
"@grpc/grpc-js": "file:../grpc-js",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"@types/gulp": "^4.0.6",
"@types/gulp-mocha": "0.0.32",
"@types/mocha": "^5.2.6",
"@types/node": "^13.11.1",
"@types/yargs": "^15.0.5",
"gts": "^2.0.2",
"typescript": "^3.8.3",
"yargs": "^15.4.1"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.0-pre14"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.2.0"
"@grpc/grpc-js": "~1.2.2"
},
"engines": {
"node": ">=10.10.0"
}
},
"files": [
"src/**/*.ts",
"build/src/**/*.{js,d.ts,js.map}",
"deps/envoy-api/envoy/api/v2/**/*.proto",
"deps/envoy-api/envoy/config/**/*.proto",
"deps/envoy-api/envoy/service/**/*.proto",
"deps/envoy-api/envoy/type/**/*.proto",
"deps/envoy-api/envoy/annotations/**/*.proto",
"deps/googleapis/google/api/**/*.proto",
"deps/googleapis/google/protobuf/**/*.proto",
"deps/googleapis/google/rpc/**/*.proto",
"deps/udpa/udpa/annotations/**/*.proto",
"deps/protoc-gen-validate/validate/**/*.proto"
]
}
21 changes: 12 additions & 9 deletions packages/grpc-js-xds/src/resolver-xds.ts
Expand Up @@ -30,58 +30,61 @@ function trace(text: string): void {
}

class XdsResolver implements Resolver {
private resolutionStarted = false;
private hasReportedSuccess = false;
private xdsClient: XdsClient | null = null;

constructor(
private target: GrpcUri,
private listener: ResolverListener,
private channelOptions: ChannelOptions
) {}

private reportResolutionError() {
private reportResolutionError(reason: string) {
this.listener.onError({
code: status.UNAVAILABLE,
details: `xDS name resolution failed for target ${uriToString(
this.target
)}`,
)}: ${reason}`,
metadata: new Metadata(),
});
}

updateResolution(): void {
// Wait until updateResolution is called once to start the xDS requests
if (!this.resolutionStarted) {
this.resolutionStarted = true;
if (this.xdsClient === null) {
trace('Starting resolution for target ' + uriToString(this.target));
const xdsClient = new XdsClient(
this.xdsClient = new XdsClient(
this.target.path,
{
onValidUpdate: (update: ServiceConfig) => {
trace('Resolved service config for target ' + uriToString(this.target) + ': ' + JSON.stringify(update));
this.hasReportedSuccess = true;
this.listener.onSuccessfulResolution([], update, null, {
xdsClient: xdsClient,
xdsClient: this.xdsClient,
});
},
onTransientError: (error: StatusObject) => {
/* A transient error only needs to bubble up as a failure if we have
* not already provided a ServiceConfig for the upper layer to use */
if (!this.hasReportedSuccess) {
trace('Resolution error for target ' + uriToString(this.target) + ' due to xDS client transient error ' + error.details);
this.reportResolutionError();
this.reportResolutionError(error.details);
}
},
onResourceDoesNotExist: () => {
trace('Resolution error for target ' + uriToString(this.target) + ': resource does not exist');
this.reportResolutionError();
this.reportResolutionError("Resource does not exist");
},
},
this.channelOptions
);
}
}

destroy() {
this.xdsClient?.shutdown();
}

static getDefaultAuthority(target: GrpcUri) {
return target.path;
}
Expand Down
6 changes: 2 additions & 4 deletions packages/grpc-js-xds/src/xds-client.ts
Expand Up @@ -1018,13 +1018,11 @@ export class XdsClient {

this.lrsBackoff.runOnce();
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('metadata', () => {
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
/* Once we get any response from the server, we assume that the stream is
* in a good state, so we can reset the backoff timer. */
this.lrsBackoff.stop();
this.lrsBackoff.reset();
});
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
Expand Down Expand Up @@ -1157,7 +1155,7 @@ export class XdsClient {
}

removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher removed for endpoint ' + clusterName);
trace('Watcher removed for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.2.0",
"version": "1.2.5",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
144 changes: 87 additions & 57 deletions packages/grpc-js/src/call-stream.ts
Expand Up @@ -37,6 +37,10 @@ const {
NGHTTP2_CANCEL,
} = http2.constants;

interface NodeError extends Error {
code: string;
}

export type Deadline = Date | number;

export interface CallStreamOptions {
Expand Down Expand Up @@ -202,6 +206,8 @@ export class Http2CallStream implements Call {

private listener: InterceptingListener | null = null;

private internalErrorMessage: string | null = null;

constructor(
private readonly methodName: string,
private readonly channel: ChannelImplementation,
Expand Down Expand Up @@ -518,66 +524,86 @@ export class Http2CallStream implements Call {
this.maybeOutputStatus();
});
stream.on('close', () => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
* we have received all of the messages and we have processed the
* trailers and the call completed successfully, so it doesn't matter
* how the stream ends after that */
if (this.finalStatus?.code === Status.OK) {
return;
}
let code: Status;
let details = '';
switch (stream.rstCode) {
case http2.constants.NGHTTP2_NO_ERROR:
/* If we get a NO_ERROR code and we already have a status, the
* stream completed properly and we just haven't fully processed
* it yet */
if (this.finalStatus !== null) {
return;
}
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
details = 'Stream refused by server';
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
details = 'Call cancelled';
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
}
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
/* Use process.next tick to ensure that this code happens after any
* "error" event that may be emitted at about the same time, so that
* we can bubble up the error message from that event. */
process.nextTick(() => {
this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
/* If we have a final status with an OK status code, that means that
* we have received all of the messages and we have processed the
* trailers and the call completed successfully, so it doesn't matter
* how the stream ends after that */
if (this.finalStatus?.code === Status.OK) {
return;
}
let code: Status;
let details = '';
switch (stream.rstCode) {
case http2.constants.NGHTTP2_NO_ERROR:
/* If we get a NO_ERROR code and we already have a status, the
* stream completed properly and we just haven't fully processed
* it yet */
if (this.finalStatus !== null) {
return;
}
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
break;
case http2.constants.NGHTTP2_REFUSED_STREAM:
code = Status.UNAVAILABLE;
details = 'Stream refused by server';
break;
case http2.constants.NGHTTP2_CANCEL:
code = Status.CANCELLED;
details = 'Call cancelled';
break;
case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
break;
case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
if (this.internalErrorMessage === null) {
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
} else {
/* The "Received RST_STREAM with code ..." error is preserved
* here for continuity with errors reported online, but the
* error message at the end will probably be more relevant in
* most cases. */
details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalErrorMessage}`;
}
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;
}
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({ code, details, metadata: new Metadata() });
});
});
stream.on('error', (err: Error) => {
stream.on('error', (err: NodeError) => {
/* We need an error handler here to stop "Uncaught Error" exceptions
* from bubbling up. However, errors here should all correspond to
* "close" events, where we will handle the error more granularly */
/* Specifically looking for stream errors that were *not* constructed
* from a RST_STREAM response here:
* https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
*/
if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
this.internalErrorMessage = err.message;
}
});
if (!this.pendingRead) {
stream.pause();
Expand Down Expand Up @@ -630,7 +656,11 @@ export class Http2CallStream implements Call {

getDeadline(): Deadline {
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
return this.options.parentCall.getDeadline();
const parentDeadline = this.options.parentCall.getDeadline();
const selfDeadline = this.options.deadline;
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline;
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline;
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs);
} else {
return this.options.deadline;
}
Expand Down
6 changes: 4 additions & 2 deletions packages/grpc-js/src/client-interceptors.ts
Expand Up @@ -347,10 +347,11 @@ class BaseInterceptingCall implements InterceptingCallInterface {
let serialized: Buffer;
try {
serialized = this.methodDefinition.requestSerialize(message);
this.call.sendMessageWithContext(context, serialized);
} catch (e) {
this.call.cancelWithStatus(Status.INTERNAL, `Request message serialization failure: ${e.message}`);
return;
}
this.call.sendMessageWithContext(context, serialized);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessage(message: any) {
Expand All @@ -370,15 +371,16 @@ class BaseInterceptingCall implements InterceptingCallInterface {
let deserialized: any;
try {
deserialized = this.methodDefinition.responseDeserialize(message);
interceptingListener?.onReceiveMessage?.(deserialized);
} catch (e) {
readError = {
code: Status.INTERNAL,
details: `Response message parsing error: ${e.message}`,
metadata: new Metadata(),
};
this.call.cancelWithStatus(readError.code, readError.details);
return;
}
interceptingListener?.onReceiveMessage?.(deserialized);
},
onReceiveStatus: (status) => {
if (readError) {
Expand Down
12 changes: 8 additions & 4 deletions packages/grpc-js/src/deadline-filter.ts
Expand Up @@ -56,10 +56,14 @@ export class DeadlineFilter extends BaseFilter implements Filter {
}
const now: number = new Date().getTime();
let timeout = this.deadline - now;
if (timeout < 0) {
timeout = 0;
}
if (this.deadline !== Infinity) {
if (timeout <= 0) {
process.nextTick(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,
'Deadline exceeded'
);
});
} else if (this.deadline !== Infinity) {
this.timer = setTimeout(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED,
Expand Down
6 changes: 2 additions & 4 deletions packages/grpc-js/src/load-balancer-round-robin.ts
Expand Up @@ -128,14 +128,12 @@ export class RoundRobinLoadBalancer implements LoadBalancer {
this.subchannelStateCounts[previousState] -= 1;
this.subchannelStateCounts[newState] += 1;
this.calculateAndUpdateState();

if (newState === ConnectivityState.TRANSIENT_FAILURE) {
this.channelControlHelper.requestReresolution();
}

if (
newState === ConnectivityState.TRANSIENT_FAILURE ||
newState === ConnectivityState.IDLE
) {
this.channelControlHelper.requestReresolution();
subchannel.startConnecting();
}
};
Expand Down

0 comments on commit b570200

Please sign in to comment.