Skip to content

Commit

Permalink
Implement GQL_CONNECTION_KEEP_ALIVE on subscription server
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanosala committed Dec 22, 2021
1 parent c0c0dd3 commit 5f653d2
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/api/options.md
Expand Up @@ -59,6 +59,7 @@
- `subscription.context`: `Function` Result of function is passed to subscription resolvers as a custom GraphQL context. The function receives the `connection` and `request` as parameters.
- `subscription.onConnect`: `Function` A function which can be used to validate the `connection_init` payload. If defined it should return a truthy value to authorize the connection. If it returns an object the subscription context will be extended with the returned object.
- `subscription.onDisconnect`: `Function` A function which is called with the subscription context of the connection after the connection gets disconnected.
- `subscription.keepAlive`: `Integer` Optional interval in ms to send the `GQL_CONNECTION_KEEP_ALIVE` message.
- `federationMetadata`: Boolean. Enable federation metadata support so the service can be deployed behind an Apollo Gateway
- `gateway`: Object. Run the GraphQL server in gateway mode.

Expand Down
1 change: 1 addition & 0 deletions index.d.ts
Expand Up @@ -516,6 +516,7 @@ export interface MercuriusCommonOptions {
payload: any;
}) => Record<string, any> | Promise<Record<string, any>>;
onDisconnect?: (context: MercuriusContext) => void | Promise<void>;
keepAlive?: number,
};
/**
* Enable federation metadata support so the service can be deployed behind an Apollo Gateway
Expand Down
5 changes: 4 additions & 1 deletion index.js
Expand Up @@ -122,6 +122,7 @@ const plugin = fp(async function (app, opts) {
let subscriptionContextFn
let onConnect
let onDisconnect
let keepAlive

if (typeof subscriptionOpts === 'object') {
if (subscriptionOpts.pubsub) {
Expand All @@ -134,6 +135,7 @@ const plugin = fp(async function (app, opts) {
subscriptionContextFn = subscriptionOpts.context
onConnect = subscriptionOpts.onConnect
onDisconnect = subscriptionOpts.onDisconnect
keepAlive = subscriptionOpts.keepAlive
} else if (subscriptionOpts === true) {
emitter = mq()
subscriber = new PubSub(emitter)
Expand Down Expand Up @@ -246,7 +248,8 @@ const plugin = fp(async function (app, opts) {
onDisconnect,
lruGatewayResolvers,
entityResolversFactory,
subscriptionContextFn
subscriptionContextFn,
keepAlive
})
}

Expand Down
6 changes: 4 additions & 2 deletions lib/routes.js
Expand Up @@ -182,7 +182,8 @@ module.exports = async function (app, opts) {
lruGatewayResolvers,
entityResolversFactory,
persistedQueryProvider,
allowBatchedQueries
allowBatchedQueries,
keepAlive
} = opts

// Load the persisted query settings
Expand Down Expand Up @@ -306,7 +307,8 @@ module.exports = async function (app, opts) {
onDisconnect,
lruGatewayResolvers,
entityResolversFactory,
subscriptionContextFn
subscriptionContextFn,
keepAlive
})
} else {
app.route(getOptions)
Expand Down
20 changes: 19 additions & 1 deletion lib/subscription-connection.js
Expand Up @@ -19,7 +19,8 @@ module.exports = class SubscriptionConnection {
context = {},
onConnect,
onDisconnect,
resolveContext
resolveContext,
keepAlive
}) {
this.fastify = fastify
this.socket = socket
Expand All @@ -33,6 +34,7 @@ module.exports = class SubscriptionConnection {
this.context = context
this.isReady = false
this.resolveContext = resolveContext
this.keepAlive = keepAlive
this.headers = {}

this.protocolMessageTypes = getProtocolByName(socket.protocol)
Expand Down Expand Up @@ -130,6 +132,16 @@ module.exports = class SubscriptionConnection {
}

this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ACK)

if (this.keepAlive) {
this.sendKeepAlive()

/* istanbul ignore next */
this.keepAliveTimer = setInterval(() => {
this.sendKeepAlive()
}, this.keepAlive)
}

this.isReady = true
}

Expand Down Expand Up @@ -258,6 +270,8 @@ module.exports = class SubscriptionConnection {
.map((subIter) => subIter.return && subIter.return())
this.socket.close()

if (this.keepAliveTimer) clearInterval(this.keepAliveTimer)

if (typeof this.onDisconnect === 'function') {
Promise.resolve()
.then(() => this.onDisconnect(this.context))
Expand Down Expand Up @@ -300,4 +314,8 @@ module.exports = class SubscriptionConnection {
close () {
this.handleConnectionClose()
}

sendKeepAlive () {
this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_KEEP_ALIVE)
}
}
10 changes: 6 additions & 4 deletions lib/subscription.js
Expand Up @@ -6,7 +6,7 @@ const { kHooks } = require('./symbols')
const SubscriptionConnection = require('./subscription-connection')
const { getProtocolByName } = require('./subscription-protocol')

function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn }) {
function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn, keepAlive }) {
return async (connection, request) => {
const { socket } = connection

Expand Down Expand Up @@ -44,7 +44,8 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
lruGatewayResolvers,
entityResolversFactory,
context,
resolveContext
resolveContext,
keepAlive
})

/* istanbul ignore next */
Expand All @@ -58,7 +59,7 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
}

module.exports = function (fastify, opts, next) {
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn } = opts
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn, keepAlive } = opts

// If `fastify.websocketServer` exists, it means `fastify-websocket` already registered.
// Without this check, fastify-websocket will be registered multiple times and raises FST_ERR_DEC_ALREADY_PRESENT.
Expand All @@ -80,7 +81,8 @@ module.exports = function (fastify, opts, next) {
onDisconnect,
lruGatewayResolvers,
entityResolversFactory,
subscriptionContextFn
subscriptionContextFn,
keepAlive
})
})

Expand Down
49 changes: 49 additions & 0 deletions test/subscription.js
Expand Up @@ -52,6 +52,55 @@ test('subscription server replies with connection_ack', t => {
})
})

test('subscription server replies with keep alive when enabled', t => {
const app = Fastify()
t.teardown(() => app.close())

const schema = `
type Query {
add(x: Int, y: Int): Int
}
`

const resolvers = {
Query: {
add: (parent, { x, y }) => x + y
}
}

app.register(GQL, {
schema,
resolvers,
subscription: {
keepAlive: 10000
}
})

app.listen(0, err => {
t.error(err)

const url = 'ws://localhost:' + (app.server.address()).port + '/graphql'
const ws = new WebSocket(url, 'graphql-ws')
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })
t.teardown(client.destroy.bind(client))

client.setEncoding('utf8')
client.write(JSON.stringify({
type: 'connection_init'
}))
client.on('data', chunk => {
const payload = JSON.parse(chunk)

// keep alive only comes after the ack
if (payload.type === 'connection_ack') return

t.equal(payload.type, 'ka')
client.end()
t.end()
})
})
})

test('subscription server sends update to subscriptions', t => {
const app = Fastify()
t.teardown(() => app.close())
Expand Down

0 comments on commit 5f653d2

Please sign in to comment.