Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: COUNT Queries #1774

Merged
merged 33 commits into from Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f70e121
firestore.d.ts: COUNT API added
dconeybe Jun 20, 2022
76cd8e7
Work in progress
tom-andersen Sep 2, 2022
d9f5170
Implement count with tests.
tom-andersen Sep 12, 2022
6ddf4e9
Fix tests with better termination
tom-andersen Sep 14, 2022
eb422f9
Copy Denvers API changes
tom-andersen Sep 14, 2022
0d936e5
Implement COUNT API changes.
tom-andersen Sep 15, 2022
b1747cc
Add comments
tom-andersen Sep 15, 2022
70b5659
Fix linting errors
tom-andersen Sep 15, 2022
a06f110
Revert manual proto change
tom-andersen Sep 16, 2022
0ecf768
Add comments
tom-andersen Sep 16, 2022
f396bda
Fix types
tom-andersen Sep 16, 2022
349ca1e
Implement retry
tom-andersen Sep 16, 2022
f162f12
Pass transaction
tom-andersen Sep 16, 2022
63fa8d8
Comment out test
tom-andersen Sep 16, 2022
8871496
Fix test
tom-andersen Sep 16, 2022
12aaf16
Remove demo file
tom-andersen Sep 16, 2022
999b910
Run integration test
tom-andersen Sep 16, 2022
4d2ee83
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
401b843
Remove future aggregates
tom-andersen Sep 19, 2022
63ed8bb
Fix emulator
tom-andersen Sep 19, 2022
e21e3f9
Merge branch 'tomandersen/count' of https://github.com/googleapis/nod…
tom-andersen Sep 19, 2022
c58bad7
Enable test
tom-andersen Sep 19, 2022
dc8c3a5
Prettier
tom-andersen Sep 19, 2022
dbf9d1a
Cleanup
tom-andersen Sep 20, 2022
046399c
Add tests
tom-andersen Sep 20, 2022
c519991
Changes from PR review
tom-andersen Sep 20, 2022
d12c82a
Fix PR based on comments
tom-andersen Sep 22, 2022
80e5044
Merge remote-tracking branch 'origin/main' into tomandersen/count
dconeybe Sep 23, 2022
54b8bd7
Merge remote-tracking branch 'origin/main' into tomandersen/count
dconeybe Sep 29, 2022
daf127e
Merge remote-tracking branch 'origin/main' into tomandersen/count
dconeybe Sep 30, 2022
e3136b8
system-test/firestore.ts: remove unconditional call to setLogFunction…
dconeybe Sep 30, 2022
996d86e
docs: Write javadocs for COUNT API (#1783)
dconeybe Oct 3, 2022
38bc42b
Merge branch 'main' into tomandersen/count
tom-andersen Oct 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
296 changes: 294 additions & 2 deletions dev/src/reference.ts
Expand Up @@ -15,7 +15,7 @@
*/

import * as firestore from '@google-cloud/firestore';
import {Duplex, Transform} from 'stream';
import {Duplex, Readable, Transform} from 'stream';
import * as deepEqual from 'fast-deep-equal';

import * as protos from '../protos/firestore_v1_proto_api';
Expand Down Expand Up @@ -55,7 +55,6 @@ import {
} from './validate';
import {DocumentWatch, QueryWatch} from './watch';
import {validateDocumentData, WriteBatch, WriteResult} from './write-batch';

import api = protos.google.firestore.v1;

/**
Expand Down Expand Up @@ -1599,6 +1598,29 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
return new Query(this._firestore, options);
}

/**
* Returns an `AggregateQuery` that counts the number of documents in the
* result set.
*
* @return an `AggregateQuery` that counts the number of documents in the
* result set.
*/
count(): AggregateQuery<{count: firestore.AggregateField<number>}> {
return this._aggregate({count: AggregateField.count()});
}

/**
* Returns an `AggregateQuery` that performs the given aggregations.
*
* @param aggregates the aggregations to perform.
* @return an `AggregateQuery` that performs the given aggregations.
*/
private _aggregate<T extends firestore.AggregateSpec>(
aggregates: T
): AggregateQuery<T> {
return new AggregateQuery(this, aggregates);
}

/**
* Returns true if this `Query` is equal to the provided value.
*
Expand Down Expand Up @@ -2832,6 +2854,276 @@ export class CollectionReference<T = firestore.DocumentData>
}
}

export class AggregateField<T> implements firestore.AggregateField<T> {
private constructor() {}

static count(): AggregateField<number> {
return new AggregateField<number>();
}

isEqual(other: firestore.AggregateField<T>): boolean {
return this === other || other instanceof AggregateField;
ehsannas marked this conversation as resolved.
Show resolved Hide resolved
}
}

export class AggregateQuery<T extends firestore.AggregateSpec>
implements firestore.AggregateQuery<T>
{
private readonly _query: Query<unknown>;
private readonly _aggregates: T;

constructor(query: Query<any>, aggregates: T) {
this._query = query;
this._aggregates = aggregates;
}

get query(): firestore.Query<unknown> {
return this._query;
}

get(): Promise<AggregateQuerySnapshot<T>> {
return this._get();
}

/**
* Internal get() method that accepts an optional transaction id.
*
* @private
* @internal
* @param {bytes=} transactionId A transaction ID.
*/
_get(transactionId?: Uint8Array): Promise<AggregateQuerySnapshot<T>> {
// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

return new Promise((resolve, reject) => {
const stream = this._stream(transactionId);
stream.on('error', err => {
reject(wrapError(err, stack));
});
stream.once('data', result => {
stream.destroy();
resolve(result);
});
stream.on('end', () => {
reject('No AggregateQuery results');
});
});
}

/**
* Internal streaming method that accepts an optional transaction ID.
*
* @param transactionId A transaction ID.
* @private
* @internal
* @returns A stream of document results.
*/
_stream(transactionId?: Uint8Array): Readable {
const tag = requestTag();
const firestore = this._query.firestore;

const stream: Transform = new Transform({
objectMode: true,
transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => {
if (proto.result) {
const readTime = Timestamp.fromProto(proto.readTime!);
const data = this.decodeResult(proto.result);
callback(
undefined,
new AggregateQuerySnapshot<T>(this, readTime, data)
);
} else {
callback(Error('RunAggregationQueryResponse is missing result'));
}
},
});

firestore
.initializeIfNeeded(tag)
.then(async () => {
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
const request = this.toProto(transactionId);

let streamActive: Deferred<boolean>;
do {
streamActive = new Deferred<boolean>();
const backendStream = await firestore.requestStream(
'runAggregationQuery',
/* bidirectional= */ false,
request,
tag
);
stream.on('close', () => {
backendStream.resume();
backendStream.end();
});
backendStream.on('error', err => {
backendStream.unpipe(stream);
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (
!transactionId &&
!isPermanentRpcError(err, 'runAggregationQuery')
) {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with retryable stream error:',
err
);
streamActive.resolve(/* active= */ true);
} else {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with stream error:',
err
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
}
});
backendStream.resume();
backendStream.pipe(stream);
} while (await streamActive.promise);
})
.catch(e => stream.destroy(e));

return stream;
}

/**
* Internal method to decode values within result.
*
* @param proto
* @private
*/
private decodeResult(
proto: api.IAggregationResult
): firestore.AggregateSpecData<T> {
const data: any = {};
const fields = proto.aggregateFields;
if (fields) {
const serializer = this._query.firestore._serializer!;
for (const prop of Object.keys(fields)) {
if (this._aggregates[prop] === undefined) {
throw new Error(
`Unexpected alias [${prop}] in result aggregate result`
);
}
data[prop] = serializer.decodeValue(fields[prop]);
}
}
return data;
}

/**
* Internal method for serializing a query to its RunAggregationQuery proto
* representation with an optional transaction id.
*
* @private
* @internal
* @returns Serialized JSON for the query.
*/
toProto(transactionId?: Uint8Array): api.IRunAggregationQueryRequest {
const queryProto = this._query.toProto();
//TODO(tomandersen) inspect _query to build request - this is just hard coded count right now.
const runQueryRequest: api.IRunAggregationQueryRequest = {
parent: queryProto.parent,
structuredAggregationQuery: {
structuredQuery: queryProto.structuredQuery,
aggregations: [
{
alias: 'count',
count: {},
},
],
},
};

if (transactionId instanceof Uint8Array) {
runQueryRequest.transaction = transactionId;
}

return runQueryRequest;
}

isEqual(other: firestore.AggregateQuery<T>): boolean {
if (this === other) {
return true;
}
if (other instanceof AggregateQuery) {
if (!this._query.isEqual(other._query)) {
return false;
}

const thisAggregates: [string, AggregateField<unknown>][] =
Object.entries(this._aggregates);
const otherAggregates = other._aggregates;

return (
thisAggregates.length === Object.keys(otherAggregates).length &&
thisAggregates.every(([alias, field]) =>
field.isEqual(otherAggregates[alias])
)
);
}
return false;
}
}

export class AggregateQuerySnapshot<T extends firestore.AggregateSpec>
implements firestore.AggregateQuerySnapshot<T>
{
constructor(
private readonly _query: AggregateQuery<T>,
private readonly _readTime: Timestamp,
private readonly _data: firestore.AggregateSpecData<T>
) {}

get query(): firestore.AggregateQuery<T> {
return this._query;
}

get readTime(): firestore.Timestamp {
return this._readTime;
}

isEqual(other: firestore.AggregateQuerySnapshot<T>): boolean {
if (this === other) {
return true;
}
if (other instanceof AggregateQuerySnapshot) {
if (!this._query.isEqual(other._query)) {
return false;
}

const thisData = this._data;
const thisDataKeys: string[] = Object.keys(thisData);

const otherData = other._data;
const otherDataKeys: string[] = Object.keys(otherData);

return (
thisDataKeys.length === otherDataKeys.length &&
thisDataKeys.every(
alias =>
Object.prototype.hasOwnProperty.call(otherData, alias) &&
thisData[alias] === otherData[alias]
)
);
}
return false;
}

data(): firestore.AggregateSpecData<T> {
return this._data;
}
}

/**
* Validates the input string as a field order direction.
*
Expand Down
30 changes: 26 additions & 4 deletions dev/src/transaction.ts
Expand Up @@ -27,6 +27,8 @@ import {logger} from './logger';
import {FieldPath, validateFieldPath} from './path';
import {StatusCode} from './status-code';
import {
AggregateQuery,
AggregateQuerySnapshot,
DocumentReference,
Query,
QuerySnapshot,
Expand Down Expand Up @@ -97,6 +99,17 @@ export class Transaction implements firestore.Transaction {
*/
get<T>(documentRef: DocumentReference<T>): Promise<DocumentSnapshot<T>>;

/**
* Retrieves an aggregate query result. Holds a pessimistic lock on all
* documents that were matched by the underlying query.
*
* @param aggregateQuery An aggregate query to execute.
* @return An AggregateQuerySnapshot for the retrieved data.
*/
get<T extends firestore.AggregateSpec>(
aggregateQuery: firestore.AggregateQuery<T>
): Promise<AggregateQuerySnapshot<T>>;

/**
* Retrieve a document or a query result from the database. Holds a
* pessimistic lock on all returned documents.
Expand All @@ -120,9 +133,14 @@ export class Transaction implements firestore.Transaction {
* });
* ```
*/
get<T>(
refOrQuery: DocumentReference<T> | Query<T>
): Promise<DocumentSnapshot<T> | QuerySnapshot<T>> {
get<T, U extends firestore.AggregateSpec>(
refOrQuery:
| firestore.DocumentReference<T>
| firestore.Query<T>
| firestore.AggregateQuery<U>
): Promise<
DocumentSnapshot<T> | QuerySnapshot<T> | AggregateQuerySnapshot<U>
> {
if (!this._writeBatch.isEmpty) {
throw new Error(READ_AFTER_WRITE_ERROR_MSG);
}
Expand All @@ -137,8 +155,12 @@ export class Transaction implements firestore.Transaction {
return refOrQuery._get(this._transactionId);
}

if (refOrQuery instanceof AggregateQuery<U>) {
return refOrQuery._get(this._transactionId);
}

throw new Error(
'Value for argument "refOrQuery" must be a DocumentReference or a Query.'
'Value for argument "refOrQuery" must be a DocumentReference, Query, or AggregateQuery.'
);
}

Expand Down