diff --git a/lib/queue.js b/lib/queue.js index ae888f216..2f1d31bf9 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -144,7 +144,9 @@ const Queue = function Queue(name, url, opts) { this.clients = []; const lazyClient = redisClientGetter(this, opts, (type, client) => { // bubble up Redis error events - client.on('error', this.emit.bind(this, 'error')); + const handler = this.emit.bind(this, 'error'); + client.on('error', handler); + this.once('close', () => client.removeListener('error', handler)); if (type === 'client') { this._initializing = commands(client).then( @@ -373,7 +375,7 @@ Queue.prototype._setupQueueEventListeners = function() { const failedKey = this.keys.failed; const drainedKey = this.keys.drained; - this.eclient.on('pmessage', (pattern, channel, message) => { + const pmessageHandler = (pattern, channel, message) => { const keyAndToken = channel.split('@'); const key = keyAndToken[0]; const token = keyAndToken[1]; @@ -394,9 +396,9 @@ Queue.prototype._setupQueueEventListeners = function() { this.emit('global:stalled', message); break; } - }); + }; - this.eclient.on('message', (channel, message) => { + const messageHandler = (channel, message) => { const key = channel.split('@')[0]; switch (key) { case progressKey: { @@ -436,6 +438,14 @@ Queue.prototype._setupQueueEventListeners = function() { this.emit('global:drained'); break; } + }; + + this.eclient.on('pmessage', pmessageHandler); + this.eclient.on('message', messageHandler); + + this.once('close', () => { + this.eclient.removeListener('pmessage', pmessageHandler); + this.eclient.removeListener('message', messageHandler); }); }; @@ -561,6 +571,7 @@ Queue.prototype.close = function(doNotWaitJobs) { .finally(() => { this.childPool && this.childPool.clean(); this.closed = true; + this.emit('close'); })); };