Skip to content

Commit

Permalink
feat: Use REST (#1698)
Browse files Browse the repository at this point in the history
To use REST transport when possible, pass `{preferRest: true}` to the constructor:

```ts
const db = new firestore.Firestore({preferRest: true});
```
  • Loading branch information
schmidt-sebastian committed Sep 12, 2022
1 parent 6ba6751 commit d85b0e9
Show file tree
Hide file tree
Showing 16 changed files with 314 additions and 134 deletions.
9 changes: 5 additions & 4 deletions dev/src/bulk-writer.ts
Expand Up @@ -16,7 +16,7 @@
import * as firestore from '@google-cloud/firestore';

import * as assert from 'assert';
import {GoogleError} from 'google-gax';
import type {GoogleError} from 'google-gax';

import {google} from '../protos/firestore_v1_proto_api';
import {FieldPath, Firestore} from '.';
Expand Down Expand Up @@ -285,9 +285,10 @@ class BulkCommitBatch extends WriteBatch {
);
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
} else {
const error = new (require('google-gax').GoogleError)(
status.message || undefined
);
const error =
new (require('google-gax/build/src/fallback').GoogleError)(
status.message || undefined
);
error.code = status.code as number;
this.pendingOps[i].onError(wrapError(error, stack));
}
Expand Down
1 change: 1 addition & 0 deletions dev/src/collection-group.ts
Expand Up @@ -96,6 +96,7 @@ export class CollectionGroup<T = firestore.DocumentData>

const stream = await this.firestore.requestStream(
'partitionQueryStream',
/* bidirectional= */ false,
request,
tag
);
Expand Down
1 change: 1 addition & 0 deletions dev/src/document-reader.ts
Expand Up @@ -112,6 +112,7 @@ export class DocumentReader<T> {
try {
const stream = await this.firestore.requestStream(
'batchGetDocuments',
/* bidirectional= */ false,
request,
requestTag
);
Expand Down
118 changes: 87 additions & 31 deletions dev/src/index.ts
Expand Up @@ -16,7 +16,9 @@

import * as firestore from '@google-cloud/firestore';

import {CallOptions} from 'google-gax';
import type {CallOptions} from 'google-gax';
import type * as googleGax from 'google-gax';
import type * as googleGaxFallback from 'google-gax/build/src/fallback';
import {Duplex, PassThrough, Transform} from 'stream';

import {URL} from 'url';
Expand Down Expand Up @@ -393,6 +395,16 @@ export class Firestore implements firestore.Firestore {
*/
private _clientPool: ClientPool<GapicClient>;

/**
* Preloaded instance of google-gax (full module, with gRPC support).
*/
private _gax?: typeof googleGax;

/**
* Preloaded instance of google-gax HTTP fallback implementation (no gRPC).
*/
private _gaxFallback?: typeof googleGaxFallback;

/**
* The configuration options for the GAPIC client.
* @private
Expand Down Expand Up @@ -534,19 +546,48 @@ export class Firestore implements firestore.Firestore {
this._clientPool = new ClientPool<GapicClient>(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
maxIdleChannels,
/* clientFactory= */ () => {
/* clientFactory= */ (requiresGrpc: boolean) => {
let client: GapicClient;

// Use the rest fallback if enabled and if the method does not require GRPC
const useFallback =
!this._settings.preferRest || requiresGrpc ? false : 'rest';

let gax: typeof googleGax | typeof googleGaxFallback;
if (useFallback) {
if (!this._gaxFallback) {
gax = this._gaxFallback = require('google-gax/build/src/fallback');
} else {
gax = this._gaxFallback;
}
} else {
if (!this._gax) {
gax = this._gax = require('google-gax');
} else {
gax = this._gax;
}
}

if (this._settings.ssl === false) {
const grpcModule = this._settings.grpc ?? require('google-gax').grpc;
const sslCreds = grpcModule.credentials.createInsecure();

client = new module.exports.v1({
sslCreds,
...this._settings,
});
client = new module.exports.v1(
{
sslCreds,
...this._settings,
fallback: useFallback,
},
gax
);
} else {
client = new module.exports.v1(this._settings);
client = new module.exports.v1(
{
...this._settings,
fallback: useFallback,
},
gax
);
}

logger('Firestore', null, 'Initialized Firestore GAPIC Client');
Expand Down Expand Up @@ -1379,8 +1420,10 @@ export class Firestore implements firestore.Firestore {

if (this._projectId === undefined) {
try {
this._projectId = await this._clientPool.run(requestTag, gapicClient =>
gapicClient.getProjectId()
this._projectId = await this._clientPool.run(
requestTag,
/* requiresGrpc= */ false,
gapicClient => gapicClient.getProjectId()
);
logger(
'Firestore.initializeIfNeeded',
Expand Down Expand Up @@ -1421,10 +1464,11 @@ export class Firestore implements firestore.Firestore {

if (retryCodes) {
const retryParams = getRetryParams(methodName);
callOptions.retry = new (require('google-gax').RetryOptions)(
retryCodes,
retryParams
);
callOptions.retry =
new (require('google-gax/build/src/fallback').RetryOptions)(
retryCodes,
retryParams
);
}

return callOptions;
Expand Down Expand Up @@ -1627,24 +1671,33 @@ export class Firestore implements firestore.Firestore {
): Promise<Resp> {
const callOptions = this.createCallOptions(methodName, retryCodes);

return this._clientPool.run(requestTag, async gapicClient => {
try {
logger('Firestore.request', requestTag, 'Sending request: %j', request);
const [result] = await (
gapicClient[methodName] as UnaryMethod<Req, Resp>
)(request, callOptions);
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
return result;
} catch (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
return Promise.reject(err);
return this._clientPool.run(
requestTag,
/* requiresGrpc= */ false,
async gapicClient => {
try {
logger(
'Firestore.request',
requestTag,
'Sending request: %j',
request
);
const [result] = await (
gapicClient[methodName] as UnaryMethod<Req, Resp>
)(request, callOptions);
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
return result;
} catch (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
return Promise.reject(err);
}
}
});
);
}

/**
Expand All @@ -1658,12 +1711,15 @@ export class Firestore implements firestore.Firestore {
* @internal
* @param methodName Name of the streaming Veneer API endpoint that
* takes a request and GAX options.
* @param bidrectional Whether the request is bidirectional (true) or
* unidirectional (false_
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @returns A Promise with the resulting read-only stream.
*/
requestStream(
methodName: FirestoreStreamingMethod,
bidrectional: boolean,
request: {},
requestTag: string
): Promise<Duplex> {
Expand All @@ -1674,7 +1730,7 @@ export class Firestore implements firestore.Firestore {
return this._retry(methodName, requestTag, () => {
const result = new Deferred<Duplex>();

this._clientPool.run(requestTag, async gapicClient => {
this._clientPool.run(requestTag, bidrectional, async gapicClient => {
logger(
'Firestore.requestStream',
requestTag,
Expand Down
66 changes: 46 additions & 20 deletions dev/src/pool.ts
Expand Up @@ -35,10 +35,15 @@ export const CLIENT_TERMINATED_ERROR_MSG =
* @internal
*/
export class ClientPool<T> {
private grpcEnabled = false;

/**
* Stores each active clients and how many operations it has outstanding.
*/
private activeClients = new Map<T, number>();
private activeClients = new Map<
T,
{activeRequestCount: number; grpcEnabled: boolean}
>();

/**
* A set of clients that have seen RST_STREAM errors (see
Expand Down Expand Up @@ -72,7 +77,7 @@ export class ClientPool<T> {
constructor(
private readonly concurrentOperationLimit: number,
private readonly maxIdleClients: number,
private readonly clientFactory: () => T,
private readonly clientFactory: (requiresGrpc: boolean) => T,
private readonly clientDestructor: (client: T) => Promise<void> = () =>
Promise.resolve()
) {}
Expand All @@ -84,21 +89,22 @@ export class ClientPool<T> {
* @private
* @internal
*/
private acquire(requestTag: string): T {
private acquire(requestTag: string, requiresGrpc: boolean): T {
let selectedClient: T | null = null;
let selectedClientRequestCount = -1;

for (const [client, requestCount] of this.activeClients) {
for (const [client, metadata] of this.activeClients) {
// Use the "most-full" client that can still accommodate the request
// in order to maximize the number of idle clients as operations start to
// complete.
if (
!this.failedClients.has(client) &&
requestCount > selectedClientRequestCount &&
requestCount < this.concurrentOperationLimit
metadata.activeRequestCount > selectedClientRequestCount &&
metadata.activeRequestCount < this.concurrentOperationLimit &&
(!requiresGrpc || metadata.grpcEnabled)
) {
selectedClient = client;
selectedClientRequestCount = requestCount;
selectedClientRequestCount = metadata.activeRequestCount;
}
}

Expand All @@ -111,15 +117,18 @@ export class ClientPool<T> {
);
} else {
logger('ClientPool.acquire', requestTag, 'Creating a new client');
selectedClient = this.clientFactory();
selectedClient = this.clientFactory(requiresGrpc);
selectedClientRequestCount = 0;
assert(
!this.activeClients.has(selectedClient),
'The provided client factory returned an existing instance'
);
}

this.activeClients.set(selectedClient, selectedClientRequestCount + 1);
this.activeClients.set(selectedClient, {
grpcEnabled: requiresGrpc,
activeRequestCount: selectedClientRequestCount + 1,
});

return selectedClient!;
}
Expand All @@ -131,9 +140,12 @@ export class ClientPool<T> {
* @internal
*/
private async release(requestTag: string, client: T): Promise<void> {
const requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active requests');
this.activeClients.set(client, requestCount - 1);
const metadata = this.activeClients.get(client);
assert(metadata && metadata.activeRequestCount > 0, 'No active requests');
this.activeClients.set(client, {
grpcEnabled: metadata.grpcEnabled,
activeRequestCount: metadata.activeRequestCount - 1,
});
if (this.terminated && this.opCount === 0) {
this.terminateDeferred.resolve();
}
Expand All @@ -153,22 +165,30 @@ export class ClientPool<T> {
* @internal
*/
private shouldGarbageCollectClient(client: T): boolean {
// Don't garbage collect clients that have active requests.
if (this.activeClients.get(client) !== 0) {
const clientMetadata = this.activeClients.get(client)!;

if (clientMetadata.activeRequestCount !== 0) {
// Don't garbage collect clients that have active requests.
return false;
}

if (this.grpcEnabled !== clientMetadata.grpcEnabled) {
// We are transitioning to GRPC. Garbage collect REST clients.
return true;
}

// Idle clients that have received RST_STREAM errors are always garbage
// collected.
if (this.failedClients.has(client)) {
return true;
}

// Otherwise, only garbage collect if we have too much idle capacity (e.g.
// more than 100 idle capacity with default settings) .
// more than 100 idle capacity with default settings).
let idleCapacityCount = 0;
for (const [, count] of this.activeClients) {
idleCapacityCount += this.concurrentOperationLimit - count;
for (const [, metadata] of this.activeClients) {
idleCapacityCount +=
this.concurrentOperationLimit - metadata.activeRequestCount;
}
return (
idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
Expand Down Expand Up @@ -197,7 +217,9 @@ export class ClientPool<T> {
// Visible for testing.
get opCount(): number {
let activeOperationCount = 0;
this.activeClients.forEach(count => (activeOperationCount += count));
this.activeClients.forEach(
metadata => (activeOperationCount += metadata.activeRequestCount)
);
return activeOperationCount;
}

Expand All @@ -213,11 +235,15 @@ export class ClientPool<T> {
* @private
* @internal
*/
run<V>(requestTag: string, op: (client: T) => Promise<V>): Promise<V> {
run<V>(
requestTag: string,
requiresGrpc: boolean,
op: (client: T) => Promise<V>
): Promise<V> {
if (this.terminated) {
return Promise.reject(new Error(CLIENT_TERMINATED_ERROR_MSG));
}
const client = this.acquire(requestTag);
const client = this.acquire(requestTag, requiresGrpc);

return op(client)
.catch(async (err: GoogleError) => {
Expand Down

0 comments on commit d85b0e9

Please sign in to comment.