Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: COUNT Queries #1774

Merged
merged 33 commits into from Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f70e121
firestore.d.ts: COUNT API added
dconeybe Jun 20, 2022
76cd8e7
Work in progress
tom-andersen Sep 2, 2022
d9f5170
Implement count with tests.
tom-andersen Sep 12, 2022
6ddf4e9
Fix tests with better termination
tom-andersen Sep 14, 2022
eb422f9
Copy Denvers API changes
tom-andersen Sep 14, 2022
0d936e5
Implement COUNT API changes.
tom-andersen Sep 15, 2022
b1747cc
Add comments
tom-andersen Sep 15, 2022
70b5659
Fix linting errors
tom-andersen Sep 15, 2022
a06f110
Revert manual proto change
tom-andersen Sep 16, 2022
0ecf768
Add comments
tom-andersen Sep 16, 2022
f396bda
Fix types
tom-andersen Sep 16, 2022
349ca1e
Implement retry
tom-andersen Sep 16, 2022
f162f12
Pass transaction
tom-andersen Sep 16, 2022
63fa8d8
Comment out test
tom-andersen Sep 16, 2022
8871496
Fix test
tom-andersen Sep 16, 2022
12aaf16
Remove demo file
tom-andersen Sep 16, 2022
999b910
Run integration test
tom-andersen Sep 16, 2022
4d2ee83
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
401b843
Remove future aggregates
tom-andersen Sep 19, 2022
63ed8bb
Fix emulator
tom-andersen Sep 19, 2022
e21e3f9
Merge branch 'tomandersen/count' of https://github.com/googleapis/nod…
tom-andersen Sep 19, 2022
c58bad7
Enable test
tom-andersen Sep 19, 2022
dc8c3a5
Prettier
tom-andersen Sep 19, 2022
dbf9d1a
Cleanup
tom-andersen Sep 20, 2022
046399c
Add tests
tom-andersen Sep 20, 2022
c519991
Changes from PR review
tom-andersen Sep 20, 2022
d12c82a
Fix PR based on comments
tom-andersen Sep 22, 2022
80e5044
Merge remote-tracking branch 'origin/main' into tomandersen/count
dconeybe Sep 23, 2022
54b8bd7
Merge remote-tracking branch 'origin/main' into tomandersen/count
dconeybe Sep 29, 2022
daf127e
Merge remote-tracking branch 'origin/main' into tomandersen/count
dconeybe Sep 30, 2022
e3136b8
system-test/firestore.ts: remove unconditional call to setLogFunction…
dconeybe Sep 30, 2022
996d86e
docs: Write javadocs for COUNT API (#1783)
dconeybe Oct 3, 2022
38bc42b
Merge branch 'main' into tomandersen/count
tom-andersen Oct 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
325 changes: 323 additions & 2 deletions dev/src/reference.ts
Expand Up @@ -15,7 +15,7 @@
*/

import * as firestore from '@google-cloud/firestore';
import {Duplex, Transform} from 'stream';
import {Duplex, Readable, Transform} from 'stream';
import * as deepEqual from 'fast-deep-equal';

import * as protos from '../protos/firestore_v1_proto_api';
Expand Down Expand Up @@ -55,7 +55,6 @@ import {
} from './validate';
import {DocumentWatch, QueryWatch} from './watch';
import {validateDocumentData, WriteBatch, WriteResult} from './write-batch';

import api = protos.google.firestore.v1;

/**
Expand Down Expand Up @@ -1599,6 +1598,27 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
return new Query(this._firestore, options);
}

/**
* Returns a query that counts the documents in the result set of this
* query.
*
* The returned query, when executed, counts the documents in the result set
* of this query without actually downloading the documents.
*
* Using the returned query to count the documents is efficient because only
* the final count, not the documents' data, is downloaded. The returned
* query can even count the documents if the result set would be
* prohibitively large to download entirely (e.g. thousands of documents).
*
* @return a query that counts the documents in the result set of this
* query. The count can be retrieved from `snapshot.data().count`, where
* `snapshot` is the `AggregateQuerySnapshot` resulting from running the
* returned query.
*/
count(): AggregateQuery<{count: firestore.AggregateField<number>}> {
return new AggregateQuery(this, {count: {}});
}

/**
* Returns true if this `Query` is equal to the provided value.
*
Expand Down Expand Up @@ -2832,6 +2852,307 @@ export class CollectionReference<T = firestore.DocumentData>
}
}

/**
* A query that calculates aggregations over an underlying query.
*/
export class AggregateQuery<T extends firestore.AggregateSpec>
implements firestore.AggregateQuery<T>
{
/**
* @private
* @internal
*
* @param _query The query whose aggregations will be calculated by this
* object.
* @param _aggregates The aggregations that will be performed by this query.
*/
constructor(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly _query: Query<any>,
private readonly _aggregates: T
) {}

/** The query whose aggregations will be calculated by this object. */
get query(): firestore.Query<unknown> {
return this._query;
}

/**
* Executes this query.
*
* @return A promise that will be resolved with the results of the query.
*/
get(): Promise<AggregateQuerySnapshot<T>> {
return this._get();
}

/**
* Internal get() method that accepts an optional transaction id.
*
* @private
* @internal
* @param {bytes=} transactionId A transaction ID.
*/
_get(transactionId?: Uint8Array): Promise<AggregateQuerySnapshot<T>> {
// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

return new Promise((resolve, reject) => {
const stream = this._stream(transactionId);
stream.on('error', err => {
reject(wrapError(err, stack));
});
stream.once('data', result => {
stream.destroy();
resolve(result);
});
stream.on('end', () => {
reject('No AggregateQuery results');
});
});
}

/**
* Internal streaming method that accepts an optional transaction ID.
*
* @private
* @internal
* @param transactionId A transaction ID.
* @returns A stream of document results.
*/
_stream(transactionId?: Uint8Array): Readable {
const tag = requestTag();
const firestore = this._query.firestore;

const stream: Transform = new Transform({
objectMode: true,
transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => {
if (proto.result) {
const readTime = Timestamp.fromProto(proto.readTime!);
const data = this.decodeResult(proto.result);
callback(
undefined,
new AggregateQuerySnapshot<T>(this, readTime, data)
);
} else {
callback(Error('RunAggregationQueryResponse is missing result'));
}
},
});

firestore
.initializeIfNeeded(tag)
.then(async () => {
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
const request = this.toProto(transactionId);

let streamActive: Deferred<boolean>;
do {
streamActive = new Deferred<boolean>();
const backendStream = await firestore.requestStream(
'runAggregationQuery',
/* bidirectional= */ false,
request,
tag
);
stream.on('close', () => {
backendStream.resume();
backendStream.end();
});
backendStream.on('error', err => {
backendStream.unpipe(stream);
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (
!transactionId &&
!isPermanentRpcError(err, 'runAggregationQuery')
) {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with retryable stream error:',
err
);
streamActive.resolve(/* active= */ true);
} else {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with stream error:',
err
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
}
});
backendStream.resume();
backendStream.pipe(stream);
} while (await streamActive.promise);
})
.catch(e => stream.destroy(e));

return stream;
}

/**
* Internal method to decode values within result.
* @private
*/
private decodeResult(
proto: api.IAggregationResult
): firestore.AggregateSpecData<T> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const data: any = {};
const fields = proto.aggregateFields;
if (fields) {
const serializer = this._query.firestore._serializer!;
for (const prop of Object.keys(fields)) {
if (this._aggregates[prop] === undefined) {
throw new Error(
`Unexpected alias [${prop}] in result aggregate result`
);
}
data[prop] = serializer.decodeValue(fields[prop]);
}
}
return data;
}

/**
* Internal method for serializing a query to its RunAggregationQuery proto
* representation with an optional transaction id.
*
* @private
* @internal
* @returns Serialized JSON for the query.
*/
toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest {
const queryProto = this._query.toProto();
//TODO(tomandersen) inspect _query to build request - this is just hard
// coded count right now.
const runQueryRequest: api.IRunAggregationQueryRequest = {
parent: queryProto.parent,
structuredAggregationQuery: {
structuredQuery: queryProto.structuredQuery,
aggregations: [
{
alias: 'count',
count: {},
},
],
},
};

if (transactionId instanceof Uint8Array) {
runQueryRequest.transaction = transactionId;
}

return runQueryRequest;
}

/**
* Compares this object with the given object for equality.
*
* This object is considered "equal" to the other object if and only if
* `other` performs the same aggregations as this `AggregateQuery` and
* the underlying Query of `other` compares equal to that of this object
* using `Query.isEqual()`.
*
* @param other The object to compare to this object for equality.
* @return `true` if this object is "equal" to the given object, as
* defined above, or `false` otherwise.
*/
isEqual(other: firestore.AggregateQuery<T>): boolean {
if (this === other) {
return true;
}
if (!(other instanceof AggregateQuery)) {
return false;
}
if (!this.query.isEqual(other.query)) {
return false;
}
return deepEqual(this._aggregates, other._aggregates);
}
}

/**
* The results of executing an aggregation query.
*/
export class AggregateQuerySnapshot<T extends firestore.AggregateSpec>
implements firestore.AggregateQuerySnapshot<T>
{
/**
* @private
* @internal
*
* @param _query The query that was executed to produce this result.
* @param _readTime The time this snapshot was read.
* @param _data The results of the aggregations performed over the underlying
* query.
*/
constructor(
private readonly _query: AggregateQuery<T>,
private readonly _readTime: Timestamp,
private readonly _data: firestore.AggregateSpecData<T>
) {}

/** The query that was executed to produce this result. */
get query(): firestore.AggregateQuery<T> {
return this._query;
}

/** The time this snapshot was read. */
get readTime(): firestore.Timestamp {
return this._readTime;
}

/**
* Returns the results of the aggregations performed over the underlying
* query.
*
* The keys of the returned object will be the same as those of the
* `AggregateSpec` object specified to the aggregation method, and the
* values will be the corresponding aggregation result.
*
* @returns The results of the aggregations performed over the underlying
* query.
*/
data(): firestore.AggregateSpecData<T> {
return this._data;
}

/**
* Compares this object with the given object for equality.
*
* Two `AggregateQuerySnapshot` instances are considered "equal" if they
* have the same data and their underlying queries compare "equal" using
* `AggregateQuery.isEqual()`.
*
* @param other The object to compare to this object for equality.
* @return `true` if this object is "equal" to the given object, as
* defined above, or `false` otherwise.
*/
isEqual(other: firestore.AggregateQuerySnapshot<T>): boolean {
if (this === other) {
return true;
}
if (!(other instanceof AggregateQuerySnapshot)) {
return false;
}
// Since the read time is different on every read, we explicitly ignore all
// document metadata in this comparison, just like
// `DocumentSnapshot.isEqual()` does.
if (!this.query.isEqual(other.query)) {
return false;
}

return deepEqual(this._data, other._data);
}
}

/**
* Validates the input string as a field order direction.
*
Expand Down