Skip to content

Commit

Permalink
Merge pull request #1616 from murgatroid99/grpc-js_deadline_cancellat…
Browse files Browse the repository at this point in the history
…ion_propagation

grpc-js: Implement deadline and cancellation propagation
  • Loading branch information
murgatroid99 committed Nov 11, 2020
2 parents 03aaeb7 + ae2b64b commit ca6cf0f
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 30 deletions.
16 changes: 13 additions & 3 deletions packages/grpc-js/src/call-stream.ts
Expand Up @@ -18,7 +18,7 @@
import * as http2 from 'http2';

import { CallCredentials } from './call-credentials';
import { Status } from './constants';
import { Propagate, Status } from './constants';
import { Filter, FilterFactory } from './filter';
import { FilterStackFactory, FilterStack } from './filter-stack';
import { Metadata } from './metadata';
Expand All @@ -27,6 +27,7 @@ import { ChannelImplementation } from './channel';
import { Subchannel } from './subchannel';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ServerSurfaceCall } from './server-call';

const TRACER_NAME = 'call_stream';

Expand All @@ -42,7 +43,7 @@ export interface CallStreamOptions {
deadline: Deadline;
flags: number;
host: string;
parentCall: Call | null;
parentCall: ServerSurfaceCall | null;
}

export type PartialCallStreamOptions = Partial<CallStreamOptions>;
Expand Down Expand Up @@ -218,6 +219,11 @@ export class Http2CallStream implements Call {
metadata: new Metadata(),
});
};
if (this.options.parentCall && this.options.flags & Propagate.CANCELLATION) {
this.options.parentCall.on('cancelled', () => {
this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
});
}
}

private outputStatus() {
Expand Down Expand Up @@ -623,7 +629,11 @@ export class Http2CallStream implements Call {
}

getDeadline(): Deadline {
return this.options.deadline;
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) {
return this.options.parentCall.getDeadline();
} else {
return this.options.deadline;
}
}

getCredentials(): CallCredentials {
Expand Down
14 changes: 8 additions & 6 deletions packages/grpc-js/src/channel.ts
Expand Up @@ -28,7 +28,7 @@ import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity } from './constants';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
Expand All @@ -39,6 +39,8 @@ import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
import { SurfaceCall } from './call';

export enum ConnectivityState {
IDLE,
Expand Down Expand Up @@ -118,7 +120,7 @@ export interface Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
parentCall: ServerSurfaceCall | null,
propagateFlags: number | null | undefined
): Call;
}
Expand Down Expand Up @@ -509,7 +511,7 @@ export class ChannelImplementation implements Channel {
method: string,
deadline: Deadline,
host: string | null | undefined,
parentCall: any, // eslint-disable-line @typescript-eslint/no-explicit-any
parentCall: ServerSurfaceCall | null,
propagateFlags: number | null | undefined
): Call {
if (typeof method !== 'string') {
Expand Down Expand Up @@ -537,9 +539,9 @@ export class ChannelImplementation implements Channel {
);
const finalOptions: CallStreamOptions = {
deadline: deadline,
flags: propagateFlags || 0,
host: host || this.defaultAuthority,
parentCall: parentCall || null,
flags: propagateFlags ?? Propagate.DEFAULTS,
host: host ?? this.defaultAuthority,
parentCall: parentCall,
};
const stream: Http2CallStream = new Http2CallStream(
method,
Expand Down
20 changes: 5 additions & 15 deletions packages/grpc-js/src/client-interceptors.ts
Expand Up @@ -311,21 +311,11 @@ export class InterceptingCall implements InterceptingCallInterface {
}

function getCall(channel: Channel, path: string, options: CallOptions): Call {
let deadline;
let host;
const parent = null;
let propagateFlags;
let credentials;
if (options) {
deadline = options.deadline;
host = options.host;

propagateFlags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
const deadline = options.deadline ?? Infinity;
const host = options.host;
const parent = options.parent ?? null;
const propagateFlags = options.propagate_flags;
const credentials = options.credentials;
const call = channel.createCall(path, deadline, host, parent, propagateFlags);
if (credentials) {
call.setCredentials(credentials);
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js/src/constants.ts
Expand Up @@ -50,7 +50,8 @@ export enum Propagate {
CENSUS_STATS_CONTEXT = 2,
CENSUS_TRACING_CONTEXT = 4,
CANCELLATION = 8,
DEFAULTS = 65536,
// https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/propagation_bits.h#L43
DEFAULTS = 0xffff | Propagate.DEADLINE | Propagate.CENSUS_STATS_CONTEXT | Propagate.CENSUS_TRACING_CONTEXT | Propagate.CANCELLATION,
}

// -1 means unlimited
Expand Down
34 changes: 29 additions & 5 deletions packages/grpc-js/src/server-call.ts
Expand Up @@ -19,7 +19,7 @@ import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';

import { StatusObject } from './call-stream';
import { Deadline, StatusObject } from './call-stream';
import {
Status,
DEFAULT_MAX_SEND_MESSAGE_LENGTH,
Expand Down Expand Up @@ -78,6 +78,7 @@ export type ServerSurfaceCall = {
readonly metadata: Metadata;
getPeer(): string;
sendMetadata(responseMetadata: Metadata): void;
getDeadline(): Deadline;
} & EventEmitter;

export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
Expand Down Expand Up @@ -120,6 +121,10 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}
}

export class ServerReadableStreamImpl<RequestType, ResponseType>
Expand Down Expand Up @@ -153,6 +158,10 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}
}

export class ServerWritableStreamImpl<RequestType, ResponseType>
Expand Down Expand Up @@ -186,6 +195,10 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}

_write(
chunk: ResponseType,
encoding: string,
Expand Down Expand Up @@ -257,6 +270,10 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
this.call.sendMetadata(responseMetadata);
}

getDeadline(): Deadline {
return this.call.getDeadline();
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata?: any) {
if (metadata) {
Expand Down Expand Up @@ -357,7 +374,8 @@ export class Http2ServerCallStream<
ResponseType
> extends EventEmitter {
cancelled = false;
deadline: NodeJS.Timer = setTimeout(() => {}, 0);
deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
private deadline: Deadline = Infinity;
private wantTrailers = false;
private metadataSent = false;
private canPush = false;
Expand Down Expand Up @@ -405,7 +423,7 @@ export class Http2ServerCallStream<
}

// Clear noop timer
clearTimeout(this.deadline);
clearTimeout(this.deadlineTimer);
}

private checkCancelled(): boolean {
Expand Down Expand Up @@ -452,7 +470,9 @@ export class Http2ServerCallStream<

const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;

this.deadline = setTimeout(handleExpiredDeadline, timeout, this);
const now = new Date();
this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
metadata.remove(GRPC_TIMEOUT_HEADER);
}

Expand Down Expand Up @@ -566,7 +586,7 @@ export class Http2ServerCallStream<
statusObj.details
);

clearTimeout(this.deadline);
clearTimeout(this.deadlineTimer);

if (!this.wantTrailers) {
this.wantTrailers = true;
Expand Down Expand Up @@ -779,6 +799,10 @@ export class Http2ServerCallStream<
return 'unknown';
}
}

getDeadline(): Deadline {
return this.deadline;
}
}

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down

0 comments on commit ca6cf0f

Please sign in to comment.