Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use REST #1698

Merged
merged 8 commits into from Sep 12, 2022
9 changes: 5 additions & 4 deletions dev/src/bulk-writer.ts
Expand Up @@ -16,7 +16,7 @@
import * as firestore from '@google-cloud/firestore';

import * as assert from 'assert';
import {GoogleError} from 'google-gax';
import type {GoogleError} from 'google-gax';

import {google} from '../protos/firestore_v1_proto_api';
import {FieldPath, Firestore} from '.';
Expand Down Expand Up @@ -285,9 +285,10 @@ class BulkCommitBatch extends WriteBatch {
);
this.pendingOps[i].onSuccess(new WriteResult(updateTime));
} else {
const error = new (require('google-gax').GoogleError)(
status.message || undefined
);
const error =
new (require('google-gax/build/src/fallback').GoogleError)(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if instead of using this path to fallback, we can use a subpath export to the fallback? https://nodejs.org/api/packages.html#subpath-exports

Maybe it's already set up that way, I didn't look at gax source yet. But the path indicates maybe it is not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great idea (and "today I learned" as well).

We will be able to safely start using it when we drop Node 12 support. The documentation says subpath exports were added in v12.7.0 but we have "engines.node" set to ">=12" now, so we cannot possibly start using it without making a semver major. But for the next major I definitely want to do it. I filed googleapis/gax-nodejs#1337 so that we don't forget.

status.message || undefined
);
error.code = status.code as number;
this.pendingOps[i].onError(wrapError(error, stack));
}
Expand Down
1 change: 1 addition & 0 deletions dev/src/collection-group.ts
Expand Up @@ -96,6 +96,7 @@ export class CollectionGroup<T = firestore.DocumentData>

const stream = await this.firestore.requestStream(
'partitionQueryStream',
/* bidirectional= */ false,
request,
tag
);
Expand Down
1 change: 1 addition & 0 deletions dev/src/document-reader.ts
Expand Up @@ -112,6 +112,7 @@ export class DocumentReader<T> {
try {
const stream = await this.firestore.requestStream(
'batchGetDocuments',
/* bidirectional= */ false,
request,
requestTag
);
Expand Down
118 changes: 87 additions & 31 deletions dev/src/index.ts
Expand Up @@ -16,7 +16,9 @@

import * as firestore from '@google-cloud/firestore';

import {CallOptions} from 'google-gax';
import type {CallOptions} from 'google-gax';
import type * as googleGax from 'google-gax';
import type * as googleGaxFallback from 'google-gax/build/src/fallback';
import {Duplex, PassThrough, Transform} from 'stream';

import {URL} from 'url';
Expand Down Expand Up @@ -393,6 +395,16 @@ export class Firestore implements firestore.Firestore {
*/
private _clientPool: ClientPool<GapicClient>;

/**
* Preloaded instance of google-gax (full module, with gRPC support).
*/
private _gax?: typeof googleGax;

/**
* Preloaded instance of google-gax HTTP fallback implementation (no gRPC).
*/
private _gaxFallback?: typeof googleGaxFallback;

/**
* The configuration options for the GAPIC client.
* @private
Expand Down Expand Up @@ -534,19 +546,48 @@ export class Firestore implements firestore.Firestore {
this._clientPool = new ClientPool<GapicClient>(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
maxIdleChannels,
/* clientFactory= */ () => {
/* clientFactory= */ (requiresGrpc: boolean) => {
let client: GapicClient;

// Use the rest fallback if enabled and if the method does not require GRPC
const useFallback =
!this._settings.preferRest || requiresGrpc ? false : 'rest';

let gax: typeof googleGax | typeof googleGaxFallback;
if (useFallback) {
if (!this._gaxFallback) {
gax = this._gaxFallback = require('google-gax/build/src/fallback');
} else {
gax = this._gaxFallback;
}
} else {
if (!this._gax) {
gax = this._gax = require('google-gax');
} else {
gax = this._gax;
}
}

if (this._settings.ssl === false) {
const grpcModule = this._settings.grpc ?? require('google-gax').grpc;
const sslCreds = grpcModule.credentials.createInsecure();

client = new module.exports.v1({
sslCreds,
...this._settings,
});
client = new module.exports.v1(
{
sslCreds,
...this._settings,
fallback: useFallback,
},
gax
);
} else {
client = new module.exports.v1(this._settings);
client = new module.exports.v1(
{
...this._settings,
fallback: useFallback,
},
gax
);
}

logger('Firestore', null, 'Initialized Firestore GAPIC Client');
Expand Down Expand Up @@ -1379,8 +1420,10 @@ export class Firestore implements firestore.Firestore {

if (this._projectId === undefined) {
try {
this._projectId = await this._clientPool.run(requestTag, gapicClient =>
gapicClient.getProjectId()
this._projectId = await this._clientPool.run(
requestTag,
/* requiresGrpc= */ false,
gapicClient => gapicClient.getProjectId()
);
logger(
'Firestore.initializeIfNeeded',
Expand Down Expand Up @@ -1421,10 +1464,11 @@ export class Firestore implements firestore.Firestore {

if (retryCodes) {
const retryParams = getRetryParams(methodName);
callOptions.retry = new (require('google-gax').RetryOptions)(
retryCodes,
retryParams
);
callOptions.retry =
new (require('google-gax/build/src/fallback').RetryOptions)(
retryCodes,
retryParams
);
}

return callOptions;
Expand Down Expand Up @@ -1627,24 +1671,33 @@ export class Firestore implements firestore.Firestore {
): Promise<Resp> {
const callOptions = this.createCallOptions(methodName, retryCodes);

return this._clientPool.run(requestTag, async gapicClient => {
try {
logger('Firestore.request', requestTag, 'Sending request: %j', request);
const [result] = await (
gapicClient[methodName] as UnaryMethod<Req, Resp>
)(request, callOptions);
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
return result;
} catch (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
return Promise.reject(err);
return this._clientPool.run(
requestTag,
/* requiresGrpc= */ false,
async gapicClient => {
try {
logger(
'Firestore.request',
requestTag,
'Sending request: %j',
request
);
const [result] = await (
gapicClient[methodName] as UnaryMethod<Req, Resp>
)(request, callOptions);
logger(
'Firestore.request',
requestTag,
'Received response: %j',
result
);
return result;
} catch (err) {
logger('Firestore.request', requestTag, 'Received error:', err);
return Promise.reject(err);
}
}
});
);
}

/**
Expand All @@ -1658,12 +1711,15 @@ export class Firestore implements firestore.Firestore {
* @internal
* @param methodName Name of the streaming Veneer API endpoint that
* takes a request and GAX options.
* @param bidrectional Whether the request is bidirectional (true) or
* unidirectional (false_
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @returns A Promise with the resulting read-only stream.
*/
requestStream(
methodName: FirestoreStreamingMethod,
bidrectional: boolean,
request: {},
requestTag: string
): Promise<Duplex> {
Expand All @@ -1674,7 +1730,7 @@ export class Firestore implements firestore.Firestore {
return this._retry(methodName, requestTag, () => {
const result = new Deferred<Duplex>();

this._clientPool.run(requestTag, async gapicClient => {
this._clientPool.run(requestTag, bidrectional, async gapicClient => {
logger(
'Firestore.requestStream',
requestTag,
Expand Down
66 changes: 46 additions & 20 deletions dev/src/pool.ts
Expand Up @@ -35,10 +35,15 @@ export const CLIENT_TERMINATED_ERROR_MSG =
* @internal
*/
export class ClientPool<T> {
private grpcEnabled = false;

/**
* Stores each active clients and how many operations it has outstanding.
*/
private activeClients = new Map<T, number>();
private activeClients = new Map<
T,
{activeRequestCount: number; grpcEnabled: boolean}
>();

/**
* A set of clients that have seen RST_STREAM errors (see
Expand Down Expand Up @@ -72,7 +77,7 @@ export class ClientPool<T> {
constructor(
private readonly concurrentOperationLimit: number,
private readonly maxIdleClients: number,
private readonly clientFactory: () => T,
private readonly clientFactory: (requiresGrpc: boolean) => T,
private readonly clientDestructor: (client: T) => Promise<void> = () =>
Promise.resolve()
) {}
Expand All @@ -84,21 +89,22 @@ export class ClientPool<T> {
* @private
* @internal
*/
private acquire(requestTag: string): T {
private acquire(requestTag: string, requiresGrpc: boolean): T {
let selectedClient: T | null = null;
let selectedClientRequestCount = -1;

for (const [client, requestCount] of this.activeClients) {
for (const [client, metadata] of this.activeClients) {
// Use the "most-full" client that can still accommodate the request
// in order to maximize the number of idle clients as operations start to
// complete.
if (
!this.failedClients.has(client) &&
requestCount > selectedClientRequestCount &&
requestCount < this.concurrentOperationLimit
metadata.activeRequestCount > selectedClientRequestCount &&
metadata.activeRequestCount < this.concurrentOperationLimit &&
(!requiresGrpc || metadata.grpcEnabled)
) {
selectedClient = client;
selectedClientRequestCount = requestCount;
selectedClientRequestCount = metadata.activeRequestCount;
}
}

Expand All @@ -111,15 +117,18 @@ export class ClientPool<T> {
);
} else {
logger('ClientPool.acquire', requestTag, 'Creating a new client');
selectedClient = this.clientFactory();
selectedClient = this.clientFactory(requiresGrpc);
selectedClientRequestCount = 0;
assert(
!this.activeClients.has(selectedClient),
'The provided client factory returned an existing instance'
);
}

this.activeClients.set(selectedClient, selectedClientRequestCount + 1);
this.activeClients.set(selectedClient, {
grpcEnabled: requiresGrpc,
activeRequestCount: selectedClientRequestCount + 1,
});

return selectedClient!;
}
Expand All @@ -131,9 +140,12 @@ export class ClientPool<T> {
* @internal
*/
private async release(requestTag: string, client: T): Promise<void> {
const requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active requests');
this.activeClients.set(client, requestCount - 1);
const metadata = this.activeClients.get(client);
assert(metadata && metadata.activeRequestCount > 0, 'No active requests');
this.activeClients.set(client, {
grpcEnabled: metadata.grpcEnabled,
activeRequestCount: metadata.activeRequestCount - 1,
});
if (this.terminated && this.opCount === 0) {
this.terminateDeferred.resolve();
}
Expand All @@ -153,22 +165,30 @@ export class ClientPool<T> {
* @internal
*/
private shouldGarbageCollectClient(client: T): boolean {
// Don't garbage collect clients that have active requests.
if (this.activeClients.get(client) !== 0) {
const clientMetadata = this.activeClients.get(client)!;

if (clientMetadata.activeRequestCount !== 0) {
// Don't garbage collect clients that have active requests.
return false;
}

if (this.grpcEnabled !== clientMetadata.grpcEnabled) {
// We are transitioning to GRPC. Garbage collect REST clients.
return true;
}

// Idle clients that have received RST_STREAM errors are always garbage
// collected.
if (this.failedClients.has(client)) {
return true;
}

// Otherwise, only garbage collect if we have too much idle capacity (e.g.
// more than 100 idle capacity with default settings) .
// more than 100 idle capacity with default settings).
let idleCapacityCount = 0;
for (const [, count] of this.activeClients) {
idleCapacityCount += this.concurrentOperationLimit - count;
for (const [, metadata] of this.activeClients) {
idleCapacityCount +=
this.concurrentOperationLimit - metadata.activeRequestCount;
}
return (
idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit
Expand Down Expand Up @@ -197,7 +217,9 @@ export class ClientPool<T> {
// Visible for testing.
get opCount(): number {
let activeOperationCount = 0;
this.activeClients.forEach(count => (activeOperationCount += count));
this.activeClients.forEach(
metadata => (activeOperationCount += metadata.activeRequestCount)
);
return activeOperationCount;
}

Expand All @@ -213,11 +235,15 @@ export class ClientPool<T> {
* @private
* @internal
*/
run<V>(requestTag: string, op: (client: T) => Promise<V>): Promise<V> {
run<V>(
requestTag: string,
requiresGrpc: boolean,
op: (client: T) => Promise<V>
): Promise<V> {
if (this.terminated) {
return Promise.reject(new Error(CLIENT_TERMINATED_ERROR_MSG));
}
const client = this.acquire(requestTag);
const client = this.acquire(requestTag, requiresGrpc);

return op(client)
.catch(async (err: GoogleError) => {
Expand Down