Skip to content

Commit

Permalink
Support logical termination for grpc connection (#6324)
Browse files Browse the repository at this point in the history
* Support logical termination for grpc connection

* Address Feedback
  • Loading branch information
cherylEnkidu committed Jun 16, 2022
1 parent 578dc58 commit 421fc3b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
17 changes: 14 additions & 3 deletions packages/firestore/src/platform/node/grpc_connection.ts
Expand Up @@ -153,11 +153,11 @@ export class GrpcConnection implements Connection {
path: string,
request: Req,
authToken: Token | null,
appCheckToken: Token | null
appCheckToken: Token | null,
expectedResponseCount?: number
): Promise<Resp[]> {
const results: Resp[] = [];
const responseDeferred = new Deferred<Resp[]>();

logDebug(
LOG_TAG,
`RPC '${rpcName}' invoked (streaming) with request:`,
Expand All @@ -172,13 +172,24 @@ export class GrpcConnection implements Connection {
);
const jsonRequest = { ...request, database: this.databasePath };
const stream = stub[rpcName](jsonRequest, metadata);
let callbackFired = false;
stream.on('data', (response: Resp) => {
logDebug(LOG_TAG, `RPC ${rpcName} received result:`, response);
results.push(response);
if (
expectedResponseCount !== undefined &&
results.length === expectedResponseCount
) {
callbackFired = true;
responseDeferred.resolve(results);
}
});
stream.on('end', () => {
logDebug(LOG_TAG, `RPC '${rpcName}' completed.`);
responseDeferred.resolve(results);
if (!callbackFired) {
callbackFired = true;
responseDeferred.resolve(results);
}
});
stream.on('error', (grpcError: grpc.ServiceError) => {
logDebug(LOG_TAG, `RPC '${rpcName}' failed with error:`, grpcError);
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/remote/connection.ts
Expand Up @@ -68,7 +68,8 @@ export interface Connection {
path: string,
request: Req,
authToken: Token | null,
appCheckToken: Token | null
appCheckToken: Token | null,
expectedResponseCount?: number
): Promise<Resp[]>;

/**
Expand Down
8 changes: 5 additions & 3 deletions packages/firestore/src/remote/datastore.ts
Expand Up @@ -120,7 +120,8 @@ class DatastoreImpl extends Datastore {
invokeStreamingRPC<Req, Resp>(
rpcName: string,
path: string,
request: Req
request: Req,
expectedResponseCount?: number
): Promise<Resp[]> {
this.verifyInitialized();
return Promise.all([
Expand All @@ -133,7 +134,8 @@ class DatastoreImpl extends Datastore {
path,
request,
authToken,
appCheckToken
appCheckToken,
expectedResponseCount
);
})
.catch((error: FirestoreError) => {
Expand Down Expand Up @@ -194,7 +196,7 @@ export async function invokeBatchGetDocumentsRpc(
const response = await datastoreImpl.invokeStreamingRPC<
ProtoBatchGetDocumentsRequest,
ProtoBatchGetDocumentsResponse
>('BatchGetDocuments', path, request);
>('BatchGetDocuments', path, request, keys.length);

const docs = new Map<string, Document>();
response.forEach(proto => {
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/src/remote/rest_connection.ts
Expand Up @@ -104,7 +104,8 @@ export abstract class RestConnection implements Connection {
path: string,
request: Req,
authToken: Token | null,
appCheckToken: Token | null
appCheckToken: Token | null,
expectedResponseCount?: number
): Promise<Resp[]> {
// The REST API automatically aggregates all of the streamed results, so we
// can just use the normal invoke() method.
Expand Down

0 comments on commit 421fc3b

Please sign in to comment.