From ae2b64bd659a74adcca0c48d1cefc6aa252795cf Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 30 Oct 2020 11:38:30 -0700 Subject: [PATCH] grpc-js: Implement deadline and cancellation propagation --- packages/grpc-js/src/call-stream.ts | 16 +- packages/grpc-js/src/channel.ts | 14 +- packages/grpc-js/src/client-interceptors.ts | 20 +- packages/grpc-js/src/constants.ts | 3 +- packages/grpc-js/src/server-call.ts | 34 ++- .../grpc-js/test/test-call-propagation.ts | 253 ++++++++++++++++++ 6 files changed, 310 insertions(+), 30 deletions(-) create mode 100644 packages/grpc-js/test/test-call-propagation.ts diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index fd50a8076..32b851654 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -18,7 +18,7 @@ import * as http2 from 'http2'; import { CallCredentials } from './call-credentials'; -import { Status } from './constants'; +import { Propagate, Status } from './constants'; import { Filter, FilterFactory } from './filter'; import { FilterStackFactory, FilterStack } from './filter-stack'; import { Metadata } from './metadata'; @@ -27,6 +27,7 @@ import { ChannelImplementation } from './channel'; import { Subchannel } from './subchannel'; import * as logging from './logging'; import { LogVerbosity } from './constants'; +import { ServerSurfaceCall } from './server-call'; const TRACER_NAME = 'call_stream'; @@ -42,7 +43,7 @@ export interface CallStreamOptions { deadline: Deadline; flags: number; host: string; - parentCall: Call | null; + parentCall: ServerSurfaceCall | null; } export type PartialCallStreamOptions = Partial; @@ -218,6 +219,11 @@ export class Http2CallStream implements Call { metadata: new Metadata(), }); }; + if (this.options.parentCall && this.options.flags & Propagate.CANCELLATION) { + this.options.parentCall.on('cancelled', () => { + this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call'); + }); + } } private outputStatus() { @@ -623,7 +629,11 @@ export class Http2CallStream implements Call { } getDeadline(): Deadline { - return this.options.deadline; + if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) { + return this.options.parentCall.getDeadline(); + } else { + return this.options.deadline; + } } getCredentials(): CallCredentials { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index dcfae483d..05588c497 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -28,7 +28,7 @@ import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; import { ChannelControlHelper } from './load-balancer'; import { UnavailablePicker, Picker, PickResultType } from './picker'; import { Metadata } from './metadata'; -import { Status, LogVerbosity } from './constants'; +import { Status, LogVerbosity, Propagate } from './constants'; import { FilterStackFactory } from './filter-stack'; import { CallCredentialsFilterFactory } from './call-credentials-filter'; import { DeadlineFilterFactory } from './deadline-filter'; @@ -39,6 +39,8 @@ import { SubchannelAddress } from './subchannel'; import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; +import { ServerSurfaceCall } from './server-call'; +import { SurfaceCall } from './call'; export enum ConnectivityState { CONNECTING, @@ -118,7 +120,7 @@ export interface Channel { method: string, deadline: Deadline, host: string | null | undefined, - parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any + parentCall: ServerSurfaceCall | null, propagateFlags: number | null | undefined ): Call; } @@ -509,7 +511,7 @@ export class ChannelImplementation implements Channel { method: string, deadline: Deadline, host: string | null | undefined, - parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any + parentCall: ServerSurfaceCall | null, propagateFlags: number | null | undefined ): Call { if (typeof method !== 'string') { @@ -537,9 +539,9 @@ export class ChannelImplementation implements Channel { ); const finalOptions: CallStreamOptions = { deadline: deadline, - flags: propagateFlags || 0, - host: host || this.defaultAuthority, - parentCall: parentCall || null, + flags: propagateFlags ?? Propagate.DEFAULTS, + host: host ?? this.defaultAuthority, + parentCall: parentCall, }; const stream: Http2CallStream = new Http2CallStream( method, diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index fe36ea36d..09e7f5aa5 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -311,21 +311,11 @@ export class InterceptingCall implements InterceptingCallInterface { } function getCall(channel: Channel, path: string, options: CallOptions): Call { - let deadline; - let host; - const parent = null; - let propagateFlags; - let credentials; - if (options) { - deadline = options.deadline; - host = options.host; - - propagateFlags = options.propagate_flags; - credentials = options.credentials; - } - if (deadline === undefined) { - deadline = Infinity; - } + const deadline = options.deadline ?? Infinity; + const host = options.host; + const parent = options.parent ?? null; + const propagateFlags = options.propagate_flags; + const credentials = options.credentials; const call = channel.createCall(path, deadline, host, parent, propagateFlags); if (credentials) { call.setCredentials(credentials); diff --git a/packages/grpc-js/src/constants.ts b/packages/grpc-js/src/constants.ts index e760658d0..d30b78f08 100644 --- a/packages/grpc-js/src/constants.ts +++ b/packages/grpc-js/src/constants.ts @@ -50,7 +50,8 @@ export enum Propagate { CENSUS_STATS_CONTEXT = 2, CENSUS_TRACING_CONTEXT = 4, CANCELLATION = 8, - DEFAULTS = 65536, + // https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/propagation_bits.h#L43 + DEFAULTS = 0xffff | Propagate.DEADLINE | Propagate.CENSUS_STATS_CONTEXT | Propagate.CENSUS_TRACING_CONTEXT | Propagate.CANCELLATION, } // -1 means unlimited diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 2c62b206d..aa8bd647e 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,7 +19,7 @@ import { EventEmitter } from 'events'; import * as http2 from 'http2'; import { Duplex, Readable, Writable } from 'stream'; -import { StatusObject } from './call-stream'; +import { Deadline, StatusObject } from './call-stream'; import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, @@ -78,6 +78,7 @@ export type ServerSurfaceCall = { readonly metadata: Metadata; getPeer(): string; sendMetadata(responseMetadata: Metadata): void; + getDeadline(): Deadline; } & EventEmitter; export type ServerUnaryCall = ServerSurfaceCall & { @@ -120,6 +121,10 @@ export class ServerUnaryCallImpl extends EventEmitter sendMetadata(responseMetadata: Metadata): void { this.call.sendMetadata(responseMetadata); } + + getDeadline(): Deadline { + return this.call.getDeadline(); + } } export class ServerReadableStreamImpl @@ -153,6 +158,10 @@ export class ServerReadableStreamImpl sendMetadata(responseMetadata: Metadata): void { this.call.sendMetadata(responseMetadata); } + + getDeadline(): Deadline { + return this.call.getDeadline(); + } } export class ServerWritableStreamImpl @@ -186,6 +195,10 @@ export class ServerWritableStreamImpl this.call.sendMetadata(responseMetadata); } + getDeadline(): Deadline { + return this.call.getDeadline(); + } + _write( chunk: ResponseType, encoding: string, @@ -257,6 +270,10 @@ export class ServerDuplexStreamImpl extends Duplex this.call.sendMetadata(responseMetadata); } + getDeadline(): Deadline { + return this.call.getDeadline(); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any end(metadata?: any) { if (metadata) { @@ -357,7 +374,8 @@ export class Http2ServerCallStream< ResponseType > extends EventEmitter { cancelled = false; - deadline: NodeJS.Timer = setTimeout(() => {}, 0); + deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); + private deadline: Deadline = Infinity; private wantTrailers = false; private metadataSent = false; private canPush = false; @@ -405,7 +423,7 @@ export class Http2ServerCallStream< } // Clear noop timer - clearTimeout(this.deadline); + clearTimeout(this.deadlineTimer); } private checkCancelled(): boolean { @@ -452,7 +470,9 @@ export class Http2ServerCallStream< const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0; - this.deadline = setTimeout(handleExpiredDeadline, timeout, this); + const now = new Date(); + this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout); + this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this); metadata.remove(GRPC_TIMEOUT_HEADER); } @@ -566,7 +586,7 @@ export class Http2ServerCallStream< statusObj.details ); - clearTimeout(this.deadline); + clearTimeout(this.deadlineTimer); if (!this.wantTrailers) { this.wantTrailers = true; @@ -779,6 +799,10 @@ export class Http2ServerCallStream< return 'unknown'; } } + + getDeadline(): Deadline { + return this.deadline; + } } /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/grpc-js/test/test-call-propagation.ts b/packages/grpc-js/test/test-call-propagation.ts new file mode 100644 index 000000000..3ce57be17 --- /dev/null +++ b/packages/grpc-js/test/test-call-propagation.ts @@ -0,0 +1,253 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; + +import * as grpc from '../src'; +import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; + +import { loadProtoFile } from './common'; + +function multiDone(done: () => void, target: number) { + let count = 0; + return () => { + count++; + if (count >= target) { + done(); + } + } +} + +describe('Call propagation', () => { + let server: grpc.Server; + let Client: ServiceClientConstructor; + let client: ServiceClient; + let proxyServer: grpc.Server; + let proxyClient: ServiceClient; + + before((done) => { + Client = loadProtoFile(__dirname + '/fixtures/test_service.proto').TestService as ServiceClientConstructor; + server = new grpc.Server(); + server.addService(Client.service, { + unary: () => {}, + clientStream: () => {}, + serverStream: () => {}, + bidiStream: () => {} + }); + proxyServer = new grpc.Server(); + server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { + if (error) { + done(error); + return; + } + server.start(); + client = new Client(`localhost:${port}`, grpc.credentials.createInsecure()); + proxyServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, proxyPort) => { + if (error) { + done(error); + return; + } + proxyServer.start(); + proxyClient = new Client(`localhost:${proxyPort}`, grpc.credentials.createInsecure()); + done(); + }); + }); + }); + afterEach(() => { + proxyServer.removeService(Client.service); + }); + after(() => { + server.forceShutdown(); + proxyServer.forceShutdown(); + }); + describe('Cancellation', () => { + it('should work with unary requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientUnaryCall; + proxyServer.addService(Client.service, { + unary: (parent: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + client.unary(parent.request, {parent: parent}, (error: grpc.ServiceError, value: unknown) => { + callback(error, value); + assert(error); + assert.strictEqual(error.code, grpc.status.CANCELLED); + done(); + }); + /* Cancel the original call after the server starts processing it to + * ensure that it does reach the server. */ + call.cancel(); + } + }); + call = proxyClient.unary({}, (error: grpc.ServiceError, value: unknown) => { + assert(error); + assert.strictEqual(error.code, grpc.status.CANCELLED); + done(); + }); + }); + it('Should work with client streaming requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientWritableStream; + proxyServer.addService(Client.service, { + clientStream: (parent: grpc.ServerReadableStream, callback: grpc.sendUnaryData) => { + client.clientStream({parent: parent}, (error: grpc.ServiceError, value: unknown) => { + callback(error, value); + assert(error); + assert.strictEqual(error.code, grpc.status.CANCELLED); + done(); + }); + /* Cancel the original call after the server starts processing it to + * ensure that it does reach the server. */ + call.cancel(); + } + }); + call = proxyClient.clientStream((error: grpc.ServiceError, value: unknown) => { + assert(error); + assert.strictEqual(error.code, grpc.status.CANCELLED); + done(); + }); + }); + it('Should work with server streaming requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientReadableStream; + proxyServer.addService(Client.service, { + serverStream: (parent: grpc.ServerWritableStream) => { + const child = client.serverStream(parent.request, {parent: parent}); + child.on('error', () => {}); + child.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.CANCELLED); + done(); + }); + call.cancel(); + } + }); + call = proxyClient.serverStream({}); + call.on('error', () => {}); + call.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.CANCELLED); + done(); + }); + }); + it('Should work with bidi streaming requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientDuplexStream; + proxyServer.addService(Client.service, { + bidiStream: (parent: grpc.ServerDuplexStream) => { + const child = client.bidiStream({parent: parent}); + child.on('error', () => {}); + child.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.CANCELLED); + done(); + }); + call.cancel(); + } + }); + call = proxyClient.bidiStream(); + call.on('error', () => {}); + call.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.CANCELLED); + done(); + }); + }); + }); + describe('Deadlines', () => { + it('should work with unary requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientUnaryCall; + proxyServer.addService(Client.service, { + unary: (parent: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + client.unary(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { + callback(error, value); + assert(error); + assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + } + }); + const deadline = new Date(); + deadline.setMilliseconds(deadline.getMilliseconds() + 100); + call = proxyClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => { + assert(error); + assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + }); + it('Should work with client streaming requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientWritableStream; + proxyServer.addService(Client.service, { + clientStream: (parent: grpc.ServerReadableStream, callback: grpc.sendUnaryData) => { + client.clientStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { + callback(error, value); + assert(error); + assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + } + }); + const deadline = new Date(); + deadline.setMilliseconds(deadline.getMilliseconds() + 100); + call = proxyClient.clientStream({deadline, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => { + assert(error); + assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + }); + it('Should work with server streaming requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientReadableStream; + proxyServer.addService(Client.service, { + serverStream: (parent: grpc.ServerWritableStream) => { + const child = client.serverStream(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE}); + child.on('error', () => {}); + child.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + } + }); + const deadline = new Date(); + deadline.setMilliseconds(deadline.getMilliseconds() + 100); + call = proxyClient.serverStream({}, {deadline}); + call.on('error', () => {}); + call.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + }); + it('Should work with bidi streaming requests', (done) => { + done = multiDone(done, 2); + let call: grpc.ClientDuplexStream; + proxyServer.addService(Client.service, { + bidiStream: (parent: grpc.ServerDuplexStream) => { + const child = client.bidiStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE}); + child.on('error', () => {}); + child.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + } + }); + const deadline = new Date(); + deadline.setMilliseconds(deadline.getMilliseconds() + 100); + call = proxyClient.bidiStream({deadline}); + call.on('error', () => {}); + call.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + }); + }); +}); \ No newline at end of file