From 18a3827d63c3916f6aaccbc4bdef3e0d550d91a7 Mon Sep 17 00:00:00 2001 From: Taylor Ninesling Date: Thu, 18 Apr 2024 09:47:48 -0500 Subject: [PATCH] Eagerly start subscription heartbeats (#7871) # Overview Eagerly start subscription heartbeats in case the subscription has long-running setup steps before returning its async generator. When a subscription is configured, `subscribe` can return either an `AsyncGenerator` or a `Promise`. If an `AsyncGenerator` is returned, the previous code works fine. We get the generator right away and set up the heartbeat to keep things alive, even if the generator has long-running setup code before returning values. On the other hand, if the `Promise` takes longer than the hearbeat interval to resolve, the router will terminate the subscription. When the subscription actually starts generating data, the callback returns 404, as it has already been cleaned up. We want the system to gracefully wait for the setup code to complete, since we will do so if the setup code is placed inside the iterator itself. ## Example In this example, `subscribe` returns a `Promise`, but the `Promise` resolves quickly. This results in the heartbeat being started prior to the long-running setup. The heartbeat keeps the subscription alive until the setup is completed, then it can start sending data to the client. ``` return { subscribe: async () => { return { async *[Symbol.asyncIterator]() { // The heartbeat is already started, so this can wait as long as it wants await setupWithLongDelay(); while (running) { yield data; } }, }; }, } ``` On the other hand, waiting for the setup before resolving the `Promise` leads to complications. ``` return { subscribe: async () => { // The router kills this request because it doesn't start the heartbeat in time await setupWithLongDelay(); return { async *[Symbol.asyncIterator]() { while (running) { yield data; } }, }; }, } ``` ## Solution This change eagerly starts the heartbeat interval prior to awaiting the returned value from `subscribe`. This allows long-running setup code to run concurrently with the heartbeats. Previously the second example would result in a `SUBSCRIPTION_HEARTBEAT_ERROR` and subscription termination. Now, the router receives heartbeats so it correctly waits for the setup and data in both cases. --- .changeset/angry-carrots-allow.md | 5 ++ .../plugin/subscriptionCallback/index.test.ts | 48 +++++++++++-------- .../src/plugin/subscriptionCallback/index.ts | 22 +++++---- 3 files changed, 45 insertions(+), 30 deletions(-) create mode 100644 .changeset/angry-carrots-allow.md diff --git a/.changeset/angry-carrots-allow.md b/.changeset/angry-carrots-allow.md new file mode 100644 index 00000000000..dddaf64d617 --- /dev/null +++ b/.changeset/angry-carrots-allow.md @@ -0,0 +1,5 @@ +--- +"@apollo/server": patch +--- + +Subscription heartbeats are initialized prior to awaiting subscribe(). This allows long-running setup to happen in the returned Promise without the subscription being terminated prior to resolution. diff --git a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts index ea12d27eeff..18233b9b9ac 100644 --- a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts +++ b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts @@ -165,9 +165,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -274,9 +274,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Heartbeat disabled for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Heartbeat disabled for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "TESTING: Triggering first update", @@ -387,9 +387,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -505,9 +505,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -653,17 +653,17 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionCallback[5678-dogs]: Received new subscription request", "SubscriptionManager[5678-dogs]: Sending \`check\` request to router", "SubscriptionManager[5678-dogs]: \`check\` request successful", + "SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url-2.com", "SubscriptionCallback[5678-dogs]: Starting graphql-js subscription", "SubscriptionCallback[5678-dogs]: graphql-js subscription successful", - "SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url-2.com", "SubscriptionManager[5678-dogs]: Listening to graphql-js subscription", "SubscriptionCallback[5678-dogs]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -850,17 +850,17 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionCallback[5678-dogs]: Received new subscription request", "SubscriptionManager[5678-dogs]: Sending \`check\` request to router", "SubscriptionManager[5678-dogs]: \`check\` request successful", + "SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url.com/5678-dogs", "SubscriptionCallback[5678-dogs]: Starting graphql-js subscription", "SubscriptionCallback[5678-dogs]: graphql-js subscription successful", - "SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url.com/5678-dogs", "SubscriptionManager[5678-dogs]: Listening to graphql-js subscription", "SubscriptionCallback[5678-dogs]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1003,9 +1003,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1084,9 +1084,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager[1234-cats]: Sending \`next\` request to router", @@ -1139,9 +1139,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "ERROR: SubscriptionManager[1234-cats]: Generator threw an error, terminating subscription: The subscription generator didn't catch this!", @@ -1274,9 +1274,11 @@ describe('SubscriptionCallbackPlugin', () => { ], }); - // Trigger the heartbeat interval just to make sure it doesn't actually - // happen in this case (we haven't mocked it, so it'll throw an error if it - // sends a heartbeat). + // The heartbeat is initialized just before awaiting the subscription. + // So in this case where the subscription throws, there will be one + // heartbeat before the subscription is cleaned up. + mockRouterCheckResponse(); + jest.advanceTimersByTime(5000); await completeRequest; @@ -1285,13 +1287,17 @@ describe('SubscriptionCallbackPlugin', () => { [ "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "ERROR: SubscriptionCallback[1234-cats]: graphql-js subscription unsuccessful: [ The subscription field "invalidSubscriptionField" is not defined. ]", "SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors", "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Heartbeat received response for ID: 1234-cats", + "SubscriptionManager: Heartbeat request successful, ID: 1234-cats", "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] @@ -1353,9 +1359,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1433,9 +1439,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1543,9 +1549,9 @@ describe('SubscriptionCallbackPlugin', () => { "WARN: SubscriptionManager[1234-cats]: Retrying \`check\` request (attempt 1) due to error: request to http://mock-router-url.com/ failed, reason: network request error", "WARN: SubscriptionManager[1234-cats]: Retrying \`check\` request (attempt 2) due to error: request to http://mock-router-url.com/ failed, reason: network request error", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1664,9 +1670,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1772,9 +1778,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1875,9 +1881,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", @@ -1981,9 +1987,9 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", "SubscriptionCallback[1234-cats]: graphql-js subscription successful", - "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", "SubscriptionCallback[1234-cats]: Responding to original subscription request", "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", diff --git a/packages/server/src/plugin/subscriptionCallback/index.ts b/packages/server/src/plugin/subscriptionCallback/index.ts index 159f9cb7c75..b3d9d7152c1 100644 --- a/packages/server/src/plugin/subscriptionCallback/index.ts +++ b/packages/server/src/plugin/subscriptionCallback/index.ts @@ -98,6 +98,17 @@ export function ApolloServerPluginSubscriptionCallback( return; } + // We're about to `await` the subscription, which could have some + // nontrivial asynchronous work to do before returning the generator. + // We set up the heartbeat first so the router knows we're alive + // during that wait period. + subscriptionManager.initHeartbeat({ + callbackUrl, + id, + verifier, + heartbeatIntervalMs, + }); + // The `check` request was successful, so we can initialize the actual // `graphql-js` subscription. We don't expect `subscribe` to throw, // but rather return an object with `errors` on it (if there are any). @@ -152,16 +163,9 @@ export function ApolloServerPluginSubscriptionCallback( logger?.error(`\`complete\` request failed: ${e}`, id); } } else if (isAsyncIterable(subscription)) { - // We have a real subscription - now we can kick off the heartbeat - // interval and consume the AsyncIterable on the `subscription` - // object. + // We have a real subscription - now we can consume the + // AsyncIterable on the `subscription` object. logger?.debug('graphql-js subscription successful', id); - subscriptionManager.initHeartbeat({ - callbackUrl, - id, - verifier, - heartbeatIntervalMs, - }); subscriptionManager.startConsumingSubscription({ subscription,