Skip to content

Commit

Permalink
Merge pull request #275 from kjin/grpc-js-connect
Browse files Browse the repository at this point in the history
grpc-js-core: only listen for channel connect event once
  • Loading branch information
murgatroid99 committed Apr 17, 2018
2 parents ee8ec2e + c0f7afe commit 8176c70
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js-core/src/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ function setUpReadableStream<ResponseType>(
stream.push(null);
});
call.on('status', (status: StatusObject) => {
stream.emit('status', status);
if (status.code !== Status.OK) {
const statusName = _.invert(Status)[status.code];
const message: string = `${status.code} ${statusName}: ${status.details}`;
const error: ServiceError = Object.assign(new Error(status.details), status);
stream.emit('error', error);
}
stream.emit('status', status);
});
call.pause();
}
Expand Down
40 changes: 33 additions & 7 deletions packages/grpc-js-core/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ export class Http2Channel extends EventEmitter implements Channel {
private readonly target: url.URL;
private readonly defaultAuthority: string;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
// Helper Promise object only used in the implementation of connect().
private connecting: Promise<void>|null = null;
/* For now, we have up to one subchannel, which will exist as long as we are
* connecting or trying to connect */
private subChannel: http2.ClientHttp2Session|null = null;
Expand Down Expand Up @@ -127,6 +129,7 @@ export class Http2Channel extends EventEmitter implements Channel {
this.subChannel.removeListener('connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
this.subChannel = null;
this.emit('shutdown');
clearTimeout(this.backoffTimerId);
}
break;
Expand Down Expand Up @@ -279,15 +282,38 @@ export class Http2Channel extends EventEmitter implements Channel {
return stream;
}

/**
* Attempts to connect, returning a Promise that resolves when the connection
* is successful, or rejects if the channel is shut down.
*/
connect(): Promise<void> {
return new Promise((resolve) => {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
if (this.connectivityState === ConnectivityState.READY) {
setImmediate(resolve);
} else {
this.once('connect', resolve);
if (this.connectivityState === ConnectivityState.READY) {
return Promise.resolve();
} else if (this.connectivityState === ConnectivityState.SHUTDOWN) {
return Promise.reject(new Error('Channel has been shut down'));
} else {
// In effect, this.connecting is only assigned upon the first attempt to
// transition from IDLE to CONNECTING, so this condition could have also
// been (connectivityState === IDLE).
if (!this.connecting) {
this.connecting = new Promise((resolve, reject) => {
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING);
const onConnect = () => {
this.connecting = null;
this.removeListener('shutdown', onShutdown);
resolve();
};
const onShutdown = () => {
this.connecting = null;
this.removeListener('connect', onConnect);
reject(new Error('Channel has been shut down'));
};
this.once('connect', onConnect);
this.once('shutdown', onShutdown);
});
}
});
return this.connecting;
}
}

getConnectivityState(): ConnectivityState {
Expand Down
6 changes: 6 additions & 0 deletions packages/grpc-js-core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ export class Client {
clearTimeout(timer);
}
cb(null);
}, (err: Error) => {
// Rejection occurs if channel is shut down first.
if (timer) {
clearTimeout(timer);
}
cb(err);
});
if (deadline !== Infinity) {
let timeout: number;
Expand Down

0 comments on commit 8176c70

Please sign in to comment.