Skip to content

Commit

Permalink
feat: COUNT Queries (#1774)
Browse files Browse the repository at this point in the history
Co-authored-by: Denver Coneybeare <dconeybe@google.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Oct 3, 2022
1 parent 5ba8df0 commit bcaecb4
Show file tree
Hide file tree
Showing 8 changed files with 775 additions and 15 deletions.
325 changes: 323 additions & 2 deletions dev/src/reference.ts
Expand Up @@ -15,7 +15,7 @@
*/

import * as firestore from '@google-cloud/firestore';
import {Duplex, Transform} from 'stream';
import {Duplex, Readable, Transform} from 'stream';
import * as deepEqual from 'fast-deep-equal';

import * as protos from '../protos/firestore_v1_proto_api';
Expand Down Expand Up @@ -55,7 +55,6 @@ import {
} from './validate';
import {DocumentWatch, QueryWatch} from './watch';
import {validateDocumentData, WriteBatch, WriteResult} from './write-batch';

import api = protos.google.firestore.v1;

/**
Expand Down Expand Up @@ -1599,6 +1598,27 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
return new Query(this._firestore, options);
}

/**
* Returns a query that counts the documents in the result set of this
* query.
*
* The returned query, when executed, counts the documents in the result set
* of this query without actually downloading the documents.
*
* Using the returned query to count the documents is efficient because only
* the final count, not the documents' data, is downloaded. The returned
* query can even count the documents if the result set would be
* prohibitively large to download entirely (e.g. thousands of documents).
*
* @return a query that counts the documents in the result set of this
* query. The count can be retrieved from `snapshot.data().count`, where
* `snapshot` is the `AggregateQuerySnapshot` resulting from running the
* returned query.
*/
count(): AggregateQuery<{count: firestore.AggregateField<number>}> {
return new AggregateQuery(this, {count: {}});
}

/**
* Returns true if this `Query` is equal to the provided value.
*
Expand Down Expand Up @@ -2832,6 +2852,307 @@ export class CollectionReference<T = firestore.DocumentData>
}
}

/**
* A query that calculates aggregations over an underlying query.
*/
export class AggregateQuery<T extends firestore.AggregateSpec>
implements firestore.AggregateQuery<T>
{
/**
* @private
* @internal
*
* @param _query The query whose aggregations will be calculated by this
* object.
* @param _aggregates The aggregations that will be performed by this query.
*/
constructor(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly _query: Query<any>,
private readonly _aggregates: T
) {}

/** The query whose aggregations will be calculated by this object. */
get query(): firestore.Query<unknown> {
return this._query;
}

/**
* Executes this query.
*
* @return A promise that will be resolved with the results of the query.
*/
get(): Promise<AggregateQuerySnapshot<T>> {
return this._get();
}

/**
* Internal get() method that accepts an optional transaction id.
*
* @private
* @internal
* @param {bytes=} transactionId A transaction ID.
*/
_get(transactionId?: Uint8Array): Promise<AggregateQuerySnapshot<T>> {
// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

return new Promise((resolve, reject) => {
const stream = this._stream(transactionId);
stream.on('error', err => {
reject(wrapError(err, stack));
});
stream.once('data', result => {
stream.destroy();
resolve(result);
});
stream.on('end', () => {
reject('No AggregateQuery results');
});
});
}

/**
* Internal streaming method that accepts an optional transaction ID.
*
* @private
* @internal
* @param transactionId A transaction ID.
* @returns A stream of document results.
*/
_stream(transactionId?: Uint8Array): Readable {
const tag = requestTag();
const firestore = this._query.firestore;

const stream: Transform = new Transform({
objectMode: true,
transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => {
if (proto.result) {
const readTime = Timestamp.fromProto(proto.readTime!);
const data = this.decodeResult(proto.result);
callback(
undefined,
new AggregateQuerySnapshot<T>(this, readTime, data)
);
} else {
callback(Error('RunAggregationQueryResponse is missing result'));
}
},
});

firestore
.initializeIfNeeded(tag)
.then(async () => {
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
const request = this.toProto(transactionId);

let streamActive: Deferred<boolean>;
do {
streamActive = new Deferred<boolean>();
const backendStream = await firestore.requestStream(
'runAggregationQuery',
/* bidirectional= */ false,
request,
tag
);
stream.on('close', () => {
backendStream.resume();
backendStream.end();
});
backendStream.on('error', err => {
backendStream.unpipe(stream);
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (
!transactionId &&
!isPermanentRpcError(err, 'runAggregationQuery')
) {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with retryable stream error:',
err
);
streamActive.resolve(/* active= */ true);
} else {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with stream error:',
err
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
}
});
backendStream.resume();
backendStream.pipe(stream);
} while (await streamActive.promise);
})
.catch(e => stream.destroy(e));

return stream;
}

/**
* Internal method to decode values within result.
* @private
*/
private decodeResult(
proto: api.IAggregationResult
): firestore.AggregateSpecData<T> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const data: any = {};
const fields = proto.aggregateFields;
if (fields) {
const serializer = this._query.firestore._serializer!;
for (const prop of Object.keys(fields)) {
if (this._aggregates[prop] === undefined) {
throw new Error(
`Unexpected alias [${prop}] in result aggregate result`
);
}
data[prop] = serializer.decodeValue(fields[prop]);
}
}
return data;
}

/**
* Internal method for serializing a query to its RunAggregationQuery proto
* representation with an optional transaction id.
*
* @private
* @internal
* @returns Serialized JSON for the query.
*/
toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest {
const queryProto = this._query.toProto();
//TODO(tomandersen) inspect _query to build request - this is just hard
// coded count right now.
const runQueryRequest: api.IRunAggregationQueryRequest = {
parent: queryProto.parent,
structuredAggregationQuery: {
structuredQuery: queryProto.structuredQuery,
aggregations: [
{
alias: 'count',
count: {},
},
],
},
};

if (transactionId instanceof Uint8Array) {
runQueryRequest.transaction = transactionId;
}

return runQueryRequest;
}

/**
* Compares this object with the given object for equality.
*
* This object is considered "equal" to the other object if and only if
* `other` performs the same aggregations as this `AggregateQuery` and
* the underlying Query of `other` compares equal to that of this object
* using `Query.isEqual()`.
*
* @param other The object to compare to this object for equality.
* @return `true` if this object is "equal" to the given object, as
* defined above, or `false` otherwise.
*/
isEqual(other: firestore.AggregateQuery<T>): boolean {
if (this === other) {
return true;
}
if (!(other instanceof AggregateQuery)) {
return false;
}
if (!this.query.isEqual(other.query)) {
return false;
}
return deepEqual(this._aggregates, other._aggregates);
}
}

/**
* The results of executing an aggregation query.
*/
export class AggregateQuerySnapshot<T extends firestore.AggregateSpec>
implements firestore.AggregateQuerySnapshot<T>
{
/**
* @private
* @internal
*
* @param _query The query that was executed to produce this result.
* @param _readTime The time this snapshot was read.
* @param _data The results of the aggregations performed over the underlying
* query.
*/
constructor(
private readonly _query: AggregateQuery<T>,
private readonly _readTime: Timestamp,
private readonly _data: firestore.AggregateSpecData<T>
) {}

/** The query that was executed to produce this result. */
get query(): firestore.AggregateQuery<T> {
return this._query;
}

/** The time this snapshot was read. */
get readTime(): firestore.Timestamp {
return this._readTime;
}

/**
* Returns the results of the aggregations performed over the underlying
* query.
*
* The keys of the returned object will be the same as those of the
* `AggregateSpec` object specified to the aggregation method, and the
* values will be the corresponding aggregation result.
*
* @returns The results of the aggregations performed over the underlying
* query.
*/
data(): firestore.AggregateSpecData<T> {
return this._data;
}

/**
* Compares this object with the given object for equality.
*
* Two `AggregateQuerySnapshot` instances are considered "equal" if they
* have the same data and their underlying queries compare "equal" using
* `AggregateQuery.isEqual()`.
*
* @param other The object to compare to this object for equality.
* @return `true` if this object is "equal" to the given object, as
* defined above, or `false` otherwise.
*/
isEqual(other: firestore.AggregateQuerySnapshot<T>): boolean {
if (this === other) {
return true;
}
if (!(other instanceof AggregateQuerySnapshot)) {
return false;
}
// Since the read time is different on every read, we explicitly ignore all
// document metadata in this comparison, just like
// `DocumentSnapshot.isEqual()` does.
if (!this.query.isEqual(other.query)) {
return false;
}

return deepEqual(this._data, other._data);
}
}

/**
* Validates the input string as a field order direction.
*
Expand Down

0 comments on commit bcaecb4

Please sign in to comment.