diff --git a/REFERENCE.md b/REFERENCE.md index 7f82dddc..ed215eee 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -107,6 +107,7 @@ interface AdvancedSettings { retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). + isSharedChildPool: boolean = false; // enables multiple queues on the same instance of child pool to share the same instance. } ``` @@ -592,8 +593,8 @@ for the job when it was added. removeRepeatableByKey(key: string): Promise ``` -Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this -particular configuration. +Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this +particular configuration. There are currently two ways to get the "key" of a repeatable job. @@ -691,7 +692,7 @@ Returns a promise that will return the waiting job counts for the given queue. ### Queue#getPausedCount -*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same +*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same result. ```ts @@ -970,7 +971,7 @@ A queue emits also some useful events: }) .on('lock-extension-failed', function (job, err) { - // A job failed to extend lock. This will be useful to debug redis + // A job failed to extend lock. This will be useful to debug redis // connection issues and jobs getting restarted because workers // are not able to extend locks. }); diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 7f4cca52..018c1209 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -121,4 +121,17 @@ async function initChild(child, processFile) { ); await onComplete; } -module.exports = ChildPool; +function ChildPoolSingleton(isSharedChildPool = false) { + if(isSharedChildPool === false) { + return new ChildPool(); + } + else if ( + (!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined) + ) { + ChildPoolSingleton.instance = new ChildPool(); + } + + return ChildPoolSingleton.instance; +} + +module.exports = ChildPoolSingleton; diff --git a/lib/queue.js b/lib/queue.js index 6b068d54..4793b9ec 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -71,6 +71,7 @@ const MINIMUM_REDIS_VERSION = '2.8.18'; guardInterval?: number = 5000, retryProcessDelay?: number = 5000, drainDelay?: number = 5 + isSharedChildPool?: boolean = false } } @@ -215,7 +216,8 @@ const Queue = function Queue(name, url, opts) { guardInterval: 5000, retryProcessDelay: 5000, drainDelay: 5, - backoffStrategies: {} + backoffStrategies: {}, + isSharedChildPool: false, }); this.settings.lockRenewTime = @@ -662,8 +664,9 @@ Queue.prototype.setHandler = function(name, handler) { if (!fs.existsSync(processorFile)) { throw new Error('File ' + processorFile + ' does not exist'); } - - this.childPool = this.childPool || require('./process/child-pool')(); + const isSharedChildPool = this.settings.isSharedChildPool; + this.childPool = + this.childPool || require('./process/child-pool')(isSharedChildPool); const sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 47880cf4..b5b09ef2 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -141,4 +141,22 @@ describe('Child pool', () => { expect(children).to.include(child); }); }); + + it('should returned a shared child pool is isSharedChildPool is true', () => { + expect(childPool(true)).to.be.equal(new childPool(true)); + }); + + it('should return a different childPool if isSharedChildPool is false', () => { + expect(childPool()).to.not.be.equal(childPool()); + }); + + it('should not overwrite the the childPool singleton when isSharedChildPool is false', () => { + const childPoolA = new childPool(true) + const childPoolB = new childPool(false) + const childPoolC = new childPool(true); + + expect(childPoolA).to.be.equal(childPoolC) + expect(childPoolB).to.not.be.equal(childPoolA) + expect(childPoolB).to.not.be.equal(childPoolC) + }) }); diff --git a/test/test_queue.js b/test/test_queue.js index e703d600..1dc9d7f5 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -405,7 +405,7 @@ describe('Queue', () => { }); }); - describe(' a worker', () => { + describe('a worker', () => { let queue; beforeEach(() => { @@ -2121,6 +2121,7 @@ describe('Queue', () => { queue.on('failed', cb); queue.on('error', done); }); + }); describe('Retries and backoffs', () => { diff --git a/test/test_sandboxed_process.js b/test/test_sandboxed_process.js index 537c0b33..4ff8fba6 100644 --- a/test/test_sandboxed_process.js +++ b/test/test_sandboxed_process.js @@ -454,4 +454,35 @@ describe('sandboxed process', () => { const jobResult = await job.finished(); expect(jobResult).to.equal(42); }); + + it('should share child pool across all different queues created', async () => { + const [queueA, queueB] = await Promise.all([ + utils.newQueue('queueA', { settings: { isSharedChildPool: true } }), + utils.newQueue('queueB', { settings: { isSharedChildPool: true } }) + ]); + + const processFile = __dirname + '/fixtures/fixture_processor.js'; + queueA.process(processFile) + queueB.process(processFile) + + await Promise.all([queueA.add(), queueB.add()]); + + + expect(queueA.childPool).to.be.eql(queueB.childPool); + }); + + it('should not share childPool across different queues if isSharedChildPool isn\'t specified', async () => { + const [queueA, queueB] = await Promise.all([ + utils.newQueue('queueA', { settings: { isSharedChildPool: false } }), + utils.newQueue('queueB') + ]); + + const processFile = __dirname + '/fixtures/fixture_processor.js'; + queueA.process(processFile) + queueB.process(processFile) + + await Promise.all([queueA.add(), queueB.add()]); + + expect(queueA.childPool).to.not.be.equal(queueB.childPool); + }) });