Skip to content

Commit

Permalink
feat(queue): enabled queues to share childPool instance (#2237)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfontaine-lifion committed Dec 21, 2021
1 parent efbf7dc commit 16fdbe9
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 9 deletions.
9 changes: 5 additions & 4 deletions REFERENCE.md
Expand Up @@ -107,6 +107,7 @@ interface AdvancedSettings {
retryProcessDelay: number = 5000; // delay before processing next job in case of internal error.
backoffStrategies: {}; // A set of custom backoff strategies keyed by name.
drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs).
isSharedChildPool: boolean = false; // enables multiple queues on the same instance of child pool to share the same instance.
}
```

Expand Down Expand Up @@ -592,8 +593,8 @@ for the job when it was added.
removeRepeatableByKey(key: string): Promise<void>
```

Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this
particular configuration.
Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this
particular configuration.

There are currently two ways to get the "key" of a repeatable job.

Expand Down Expand Up @@ -691,7 +692,7 @@ Returns a promise that will return the waiting job counts for the given queue.

### Queue#getPausedCount

*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same
*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same
result.

```ts
Expand Down Expand Up @@ -970,7 +971,7 @@ A queue emits also some useful events:
})

.on('lock-extension-failed', function (job, err) {
// A job failed to extend lock. This will be useful to debug redis
// A job failed to extend lock. This will be useful to debug redis
// connection issues and jobs getting restarted because workers
// are not able to extend locks.
});
Expand Down
15 changes: 14 additions & 1 deletion lib/process/child-pool.js
Expand Up @@ -121,4 +121,17 @@ async function initChild(child, processFile) {
);
await onComplete;
}
module.exports = ChildPool;
function ChildPoolSingleton(isSharedChildPool = false) {
if(isSharedChildPool === false) {
return new ChildPool();
}
else if (
(!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined)
) {
ChildPoolSingleton.instance = new ChildPool();
}

return ChildPoolSingleton.instance;
}

module.exports = ChildPoolSingleton;
9 changes: 6 additions & 3 deletions lib/queue.js
Expand Up @@ -71,6 +71,7 @@ const MINIMUM_REDIS_VERSION = '2.8.18';
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000,
drainDelay?: number = 5
isSharedChildPool?: boolean = false
}
}
Expand Down Expand Up @@ -215,7 +216,8 @@ const Queue = function Queue(name, url, opts) {
guardInterval: 5000,
retryProcessDelay: 5000,
drainDelay: 5,
backoffStrategies: {}
backoffStrategies: {},
isSharedChildPool: false,
});

this.settings.lockRenewTime =
Expand Down Expand Up @@ -662,8 +664,9 @@ Queue.prototype.setHandler = function(name, handler) {
if (!fs.existsSync(processorFile)) {
throw new Error('File ' + processorFile + ' does not exist');
}

this.childPool = this.childPool || require('./process/child-pool')();
const isSharedChildPool = this.settings.isSharedChildPool;
this.childPool =
this.childPool || require('./process/child-pool')(isSharedChildPool);

const sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler, this.childPool).bind(this);
Expand Down
18 changes: 18 additions & 0 deletions test/test_child-pool.js
Expand Up @@ -141,4 +141,22 @@ describe('Child pool', () => {
expect(children).to.include(child);
});
});

it('should returned a shared child pool is isSharedChildPool is true', () => {
expect(childPool(true)).to.be.equal(new childPool(true));
});

it('should return a different childPool if isSharedChildPool is false', () => {
expect(childPool()).to.not.be.equal(childPool());
});

it('should not overwrite the the childPool singleton when isSharedChildPool is false', () => {
const childPoolA = new childPool(true)
const childPoolB = new childPool(false)
const childPoolC = new childPool(true);

expect(childPoolA).to.be.equal(childPoolC)
expect(childPoolB).to.not.be.equal(childPoolA)
expect(childPoolB).to.not.be.equal(childPoolC)
})
});
3 changes: 2 additions & 1 deletion test/test_queue.js
Expand Up @@ -405,7 +405,7 @@ describe('Queue', () => {
});
});

describe(' a worker', () => {
describe('a worker', () => {
let queue;

beforeEach(() => {
Expand Down Expand Up @@ -2121,6 +2121,7 @@ describe('Queue', () => {
queue.on('failed', cb);
queue.on('error', done);
});

});

describe('Retries and backoffs', () => {
Expand Down
31 changes: 31 additions & 0 deletions test/test_sandboxed_process.js
Expand Up @@ -454,4 +454,35 @@ describe('sandboxed process', () => {
const jobResult = await job.finished();
expect(jobResult).to.equal(42);
});

it('should share child pool across all different queues created', async () => {
const [queueA, queueB] = await Promise.all([
utils.newQueue('queueA', { settings: { isSharedChildPool: true } }),
utils.newQueue('queueB', { settings: { isSharedChildPool: true } })
]);

const processFile = __dirname + '/fixtures/fixture_processor.js';
queueA.process(processFile)
queueB.process(processFile)

await Promise.all([queueA.add(), queueB.add()]);


expect(queueA.childPool).to.be.eql(queueB.childPool);
});

it('should not share childPool across different queues if isSharedChildPool isn\'t specified', async () => {
const [queueA, queueB] = await Promise.all([
utils.newQueue('queueA', { settings: { isSharedChildPool: false } }),
utils.newQueue('queueB')
]);

const processFile = __dirname + '/fixtures/fixture_processor.js';
queueA.process(processFile)
queueB.process(processFile)

await Promise.all([queueA.add(), queueB.add()]);

expect(queueA.childPool).to.not.be.equal(queueB.childPool);
})
});

0 comments on commit 16fdbe9

Please sign in to comment.