Skip to content

Commit

Permalink
Eagerly start subscription heartbeats (#7871)
Browse files Browse the repository at this point in the history
# Overview

Eagerly start subscription heartbeats in case the subscription has
long-running setup steps before returning its async generator. When a
subscription is configured, `subscribe` can return either an
`AsyncGenerator` or a `Promise<AsyncGenerator>`. If an `AsyncGenerator`
is returned, the previous code works fine. We get the generator right
away and set up the heartbeat to keep things alive, even if the
generator has long-running setup code before returning values.

On the other hand, if the `Promise<AsyncGenerator>` takes longer than
the hearbeat interval to resolve, the router will terminate the
subscription. When the subscription actually starts generating data, the
callback returns 404, as it has already been cleaned up. We want the
system to gracefully wait for the setup code to complete, since we will
do so if the setup code is placed inside the iterator itself.

## Example

In this example, `subscribe` returns a `Promise<AsyncGenerator>`, but
the `Promise` resolves quickly. This results in the heartbeat being
started prior to the long-running setup. The heartbeat keeps the
subscription alive until the setup is completed, then it can start
sending data to the client.

```
return {
    subscribe: async () => {
        return {
            async *[Symbol.asyncIterator]() {
                // The heartbeat is already started, so this can wait as long as it wants
                await setupWithLongDelay();
                while (running) {
                    yield data;
                }
            },
        };
    },
}
```

On the other hand, waiting for the setup before resolving the `Promise`
leads to complications.

```
return {
    subscribe: async () => {
        // The router kills this request because it doesn't start the heartbeat in time
        await setupWithLongDelay(); 
        return {
            async *[Symbol.asyncIterator]() {
                while (running) {
                    yield data;
                }
            },
        };
    },
}
```

## Solution

This change eagerly starts the heartbeat interval prior to awaiting the
returned value from `subscribe`. This allows long-running setup code to
run concurrently with the heartbeats. Previously the second example
would result in a `SUBSCRIPTION_HEARTBEAT_ERROR` and subscription
termination. Now, the router receives heartbeats so it correctly waits
for the setup and data in both cases.
  • Loading branch information
tninesling committed Apr 18, 2024
1 parent 7e64902 commit 18a3827
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .changeset/angry-carrots-allow.md
@@ -0,0 +1,5 @@
---
"@apollo/server": patch
---

Subscription heartbeats are initialized prior to awaiting subscribe(). This allows long-running setup to happen in the returned Promise without the subscription being terminated prior to resolution.
Expand Up @@ -165,9 +165,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -274,9 +274,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Heartbeat disabled for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Heartbeat disabled for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"TESTING: Triggering first update",
Expand Down Expand Up @@ -387,9 +387,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -505,9 +505,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -653,17 +653,17 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionCallback[5678-dogs]: Received new subscription request",
"SubscriptionManager[5678-dogs]: Sending \`check\` request to router",
"SubscriptionManager[5678-dogs]: \`check\` request successful",
"SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url-2.com",
"SubscriptionCallback[5678-dogs]: Starting graphql-js subscription",
"SubscriptionCallback[5678-dogs]: graphql-js subscription successful",
"SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url-2.com",
"SubscriptionManager[5678-dogs]: Listening to graphql-js subscription",
"SubscriptionCallback[5678-dogs]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -850,17 +850,17 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionCallback[5678-dogs]: Received new subscription request",
"SubscriptionManager[5678-dogs]: Sending \`check\` request to router",
"SubscriptionManager[5678-dogs]: \`check\` request successful",
"SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url.com/5678-dogs",
"SubscriptionCallback[5678-dogs]: Starting graphql-js subscription",
"SubscriptionCallback[5678-dogs]: graphql-js subscription successful",
"SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url.com/5678-dogs",
"SubscriptionManager[5678-dogs]: Listening to graphql-js subscription",
"SubscriptionCallback[5678-dogs]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1003,9 +1003,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1084,9 +1084,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager[1234-cats]: Sending \`next\` request to router",
Expand Down Expand Up @@ -1139,9 +1139,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"ERROR: SubscriptionManager[1234-cats]: Generator threw an error, terminating subscription: The subscription generator didn't catch this!",
Expand Down Expand Up @@ -1274,9 +1274,11 @@ describe('SubscriptionCallbackPlugin', () => {
],
});

// Trigger the heartbeat interval just to make sure it doesn't actually
// happen in this case (we haven't mocked it, so it'll throw an error if it
// sends a heartbeat).
// The heartbeat is initialized just before awaiting the subscription.
// So in this case where the subscription throws, there will be one
// heartbeat before the subscription is cleaned up.
mockRouterCheckResponse();

jest.advanceTimersByTime(5000);

await completeRequest;
Expand All @@ -1285,13 +1287,17 @@ describe('SubscriptionCallbackPlugin', () => {
[
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"ERROR: SubscriptionCallback[1234-cats]: graphql-js subscription unsuccessful: [
The subscription field "invalidSubscriptionField" is not defined.
]",
"SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
"SubscriptionManager[1234-cats]: \`complete\` request successful",
"SubscriptionManager: Heartbeat received response for ID: 1234-cats",
"SubscriptionManager: Heartbeat request successful, ID: 1234-cats",
"SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals",
"SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.",
]
Expand Down Expand Up @@ -1353,9 +1359,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1433,9 +1439,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1543,9 +1549,9 @@ describe('SubscriptionCallbackPlugin', () => {
"WARN: SubscriptionManager[1234-cats]: Retrying \`check\` request (attempt 1) due to error: request to http://mock-router-url.com/ failed, reason: network request error",
"WARN: SubscriptionManager[1234-cats]: Retrying \`check\` request (attempt 2) due to error: request to http://mock-router-url.com/ failed, reason: network request error",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1664,9 +1670,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1772,9 +1778,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1875,9 +1881,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down Expand Up @@ -1981,9 +1987,9 @@ describe('SubscriptionCallbackPlugin', () => {
"SubscriptionCallback[1234-cats]: Received new subscription request",
"SubscriptionManager[1234-cats]: Sending \`check\` request to router",
"SubscriptionManager[1234-cats]: \`check\` request successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionCallback[1234-cats]: Starting graphql-js subscription",
"SubscriptionCallback[1234-cats]: graphql-js subscription successful",
"SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com",
"SubscriptionManager[1234-cats]: Listening to graphql-js subscription",
"SubscriptionCallback[1234-cats]: Responding to original subscription request",
"SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats",
Expand Down

0 comments on commit 18a3827

Please sign in to comment.