Skip to content

Commit

Permalink
Firestore: Fix spurious "Backend didn't respond within 10 seconds" er…
Browse files Browse the repository at this point in the history
…rors when network just slow (#8145)
  • Loading branch information
dconeybe committed Apr 18, 2024
1 parent 84f9ff0 commit 2244194
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changeset/early-tomatoes-occur.md
@@ -0,0 +1,6 @@
---
'@firebase/firestore': patch
'firebase': patch
---

Prevent spurious "Backend didn't respond within 10 seconds" errors when network is indeed responding, just slowly.
Expand Up @@ -306,6 +306,7 @@ export class WebChannelConnection extends RestConnection {
LOG_TAG,
`RPC '${rpcName}' stream ${streamId} transport opened.`
);
streamBridge.callOnConnected();
}
});

Expand Down
6 changes: 6 additions & 0 deletions packages/firestore/src/platform/node/grpc_connection.ts
Expand Up @@ -286,9 +286,15 @@ export class GrpcConnection implements Connection {
}
});

let onConnectedSent = false;
grpcStream.on('data', (msg: Resp) => {
if (!closed) {
logDebug(LOG_TAG, `RPC '${rpcName}' stream ${streamId} received:`, msg);
// Emulate the "onConnected" event that WebChannelConnection sends.
if (!onConnectedSent) {
stream.callOnConnected();
onConnectedSent = true;
}
stream.callOnMessage(msg);
}
});
Expand Down
8 changes: 6 additions & 2 deletions packages/firestore/src/remote/connection.ts
Expand Up @@ -109,10 +109,14 @@ export interface Connection {
* A bidirectional stream that can be used to send an receive messages.
*
* A stream can be closed locally with close() or can be closed remotely or
* through network errors. onClose is guaranteed to be called. onOpen will only
* be called if the stream successfully established a connection.
* through network errors. onClose is guaranteed to be called. onOpen will be
* called once the stream is ready to send messages (which may or may not be
* before an actual connection to the backend has been established). The
* onConnected event is called when an actual, physical connection with the
* backend has been established, and may occur before or after the onOpen event.
*/
export interface Stream<I, O> {
onConnected(callback: () => void): void;
onOpen(callback: () => void): void;
onClose(callback: (err?: FirestoreError) => void): void;
onMessage(callback: (msg: O) => void): void;
Expand Down
8 changes: 8 additions & 0 deletions packages/firestore/src/remote/persistent_stream.ts
Expand Up @@ -125,6 +125,11 @@ const enum PersistentStreamState {
* events by the concrete implementation classes.
*/
export interface PersistentStreamListener {
/**
* Called after receiving an acknowledgement from the server, confirming that
* we are able to connect to it.
*/
onConnected: () => Promise<void>;
/**
* Called after the stream was established and can accept outgoing
* messages
Expand Down Expand Up @@ -483,6 +488,9 @@ export abstract class PersistentStream<
const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount);

this.stream = this.startRpc(authToken, appCheckToken);
this.stream.onConnected(() => {
dispatchIfNotClosed(() => this.listener!.onConnected());
});
this.stream.onOpen(() => {
dispatchIfNotClosed(() => {
debugAssert(
Expand Down
9 changes: 9 additions & 0 deletions packages/firestore/src/remote/remote_store.ts
Expand Up @@ -403,6 +403,13 @@ function cleanUpWatchStreamState(remoteStoreImpl: RemoteStoreImpl): void {
remoteStoreImpl.watchChangeAggregator = undefined;
}

async function onWatchStreamConnected(
remoteStoreImpl: RemoteStoreImpl
): Promise<void> {
// Mark the client as online since we got a "connected" notification.
remoteStoreImpl.onlineStateTracker.set(OnlineState.Online);
}

async function onWatchStreamOpen(
remoteStoreImpl: RemoteStoreImpl
): Promise<void> {
Expand Down Expand Up @@ -923,6 +930,7 @@ function ensureWatchStream(
remoteStoreImpl.datastore,
remoteStoreImpl.asyncQueue,
{
onConnected: onWatchStreamConnected.bind(null, remoteStoreImpl),
onOpen: onWatchStreamOpen.bind(null, remoteStoreImpl),
onClose: onWatchStreamClose.bind(null, remoteStoreImpl),
onWatchChange: onWatchStreamChange.bind(null, remoteStoreImpl)
Expand Down Expand Up @@ -969,6 +977,7 @@ function ensureWriteStream(
remoteStoreImpl.datastore,
remoteStoreImpl.asyncQueue,
{
onConnected: () => Promise.resolve(),
onOpen: onWriteStreamOpen.bind(null, remoteStoreImpl),
onClose: onWriteStreamClose.bind(null, remoteStoreImpl),
onHandshakeComplete: onWriteHandshakeComplete.bind(
Expand Down
17 changes: 17 additions & 0 deletions packages/firestore/src/remote/stream_bridge.ts
Expand Up @@ -26,6 +26,7 @@ import { Stream } from './connection';
* interface. The stream callbacks are invoked with the callOn... methods.
*/
export class StreamBridge<I, O> implements Stream<I, O> {
private wrappedOnConnected: (() => void) | undefined;
private wrappedOnOpen: (() => void) | undefined;
private wrappedOnClose: ((err?: FirestoreError) => void) | undefined;
private wrappedOnMessage: ((msg: O) => void) | undefined;
Expand All @@ -38,6 +39,14 @@ export class StreamBridge<I, O> implements Stream<I, O> {
this.closeFn = args.closeFn;
}

onConnected(callback: () => void): void {
debugAssert(
!this.wrappedOnConnected,
'Called onConnected on stream twice!'
);
this.wrappedOnConnected = callback;
}

onOpen(callback: () => void): void {
debugAssert(!this.wrappedOnOpen, 'Called onOpen on stream twice!');
this.wrappedOnOpen = callback;
Expand All @@ -61,6 +70,14 @@ export class StreamBridge<I, O> implements Stream<I, O> {
this.sendFn(msg);
}

callOnConnected(): void {
debugAssert(
this.wrappedOnConnected !== undefined,
'Cannot call onConnected because no callback was set'
);
this.wrappedOnConnected();
}

callOnOpen(): void {
debugAssert(
this.wrappedOnOpen !== undefined,
Expand Down
Expand Up @@ -63,6 +63,10 @@ describeFn('WebChannel', () => {
}
};

// Register an "onConnected" callback since it's required, even though we
// don't care about this event.
stream.onConnected(() => {});

// Once the stream is open, send an "add_target" request
stream.onOpen(() => {
stream.send(payload);
Expand Down
38 changes: 35 additions & 3 deletions packages/firestore/test/integration/remote/stream.test.ts
Expand Up @@ -22,7 +22,10 @@ import {
Token
} from '../../../src/api/credentials';
import { SnapshotVersion } from '../../../src/core/snapshot_version';
import { Target } from '../../../src/core/target';
import { TargetData, TargetPurpose } from '../../../src/local/target_data';
import { MutationResult } from '../../../src/model/mutation';
import { ResourcePath } from '../../../src/model/path';
import {
newPersistentWatchStream,
newPersistentWriteStream
Expand Down Expand Up @@ -57,7 +60,8 @@ type StreamEventType =
| 'mutationResult'
| 'watchChange'
| 'open'
| 'close';
| 'close'
| 'connected';

const SINGLE_MUTATION = [setMutation('docs/1', { foo: 'bar' })];

Expand Down Expand Up @@ -117,6 +121,10 @@ class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
return this.resolvePending('watchChange');
}

onConnected(): Promise<void> {
return this.resolvePending('connected');
}

onOpen(): Promise<void> {
return this.resolvePending('open');
}
Expand Down Expand Up @@ -148,6 +156,14 @@ describe('Watch Stream', () => {
});
});
});

it('gets connected event before first message', () => {
return withTestWatchStream(async (watchStream, streamListener) => {
await streamListener.awaitCallback('open');
watchStream.watch(sampleTargetData());
await streamListener.awaitCallback('connected');
});
});
});

class MockAuthCredentialsProvider extends EmptyAuthCredentialsProvider {
Expand Down Expand Up @@ -190,6 +206,7 @@ describe('Write Stream', () => {
'Handshake must be complete before writing mutations'
);
writeStream.writeHandshake();
await streamListener.awaitCallback('connected');
await streamListener.awaitCallback('handshakeComplete');

// Now writes should succeed
Expand All @@ -205,9 +222,10 @@ describe('Write Stream', () => {
return withTestWriteStream((writeStream, streamListener, queue) => {
return streamListener
.awaitCallback('open')
.then(() => {
.then(async () => {
writeStream.writeHandshake();
return streamListener.awaitCallback('handshakeComplete');
await streamListener.awaitCallback('connected');
await streamListener.awaitCallback('handshakeComplete');
})
.then(() => {
writeStream.markIdle();
Expand All @@ -228,6 +246,7 @@ describe('Write Stream', () => {
return withTestWriteStream(async (writeStream, streamListener, queue) => {
await streamListener.awaitCallback('open');
writeStream.writeHandshake();
await streamListener.awaitCallback('connected');
await streamListener.awaitCallback('handshakeComplete');

// Mark the stream idle, but immediately cancel the idle timer by issuing another write.
Expand Down Expand Up @@ -336,3 +355,16 @@ export async function withTestWatchStream(
streamListener.verifyNoPendingCallbacks();
});
}

function sampleTargetData(): TargetData {
const target: Target = {
path: ResourcePath.emptyPath(),
collectionGroup: null,
orderBy: [],
filters: [],
limit: null,
startAt: null,
endAt: null
};
return new TargetData(target, 1, TargetPurpose.Listen, 1);
}

0 comments on commit 2244194

Please sign in to comment.