Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unary request and client stream promise support #981

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/grpc-native-core/index.d.ts
Expand Up @@ -1089,8 +1089,8 @@ declare module "grpc" {
argument: RequestType | null,
metadata: Metadata | null,
options: CallOptions | null,
callback: requestCallback<ResponseType>,
): ClientUnaryCall;
callback?: requestCallback<ResponseType>,
): ClientUnaryCall & PromiseLike<ResponseType>;

/**
* Make a client stream request to the given method, using the given serialize
Expand All @@ -1109,8 +1109,8 @@ declare module "grpc" {
deserialize: deserialize<ResponseType>,
metadata: Metadata | null,
options: CallOptions | null,
callback: requestCallback<ResponseType>,
): ClientWritableStream<RequestType>;
callback?: requestCallback<ResponseType>,
): ClientWritableStream<RequestType> & PromiseLike<ResponseType>;

/**
* Make a server stream request to the given method, with the given serialize
Expand Down
128 changes: 92 additions & 36 deletions packages/grpc-native-core/src/client.js
Expand Up @@ -470,15 +470,22 @@ Client.prototype.makeUnaryRequest = function(path, serialize, deserialize,
metadata = new Metadata();
options = {};
}
if (!callback && !options) {
if (metadata instanceof Metadata) {
options = {};
} else {
options = metadata;
metadata = new Metadata();
}
}
if (!metadata) {
metadata = new Metadata();
}
if (!options) {
options = {};
}
if (!((metadata instanceof Metadata) &&
(options instanceof Object) &&
(typeof callback === 'function'))) {
(options instanceof Object))) {
throw new Error('Argument mismatch in makeUnaryRequest');
}

Expand Down Expand Up @@ -517,26 +524,47 @@ Client.prototype.makeUnaryRequest = function(path, serialize, deserialize,
callOptions.interceptor_providers
);

var intercepting_call = client_interceptors.getInterceptingCall(
methodDefinition,
callOptions,
interceptors,
callProperties.channel,
callProperties.callback
);

var emitter = callProperties.call;
emitter.call = intercepting_call;

var last_listener = client_interceptors.getLastListener(
methodDefinition,
emitter,
callProperties.callback
);
var promise = new Promise((resolve, reject) => {
var promise_callback = function (err, data) {
if (typeof callProperties.callback === 'function') {
callProperties.callback.apply(this, [err, data]);
}

if (err) {
return reject(err);
}
resolve(data);
};

intercepting_call.start(callProperties.metadata, last_listener);
intercepting_call.sendMessage(callProperties.argument);
intercepting_call.halfClose();
var intercepting_call = client_interceptors.getInterceptingCall(
methodDefinition,
callOptions,
interceptors,
callProperties.channel,
promise_callback
);

emitter.call = intercepting_call;

var last_listener = client_interceptors.getLastListener(
methodDefinition,
emitter,
promise_callback
);

intercepting_call.start(callProperties.metadata, last_listener);
intercepting_call.sendMessage(callProperties.argument);
intercepting_call.halfClose();
});

if (typeof callProperties.callback === 'function') {
// avoid UnhandledPromiseRejectionWarning
promise.catch(() => {});
}

emitter.then = promise.then.bind(promise);

return emitter;
};
Expand Down Expand Up @@ -573,15 +601,22 @@ Client.prototype.makeClientStreamRequest = function(path, serialize,
metadata = new Metadata();
options = {};
}
if (!callback && !options) {
if (metadata instanceof Metadata) {
options = {};
} else {
options = metadata;
metadata = new Metadata();
}
}
if (!metadata) {
metadata = new Metadata();
}
if (!options) {
options = {};
}
if (!((metadata instanceof Metadata) &&
(options instanceof Object) &&
(typeof callback === 'function'))) {
(options instanceof Object))) {
throw new Error('Argument mismatch in makeClientStreamRequest');
}

Expand Down Expand Up @@ -619,24 +654,45 @@ Client.prototype.makeClientStreamRequest = function(path, serialize,
callOptions.interceptor_providers
);

var intercepting_call = client_interceptors.getInterceptingCall(
methodDefinition,
callOptions,
interceptors,
callProperties.channel,
callProperties.callback
);

var emitter = callProperties.call;
emitter.call = intercepting_call;

var last_listener = client_interceptors.getLastListener(
methodDefinition,
emitter,
callProperties.callback
);
var promise = new Promise((resolve, reject) => {
var promise_callback = function (err, data) {
if (typeof callProperties.callback === 'function') {
callProperties.callback.apply(this, [err, data]);
}

if (err) {
return reject(err);
}
resolve(data);
};

intercepting_call.start(callProperties.metadata, last_listener);
var intercepting_call = client_interceptors.getInterceptingCall(
methodDefinition,
callOptions,
interceptors,
callProperties.channel,
promise_callback
);

emitter.call = intercepting_call;

var last_listener = client_interceptors.getLastListener(
methodDefinition,
emitter,
promise_callback
);

intercepting_call.start(callProperties.metadata, last_listener);
});

if (typeof callProperties.callback === 'function') {
// avoid UnhandledPromiseRejectionWarning
promise.catch(() => {});
}

emitter.then = promise.then.bind(promise);

return emitter;
};
Expand Down
25 changes: 25 additions & 0 deletions packages/grpc-native-core/test/client_interceptors_test.js
Expand Up @@ -193,6 +193,21 @@ describe('Client interceptors', function() {
echoBidiStream: []
}));
});
it('with unary call and promise', function(done) {
var expected_value = 'foo';
var message = {value: expected_value};
client.echo(message)
.then(function(response) {
assert.strictEqual(response.value, expected_value);
done();
});
assert(_.isEqual(grpc_client.getClientInterceptors(client), {
echo: [],
echoClientStream: [],
echoServerStream: [],
echoBidiStream: []
}));
});
});

describe('execute downstream interceptors when a new call is made outbound',
Expand Down Expand Up @@ -288,6 +303,16 @@ describe('Client interceptors', function() {
stream.write(message);
stream.end();
});
it('with client streaming call with promise', function(done) {
registry = new CallRegistry(done, expected_calls, false);
var message = { value: 'foo' };
var stream = client.echoClientStream(options);
stream.write(message);
stream.end().then(function (res) {
assert.strictEqual(res.value, message.value);
registry.addCall('response');
});
});
it('with server streaming call', function(done) {
registry = new CallRegistry(done, expected_calls, true);
var message = {};
Expand Down