From 9cf5345611e930d0881cc0d6b3664ed0fc22f456 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 16 Nov 2021 16:12:01 +0800 Subject: [PATCH] fix(emit): protect emit calls fixes #2213 --- lib/queue.js | 70 ++++++++++++++++++++++++++++++---------------------- lib/utils.js | 12 +++++++++ 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 38746ccbd..569c579f0 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -395,19 +395,19 @@ Queue.prototype._setupQueueEventListeners = function() { const token = keyAndToken[1]; switch (key) { case activeKey: - this.emit('global:active', message, 'waiting'); + utils.emitSafe(this, 'global:active', message, 'waiting'); break; case waitingKey: if (this.token === token) { - this.emit('waiting', message, null); + utils.emitSafe(this, 'waiting', message, null); } - token && this.emit('global:waiting', message, null); + token && utils.emitSafe(this, 'global:waiting', message, null); break; case stalledKey: if (this.token === token) { - this.emit('stalled', message); + utils.emitSafe(this, 'stalled', message); } - this.emit('global:stalled', message); + utils.emitSafe(this, 'global:stalled', message); break; } }; @@ -419,13 +419,13 @@ Queue.prototype._setupQueueEventListeners = function() { // New way to send progress message data try { const { progress, jobId } = JSON.parse(message); - this.emit('global:progress', jobId, progress); + utils.emitSafe(this, 'global:progress', jobId, progress); } catch (err) { // If we fail we should try to parse the data using the deprecated method const commaPos = message.indexOf(','); const jobId = message.substring(0, commaPos); const progress = message.substring(commaPos + 1); - this.emit('global:progress', jobId, JSON.parse(progress)); + utils.emitSafe(this, 'global:progress', jobId, JSON.parse(progress)); } break; } @@ -443,20 +443,26 @@ Queue.prototype._setupQueueEventListeners = function() { } case pausedKey: case resumedKey: - this.emit('global:' + message); + utils.emitSafe(this, 'global:' + message); break; case completedKey: { const data = JSON.parse(message); - this.emit('global:completed', data.jobId, data.val, 'active'); + utils.emitSafe( + this, + 'global:completed', + data.jobId, + data.val, + 'active' + ); break; } case failedKey: { const data = JSON.parse(message); - this.emit('global:failed', data.jobId, data.val, 'active'); + utils.emitSafe(this, 'global:failed', data.jobId, data.val, 'active'); break; } case drainedKey: - this.emit('global:drained'); + utils.emitSafe(this, 'global:drained'); break; } }; @@ -500,7 +506,7 @@ Queue.prototype._registerEvent = function(eventName) { } }) .then(() => { - this.emit('registered:' + eventName); + utils.emitSafe(this, 'registered:' + eventName); }); } else { return this.registeredEvents[_eventName]; @@ -585,7 +591,7 @@ Queue.prototype.close = function(doNotWaitJobs) { ) .finally(() => { this.closed = true; - this.emit('close'); + utils.emitSafe(this, 'close'); })); }; @@ -633,7 +639,7 @@ Queue.prototype.process = function(name, concurrency, handler) { Queue.prototype.start = function(concurrency, name) { return this.run(concurrency, name).catch(err => { - this.emit('error', err, 'error running queue'); + utils.emitSafe(this, 'error', err, 'error running queue'); throw err; }); }; @@ -833,7 +839,7 @@ Queue.prototype.pause = function(isLocal, doNotWaitActive) { } }) .then(() => { - return this.emit('paused'); + return utils.emitSafe(this, 'paused'); }); }; @@ -849,7 +855,7 @@ Queue.prototype.resume = function(isLocal /* Optional */) { } }) .then(() => { - this.emit('resumed'); + utils.emitSafe(this, 'resumed'); }); }; @@ -939,7 +945,7 @@ Queue.prototype.updateDelayTimer = function() { return null; }) .catch(err => { - this.emit('error', err, 'Error updating the delay timer'); + utils.emitSafe(this, 'error', err, 'Error updating the delay timer'); if (this.delayTimer) { clearTimeout(this.delayTimer); } @@ -966,7 +972,8 @@ Queue.prototype.moveUnlockedJobsToWait = function() { .then(([failed, stalled]) => { const handleFailedJobs = failed.map(jobId => { return this.getJobFromId(jobId).then(job => { - this.emit( + utils.emitSafe( + this, 'failed', job, new Error('job stalled more than allowable limit'), @@ -979,7 +986,7 @@ Queue.prototype.moveUnlockedJobsToWait = function() { return this.getJobFromId(jobId).then(job => { // Do not emit the event if the job was completed by another worker if (job !== null) { - this.emit('stalled', job); + utils.emitSafe(this, 'stalled', job); } return null; }); @@ -987,7 +994,12 @@ Queue.prototype.moveUnlockedJobsToWait = function() { return Promise.all(handleFailedJobs.concat(handleStalledJobs)); }) .catch(err => { - this.emit('error', err, 'Failed to handle unlocked job in active'); + utils.emitSafe( + this, + 'error', + err, + 'Failed to handle unlocked job in active' + ); }); }; @@ -1026,7 +1038,7 @@ Queue.prototype._processJobOnNextTick = function( .then(this.processJob) .then(processJobs, err => { if (!(this.closing && err.message === 'Connection is closed.')) { - this.emit('error', err, 'Error processing job'); + utils.emitSafe(this, 'error', err, 'Error processing job'); // // Wait before trying to process again. @@ -1040,7 +1052,7 @@ Queue.prototype._processJobOnNextTick = function( })); }) .catch(err => { - this.emit('error', err, 'Error processing job'); + utils.emitSafe(this, 'error', err, 'Error processing job'); }); } else { resolve(this.closing); @@ -1076,7 +1088,7 @@ Queue.prototype.processJob = function(job, notFetch = false) { } }) .catch(err => { - this.emit('lock-extension-failed', job, err); + utils.emitSafe(this, 'lock-extension-failed', job, err); }); } ); @@ -1091,7 +1103,7 @@ Queue.prototype.processJob = function(job, notFetch = false) { const handleCompleted = result => { return job.moveToCompleted(result, undefined, notFetch).then(jobData => { - this.emit('completed', job, result, 'active'); + utils.emitSafe(this, 'completed', job, result, 'active'); return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null; }); }; @@ -1100,7 +1112,7 @@ Queue.prototype.processJob = function(job, notFetch = false) { const error = err; return job.moveToFailed(err).then(jobData => { - this.emit('failed', job, error, 'active'); + utils.emitSafe(this, 'failed', job, error, 'active'); return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null; }); }; @@ -1120,7 +1132,7 @@ Queue.prototype.processJob = function(job, notFetch = false) { } // Local event with jobPromise so that we can cancel job. - this.emit('active', job, jobPromise, 'waiting'); + utils.emitSafe(this, 'active', job, jobPromise, 'waiting'); return jobPromise .then(handleCompleted) @@ -1191,7 +1203,7 @@ Queue.prototype.nextJobFromJobData = function(jobData, jobId) { return job; } else { this.drained = true; - this.emit('drained'); + utils.emitSafe(this, 'drained'); return null; } }; @@ -1235,11 +1247,11 @@ Queue.prototype.clean = function(grace, type, limit) { return scripts .cleanJobsInSet(this, type, Date.now() - grace, limit) .then(jobs => { - this.emit('cleaned', jobs, type); + utils.emitSafe(this, 'cleaned', jobs, type); return jobs; }) .catch(err => { - this.emit('error', err); + utils.emitSafe(this, 'error', err); throw err; }); }); diff --git a/lib/utils.js b/lib/utils.js index 022f3ff53..d659269aa 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -45,3 +45,15 @@ function isRedisReady(client) { module.exports.errorObject = errorObject; module.exports.tryCatch = tryCatch; module.exports.isRedisReady = isRedisReady; +module.exports.emitSafe = function(emitter, event, ...args) { + try { + return emitter.emit(event, ...args); + } catch (err) { + try { + return emitter.emit('error', err); + } catch (err) { + // We give up if the error event also throws an exception. + console.error(err); + } + } +};