From 6dfe83089136619f5faf7131d416082dd1ac54f1 Mon Sep 17 00:00:00 2001 From: "Fontaine, Jason" Date: Thu, 16 Dec 2021 14:46:28 -0500 Subject: [PATCH 1/3] fix(queue): enabled queues to share childPool instance --- REFERENCE.md | 9 +++++---- lib/process/child-pool.js | 11 ++++++++++- lib/queue.js | 8 +++++--- test/test_child-pool.js | 8 ++++++++ test/test_queue.js | 14 +++++++++++++- 5 files changed, 41 insertions(+), 9 deletions(-) diff --git a/REFERENCE.md b/REFERENCE.md index 7f82dddc..ed215eee 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -107,6 +107,7 @@ interface AdvancedSettings { retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). + isSharedChildPool: boolean = false; // enables multiple queues on the same instance of child pool to share the same instance. } ``` @@ -592,8 +593,8 @@ for the job when it was added. removeRepeatableByKey(key: string): Promise ``` -Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this -particular configuration. +Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this +particular configuration. There are currently two ways to get the "key" of a repeatable job. @@ -691,7 +692,7 @@ Returns a promise that will return the waiting job counts for the given queue. ### Queue#getPausedCount -*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same +*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same result. ```ts @@ -970,7 +971,7 @@ A queue emits also some useful events: }) .on('lock-extension-failed', function (job, err) { - // A job failed to extend lock. This will be useful to debug redis + // A job failed to extend lock. This will be useful to debug redis // connection issues and jobs getting restarted because workers // are not able to extend locks. }); diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 7f4cca52..ef6a8b19 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -121,4 +121,13 @@ async function initChild(child, processFile) { ); await onComplete; } -module.exports = ChildPool; +function ChildPoolSingleton( isSharedChildPool = false ) { + if (isSharedChildPool === false || ((!(this instanceof ChildPool)) && ChildPoolSingleton.instance === undefined)) { + ChildPoolSingleton.instance = new ChildPool(); + } + + return ChildPoolSingleton.instance; +} + + +module.exports = ChildPoolSingleton diff --git a/lib/queue.js b/lib/queue.js index 6b068d54..ce34680d 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -71,6 +71,7 @@ const MINIMUM_REDIS_VERSION = '2.8.18'; guardInterval?: number = 5000, retryProcessDelay?: number = 5000, drainDelay?: number = 5 + isSharedChildPool?: boolean = false } } @@ -215,7 +216,8 @@ const Queue = function Queue(name, url, opts) { guardInterval: 5000, retryProcessDelay: 5000, drainDelay: 5, - backoffStrategies: {} + backoffStrategies: {}, + isSharedChildPool: false, }); this.settings.lockRenewTime = @@ -662,8 +664,8 @@ Queue.prototype.setHandler = function(name, handler) { if (!fs.existsSync(processorFile)) { throw new Error('File ' + processorFile + ' does not exist'); } - - this.childPool = this.childPool || require('./process/child-pool')(); + const isSharedChildPool = this.settings.isSharedChildPool + this.childPool = this.childPool || require('./process/child-pool')(isSharedChildPool) const sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 47880cf4..73abaeb5 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -141,4 +141,12 @@ describe('Child pool', () => { expect(children).to.include(child); }); }); + + it('should returned a shared child pool is isSharedChildPool is true', () => { + expect(childPool(true)).to.be.equal(new childPool(true)) + }) + + it('should return a different childPool if isSharedChildPool is false', () => { + expect(childPool()).to.not.be.equal(childPool()) + }) }); diff --git a/test/test_queue.js b/test/test_queue.js index e703d600..1b45877f 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -405,7 +405,7 @@ describe('Queue', () => { }); }); - describe(' a worker', () => { + describe('a worker', () => { let queue; beforeEach(() => { @@ -2121,6 +2121,18 @@ describe('Queue', () => { queue.on('failed', cb); queue.on('error', done); }); + + it ('should share child pool across all different queues created' ,async () => { + const [queueA, queueB ] = await Promise.all([ + utils.newQueue('queueA', { settings: { isSharedChildPool: true } }), + utils.newQueue('queueB', { settings: { isSharedChildPool: true } }) + ]) + + await Promise.all([queueA.add(), queueB.add()]) + + expect(queueA.childPool).to.be.eql(queueB.childPool); + + }) }); describe('Retries and backoffs', () => { From 2e5c3e51b3804bb6ee095bc9390d4f48e5ca94bb Mon Sep 17 00:00:00 2001 From: "Fontaine, Jason" Date: Thu, 16 Dec 2021 15:09:58 -0500 Subject: [PATCH 2/3] fix(queue): ran prettier --- lib/process/child-pool.js | 10 ++++++---- lib/queue.js | 7 ++++--- lib/scripts.js | 8 ++------ test/test_child-pool.js | 8 ++++---- test/test_queue.js | 20 +++++++++++--------- 5 files changed, 27 insertions(+), 26 deletions(-) diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index ef6a8b19..1e9da34e 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -121,13 +121,15 @@ async function initChild(child, processFile) { ); await onComplete; } -function ChildPoolSingleton( isSharedChildPool = false ) { - if (isSharedChildPool === false || ((!(this instanceof ChildPool)) && ChildPoolSingleton.instance === undefined)) { +function ChildPoolSingleton(isSharedChildPool = false) { + if ( + isSharedChildPool === false || + (!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined) + ) { ChildPoolSingleton.instance = new ChildPool(); } return ChildPoolSingleton.instance; } - -module.exports = ChildPoolSingleton +module.exports = ChildPoolSingleton; diff --git a/lib/queue.js b/lib/queue.js index ce34680d..59343cf8 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -217,7 +217,7 @@ const Queue = function Queue(name, url, opts) { retryProcessDelay: 5000, drainDelay: 5, backoffStrategies: {}, - isSharedChildPool: false, + isSharedChildPool: false }); this.settings.lockRenewTime = @@ -664,8 +664,9 @@ Queue.prototype.setHandler = function(name, handler) { if (!fs.existsSync(processorFile)) { throw new Error('File ' + processorFile + ' does not exist'); } - const isSharedChildPool = this.settings.isSharedChildPool - this.childPool = this.childPool || require('./process/child-pool')(isSharedChildPool) + const isSharedChildPool = this.settings.isSharedChildPool; + this.childPool = + this.childPool || require('./process/child-pool')(isSharedChildPool); const sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/lib/scripts.js b/lib/scripts.js index ceaac84f..cebb68c6 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -326,17 +326,13 @@ const scripts = { return allRemoved; }, - extendLock( - queue, - jobId, - duration, - ) { + extendLock(queue, jobId, duration) { return queue.client.extendLock([ queue.toKey(jobId) + ':lock', queue.keys.stalled, queue.token, duration, - jobId, + jobId ]); }, diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 73abaeb5..6e0a689b 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -143,10 +143,10 @@ describe('Child pool', () => { }); it('should returned a shared child pool is isSharedChildPool is true', () => { - expect(childPool(true)).to.be.equal(new childPool(true)) - }) + expect(childPool(true)).to.be.equal(new childPool(true)); + }); it('should return a different childPool if isSharedChildPool is false', () => { - expect(childPool()).to.not.be.equal(childPool()) - }) + expect(childPool()).to.not.be.equal(childPool()); + }); }); diff --git a/test/test_queue.js b/test/test_queue.js index 1b45877f..93e06f96 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -2122,17 +2122,16 @@ describe('Queue', () => { queue.on('error', done); }); - it ('should share child pool across all different queues created' ,async () => { - const [queueA, queueB ] = await Promise.all([ + it('should share child pool across all different queues created', async () => { + const [queueA, queueB] = await Promise.all([ utils.newQueue('queueA', { settings: { isSharedChildPool: true } }), utils.newQueue('queueB', { settings: { isSharedChildPool: true } }) - ]) + ]); - await Promise.all([queueA.add(), queueB.add()]) + await Promise.all([queueA.add(), queueB.add()]); expect(queueA.childPool).to.be.eql(queueB.childPool); - - }) + }); }); describe('Retries and backoffs', () => { @@ -2849,11 +2848,14 @@ describe('Queue', () => { }); }); - it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => { + it("should clean the number of jobs requested even if first jobs timestamp doesn't match", async () => { // This job shouldn't get deleted due to the 5000 grace await queue.add({ some: 'data' }); // This job should get cleaned since 10000 > 5000 grace - const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 }); + const jobToClean = await queue.add( + { some: 'data' }, + { timestamp: Date.now() - 10000 } + ); // This job shouldn't get deleted due to the 5000 grace await queue.add({ some: 'data' }); @@ -2865,7 +2867,7 @@ describe('Queue', () => { expect(len).to.be.eql(2); }); - it('shouldn\'t clean anything if all jobs are in grace period', async () => { + it("shouldn't clean anything if all jobs are in grace period", async () => { await queue.add({ some: 'data' }); await queue.add({ some: 'data' }); From af66f725d8ab770349a502260b08506e08b74fec Mon Sep 17 00:00:00 2001 From: "Fontaine, Jason" Date: Fri, 17 Dec 2021 12:24:49 -0500 Subject: [PATCH 3/3] fix(childpool): tested negative case for queues to share child pool --- lib/process/child-pool.js | 6 ++++-- lib/queue.js | 2 +- lib/scripts.js | 8 ++++++-- test/test_child-pool.js | 10 ++++++++++ test/test_queue.js | 19 +++---------------- test/test_sandboxed_process.js | 31 +++++++++++++++++++++++++++++++ 6 files changed, 55 insertions(+), 21 deletions(-) diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 1e9da34e..018c1209 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -122,8 +122,10 @@ async function initChild(child, processFile) { await onComplete; } function ChildPoolSingleton(isSharedChildPool = false) { - if ( - isSharedChildPool === false || + if(isSharedChildPool === false) { + return new ChildPool(); + } + else if ( (!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined) ) { ChildPoolSingleton.instance = new ChildPool(); diff --git a/lib/queue.js b/lib/queue.js index 59343cf8..4793b9ec 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -217,7 +217,7 @@ const Queue = function Queue(name, url, opts) { retryProcessDelay: 5000, drainDelay: 5, backoffStrategies: {}, - isSharedChildPool: false + isSharedChildPool: false, }); this.settings.lockRenewTime = diff --git a/lib/scripts.js b/lib/scripts.js index cebb68c6..ceaac84f 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -326,13 +326,17 @@ const scripts = { return allRemoved; }, - extendLock(queue, jobId, duration) { + extendLock( + queue, + jobId, + duration, + ) { return queue.client.extendLock([ queue.toKey(jobId) + ':lock', queue.keys.stalled, queue.token, duration, - jobId + jobId, ]); }, diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 6e0a689b..b5b09ef2 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -149,4 +149,14 @@ describe('Child pool', () => { it('should return a different childPool if isSharedChildPool is false', () => { expect(childPool()).to.not.be.equal(childPool()); }); + + it('should not overwrite the the childPool singleton when isSharedChildPool is false', () => { + const childPoolA = new childPool(true) + const childPoolB = new childPool(false) + const childPoolC = new childPool(true); + + expect(childPoolA).to.be.equal(childPoolC) + expect(childPoolB).to.not.be.equal(childPoolA) + expect(childPoolB).to.not.be.equal(childPoolC) + }) }); diff --git a/test/test_queue.js b/test/test_queue.js index 93e06f96..1dc9d7f5 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -2122,16 +2122,6 @@ describe('Queue', () => { queue.on('error', done); }); - it('should share child pool across all different queues created', async () => { - const [queueA, queueB] = await Promise.all([ - utils.newQueue('queueA', { settings: { isSharedChildPool: true } }), - utils.newQueue('queueB', { settings: { isSharedChildPool: true } }) - ]); - - await Promise.all([queueA.add(), queueB.add()]); - - expect(queueA.childPool).to.be.eql(queueB.childPool); - }); }); describe('Retries and backoffs', () => { @@ -2848,14 +2838,11 @@ describe('Queue', () => { }); }); - it("should clean the number of jobs requested even if first jobs timestamp doesn't match", async () => { + it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => { // This job shouldn't get deleted due to the 5000 grace await queue.add({ some: 'data' }); // This job should get cleaned since 10000 > 5000 grace - const jobToClean = await queue.add( - { some: 'data' }, - { timestamp: Date.now() - 10000 } - ); + const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 }); // This job shouldn't get deleted due to the 5000 grace await queue.add({ some: 'data' }); @@ -2867,7 +2854,7 @@ describe('Queue', () => { expect(len).to.be.eql(2); }); - it("shouldn't clean anything if all jobs are in grace period", async () => { + it('shouldn\'t clean anything if all jobs are in grace period', async () => { await queue.add({ some: 'data' }); await queue.add({ some: 'data' }); diff --git a/test/test_sandboxed_process.js b/test/test_sandboxed_process.js index 537c0b33..4ff8fba6 100644 --- a/test/test_sandboxed_process.js +++ b/test/test_sandboxed_process.js @@ -454,4 +454,35 @@ describe('sandboxed process', () => { const jobResult = await job.finished(); expect(jobResult).to.equal(42); }); + + it('should share child pool across all different queues created', async () => { + const [queueA, queueB] = await Promise.all([ + utils.newQueue('queueA', { settings: { isSharedChildPool: true } }), + utils.newQueue('queueB', { settings: { isSharedChildPool: true } }) + ]); + + const processFile = __dirname + '/fixtures/fixture_processor.js'; + queueA.process(processFile) + queueB.process(processFile) + + await Promise.all([queueA.add(), queueB.add()]); + + + expect(queueA.childPool).to.be.eql(queueB.childPool); + }); + + it('should not share childPool across different queues if isSharedChildPool isn\'t specified', async () => { + const [queueA, queueB] = await Promise.all([ + utils.newQueue('queueA', { settings: { isSharedChildPool: false } }), + utils.newQueue('queueB') + ]); + + const processFile = __dirname + '/fixtures/fixture_processor.js'; + queueA.process(processFile) + queueB.process(processFile) + + await Promise.all([queueA.add(), queueB.add()]); + + expect(queueA.childPool).to.not.be.equal(queueB.childPool); + }) });