Skip to content

Commit

Permalink
fix(emit): protect emit calls fixes #2213
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 16, 2021
1 parent b6d530f commit 9cf5345
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 29 deletions.
70 changes: 41 additions & 29 deletions lib/queue.js
Expand Up @@ -395,19 +395,19 @@ Queue.prototype._setupQueueEventListeners = function() {
const token = keyAndToken[1];
switch (key) {
case activeKey:
this.emit('global:active', message, 'waiting');
utils.emitSafe(this, 'global:active', message, 'waiting');
break;
case waitingKey:
if (this.token === token) {
this.emit('waiting', message, null);
utils.emitSafe(this, 'waiting', message, null);
}
token && this.emit('global:waiting', message, null);
token && utils.emitSafe(this, 'global:waiting', message, null);
break;
case stalledKey:
if (this.token === token) {
this.emit('stalled', message);
utils.emitSafe(this, 'stalled', message);
}
this.emit('global:stalled', message);
utils.emitSafe(this, 'global:stalled', message);
break;
}
};
Expand All @@ -419,13 +419,13 @@ Queue.prototype._setupQueueEventListeners = function() {
// New way to send progress message data
try {
const { progress, jobId } = JSON.parse(message);
this.emit('global:progress', jobId, progress);
utils.emitSafe(this, 'global:progress', jobId, progress);
} catch (err) {
// If we fail we should try to parse the data using the deprecated method
const commaPos = message.indexOf(',');
const jobId = message.substring(0, commaPos);
const progress = message.substring(commaPos + 1);
this.emit('global:progress', jobId, JSON.parse(progress));
utils.emitSafe(this, 'global:progress', jobId, JSON.parse(progress));
}
break;
}
Expand All @@ -443,20 +443,26 @@ Queue.prototype._setupQueueEventListeners = function() {
}
case pausedKey:
case resumedKey:
this.emit('global:' + message);
utils.emitSafe(this, 'global:' + message);
break;
case completedKey: {
const data = JSON.parse(message);
this.emit('global:completed', data.jobId, data.val, 'active');
utils.emitSafe(
this,
'global:completed',
data.jobId,
data.val,
'active'
);
break;
}
case failedKey: {
const data = JSON.parse(message);
this.emit('global:failed', data.jobId, data.val, 'active');
utils.emitSafe(this, 'global:failed', data.jobId, data.val, 'active');
break;
}
case drainedKey:
this.emit('global:drained');
utils.emitSafe(this, 'global:drained');
break;
}
};
Expand Down Expand Up @@ -500,7 +506,7 @@ Queue.prototype._registerEvent = function(eventName) {
}
})
.then(() => {
this.emit('registered:' + eventName);
utils.emitSafe(this, 'registered:' + eventName);
});
} else {
return this.registeredEvents[_eventName];
Expand Down Expand Up @@ -585,7 +591,7 @@ Queue.prototype.close = function(doNotWaitJobs) {
)
.finally(() => {
this.closed = true;
this.emit('close');
utils.emitSafe(this, 'close');
}));
};

Expand Down Expand Up @@ -633,7 +639,7 @@ Queue.prototype.process = function(name, concurrency, handler) {

Queue.prototype.start = function(concurrency, name) {
return this.run(concurrency, name).catch(err => {
this.emit('error', err, 'error running queue');
utils.emitSafe(this, 'error', err, 'error running queue');
throw err;
});
};
Expand Down Expand Up @@ -833,7 +839,7 @@ Queue.prototype.pause = function(isLocal, doNotWaitActive) {
}
})
.then(() => {
return this.emit('paused');
return utils.emitSafe(this, 'paused');
});
};

Expand All @@ -849,7 +855,7 @@ Queue.prototype.resume = function(isLocal /* Optional */) {
}
})
.then(() => {
this.emit('resumed');
utils.emitSafe(this, 'resumed');
});
};

Expand Down Expand Up @@ -939,7 +945,7 @@ Queue.prototype.updateDelayTimer = function() {
return null;
})
.catch(err => {
this.emit('error', err, 'Error updating the delay timer');
utils.emitSafe(this, 'error', err, 'Error updating the delay timer');
if (this.delayTimer) {
clearTimeout(this.delayTimer);
}
Expand All @@ -966,7 +972,8 @@ Queue.prototype.moveUnlockedJobsToWait = function() {
.then(([failed, stalled]) => {
const handleFailedJobs = failed.map(jobId => {
return this.getJobFromId(jobId).then(job => {
this.emit(
utils.emitSafe(
this,
'failed',
job,
new Error('job stalled more than allowable limit'),
Expand All @@ -979,15 +986,20 @@ Queue.prototype.moveUnlockedJobsToWait = function() {
return this.getJobFromId(jobId).then(job => {
// Do not emit the event if the job was completed by another worker
if (job !== null) {
this.emit('stalled', job);
utils.emitSafe(this, 'stalled', job);
}
return null;
});
});
return Promise.all(handleFailedJobs.concat(handleStalledJobs));
})
.catch(err => {
this.emit('error', err, 'Failed to handle unlocked job in active');
utils.emitSafe(
this,
'error',
err,
'Failed to handle unlocked job in active'
);
});
};

Expand Down Expand Up @@ -1026,7 +1038,7 @@ Queue.prototype._processJobOnNextTick = function(
.then(this.processJob)
.then(processJobs, err => {
if (!(this.closing && err.message === 'Connection is closed.')) {
this.emit('error', err, 'Error processing job');
utils.emitSafe(this, 'error', err, 'Error processing job');

//
// Wait before trying to process again.
Expand All @@ -1040,7 +1052,7 @@ Queue.prototype._processJobOnNextTick = function(
}));
})
.catch(err => {
this.emit('error', err, 'Error processing job');
utils.emitSafe(this, 'error', err, 'Error processing job');
});
} else {
resolve(this.closing);
Expand Down Expand Up @@ -1076,7 +1088,7 @@ Queue.prototype.processJob = function(job, notFetch = false) {
}
})
.catch(err => {
this.emit('lock-extension-failed', job, err);
utils.emitSafe(this, 'lock-extension-failed', job, err);
});
}
);
Expand All @@ -1091,7 +1103,7 @@ Queue.prototype.processJob = function(job, notFetch = false) {

const handleCompleted = result => {
return job.moveToCompleted(result, undefined, notFetch).then(jobData => {
this.emit('completed', job, result, 'active');
utils.emitSafe(this, 'completed', job, result, 'active');
return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null;
});
};
Expand All @@ -1100,7 +1112,7 @@ Queue.prototype.processJob = function(job, notFetch = false) {
const error = err;

return job.moveToFailed(err).then(jobData => {
this.emit('failed', job, error, 'active');
utils.emitSafe(this, 'failed', job, error, 'active');
return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null;
});
};
Expand All @@ -1120,7 +1132,7 @@ Queue.prototype.processJob = function(job, notFetch = false) {
}

// Local event with jobPromise so that we can cancel job.
this.emit('active', job, jobPromise, 'waiting');
utils.emitSafe(this, 'active', job, jobPromise, 'waiting');

return jobPromise
.then(handleCompleted)
Expand Down Expand Up @@ -1191,7 +1203,7 @@ Queue.prototype.nextJobFromJobData = function(jobData, jobId) {
return job;
} else {
this.drained = true;
this.emit('drained');
utils.emitSafe(this, 'drained');
return null;
}
};
Expand Down Expand Up @@ -1235,11 +1247,11 @@ Queue.prototype.clean = function(grace, type, limit) {
return scripts
.cleanJobsInSet(this, type, Date.now() - grace, limit)
.then(jobs => {
this.emit('cleaned', jobs, type);
utils.emitSafe(this, 'cleaned', jobs, type);
return jobs;
})
.catch(err => {
this.emit('error', err);
utils.emitSafe(this, 'error', err);
throw err;
});
});
Expand Down
12 changes: 12 additions & 0 deletions lib/utils.js
Expand Up @@ -45,3 +45,15 @@ function isRedisReady(client) {
module.exports.errorObject = errorObject;
module.exports.tryCatch = tryCatch;
module.exports.isRedisReady = isRedisReady;
module.exports.emitSafe = function(emitter, event, ...args) {
try {
return emitter.emit(event, ...args);
} catch (err) {
try {
return emitter.emit('error', err);
} catch (err) {
// We give up if the error event also throws an exception.
console.error(err);
}
}
};

0 comments on commit 9cf5345

Please sign in to comment.