Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GQL_CONNECTION_KEEP_ALIVE on subscription server #691

Merged
merged 1 commit into from Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api/options.md
Expand Up @@ -59,6 +59,7 @@
- `subscription.context`: `Function` Result of function is passed to subscription resolvers as a custom GraphQL context. The function receives the `connection` and `request` as parameters.
- `subscription.onConnect`: `Function` A function which can be used to validate the `connection_init` payload. If defined it should return a truthy value to authorize the connection. If it returns an object the subscription context will be extended with the returned object.
- `subscription.onDisconnect`: `Function` A function which is called with the subscription context of the connection after the connection gets disconnected.
- `subscription.keepAlive`: `Integer` Optional interval in ms to send the `GQL_CONNECTION_KEEP_ALIVE` message.
- `federationMetadata`: Boolean. Enable federation metadata support so the service can be deployed behind an Apollo Gateway
- `gateway`: Object. Run the GraphQL server in gateway mode.

Expand Down
1 change: 1 addition & 0 deletions index.d.ts
Expand Up @@ -516,6 +516,7 @@ export interface MercuriusCommonOptions {
payload: any;
}) => Record<string, any> | Promise<Record<string, any>>;
onDisconnect?: (context: MercuriusContext) => void | Promise<void>;
keepAlive?: number,
};
/**
* Enable federation metadata support so the service can be deployed behind an Apollo Gateway
Expand Down
5 changes: 4 additions & 1 deletion index.js
Expand Up @@ -122,6 +122,7 @@ const plugin = fp(async function (app, opts) {
let subscriptionContextFn
let onConnect
let onDisconnect
let keepAlive

if (typeof subscriptionOpts === 'object') {
if (subscriptionOpts.pubsub) {
Expand All @@ -134,6 +135,7 @@ const plugin = fp(async function (app, opts) {
subscriptionContextFn = subscriptionOpts.context
onConnect = subscriptionOpts.onConnect
onDisconnect = subscriptionOpts.onDisconnect
keepAlive = subscriptionOpts.keepAlive
} else if (subscriptionOpts === true) {
emitter = mq()
subscriber = new PubSub(emitter)
Expand Down Expand Up @@ -246,7 +248,8 @@ const plugin = fp(async function (app, opts) {
onDisconnect,
lruGatewayResolvers,
entityResolversFactory,
subscriptionContextFn
subscriptionContextFn,
keepAlive
})
}

Expand Down
6 changes: 4 additions & 2 deletions lib/routes.js
Expand Up @@ -182,7 +182,8 @@ module.exports = async function (app, opts) {
lruGatewayResolvers,
entityResolversFactory,
persistedQueryProvider,
allowBatchedQueries
allowBatchedQueries,
keepAlive
} = opts

// Load the persisted query settings
Expand Down Expand Up @@ -306,7 +307,8 @@ module.exports = async function (app, opts) {
onDisconnect,
lruGatewayResolvers,
entityResolversFactory,
subscriptionContextFn
subscriptionContextFn,
keepAlive
})
} else {
app.route(getOptions)
Expand Down
20 changes: 19 additions & 1 deletion lib/subscription-connection.js
Expand Up @@ -19,7 +19,8 @@ module.exports = class SubscriptionConnection {
context = {},
onConnect,
onDisconnect,
resolveContext
resolveContext,
keepAlive
}) {
this.fastify = fastify
this.socket = socket
Expand All @@ -33,6 +34,7 @@ module.exports = class SubscriptionConnection {
this.context = context
this.isReady = false
this.resolveContext = resolveContext
this.keepAlive = keepAlive
this.headers = {}

this.protocolMessageTypes = getProtocolByName(socket.protocol)
Expand Down Expand Up @@ -130,6 +132,16 @@ module.exports = class SubscriptionConnection {
}

this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ACK)

if (this.keepAlive) {
this.sendKeepAlive()

/* istanbul ignore next */
this.keepAliveTimer = setInterval(() => {
this.sendKeepAlive()
}, this.keepAlive)
}

this.isReady = true
}

Expand Down Expand Up @@ -258,6 +270,8 @@ module.exports = class SubscriptionConnection {
.map((subIter) => subIter.return && subIter.return())
this.socket.close()

if (this.keepAliveTimer) clearInterval(this.keepAliveTimer)

if (typeof this.onDisconnect === 'function') {
Promise.resolve()
.then(() => this.onDisconnect(this.context))
Expand Down Expand Up @@ -300,4 +314,8 @@ module.exports = class SubscriptionConnection {
close () {
this.handleConnectionClose()
}

sendKeepAlive () {
this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_KEEP_ALIVE)
}
}
10 changes: 6 additions & 4 deletions lib/subscription.js
Expand Up @@ -6,7 +6,7 @@ const { kHooks } = require('./symbols')
const SubscriptionConnection = require('./subscription-connection')
const { getProtocolByName } = require('./subscription-protocol')

function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn }) {
function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn, keepAlive }) {
return async (connection, request) => {
const { socket } = connection

Expand Down Expand Up @@ -44,7 +44,8 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
lruGatewayResolvers,
entityResolversFactory,
context,
resolveContext
resolveContext,
keepAlive
})

/* istanbul ignore next */
Expand All @@ -58,7 +59,7 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
}

module.exports = function (fastify, opts, next) {
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn } = opts
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn, keepAlive } = opts

// If `fastify.websocketServer` exists, it means `fastify-websocket` already registered.
// Without this check, fastify-websocket will be registered multiple times and raises FST_ERR_DEC_ALREADY_PRESENT.
Expand All @@ -80,7 +81,8 @@ module.exports = function (fastify, opts, next) {
onDisconnect,
lruGatewayResolvers,
entityResolversFactory,
subscriptionContextFn
subscriptionContextFn,
keepAlive
})
})

Expand Down
49 changes: 49 additions & 0 deletions test/subscription.js
Expand Up @@ -52,6 +52,55 @@ test('subscription server replies with connection_ack', t => {
})
})

test('subscription server replies with keep alive when enabled', t => {
const app = Fastify()
t.teardown(() => app.close())

const schema = `
type Query {
add(x: Int, y: Int): Int
}
`

const resolvers = {
Query: {
add: (parent, { x, y }) => x + y
}
}

app.register(GQL, {
schema,
resolvers,
subscription: {
keepAlive: 10000
}
})

app.listen(0, err => {
t.error(err)

const url = 'ws://localhost:' + (app.server.address()).port + '/graphql'
const ws = new WebSocket(url, 'graphql-ws')
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })
t.teardown(client.destroy.bind(client))

client.setEncoding('utf8')
client.write(JSON.stringify({
type: 'connection_init'
}))
client.on('data', chunk => {
const payload = JSON.parse(chunk)

// keep alive only comes after the ack
if (payload.type === 'connection_ack') return

t.equal(payload.type, 'ka')
client.end()
t.end()
})
})
})

test('subscription server sends update to subscriptions', t => {
const app = Fastify()
t.teardown(() => app.close())
Expand Down