Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): enabled queues to share childPool instance #2237

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions REFERENCE.md
Expand Up @@ -107,6 +107,7 @@ interface AdvancedSettings {
retryProcessDelay: number = 5000; // delay before processing next job in case of internal error.
backoffStrategies: {}; // A set of custom backoff strategies keyed by name.
drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs).
isSharedChildPool: boolean = false; // enables multiple queues on the same instance of child pool to share the same instance.
}
```

Expand Down Expand Up @@ -592,8 +593,8 @@ for the job when it was added.
removeRepeatableByKey(key: string): Promise<void>
```

Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this
particular configuration.
Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this
particular configuration.

There are currently two ways to get the "key" of a repeatable job.

Expand Down Expand Up @@ -691,7 +692,7 @@ Returns a promise that will return the waiting job counts for the given queue.

### Queue#getPausedCount

*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same
*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same
result.

```ts
Expand Down Expand Up @@ -970,7 +971,7 @@ A queue emits also some useful events:
})

.on('lock-extension-failed', function (job, err) {
// A job failed to extend lock. This will be useful to debug redis
// A job failed to extend lock. This will be useful to debug redis
// connection issues and jobs getting restarted because workers
// are not able to extend locks.
});
Expand Down
15 changes: 14 additions & 1 deletion lib/process/child-pool.js
Expand Up @@ -121,4 +121,17 @@ async function initChild(child, processFile) {
);
await onComplete;
}
module.exports = ChildPool;
function ChildPoolSingleton(isSharedChildPool = false) {
if(isSharedChildPool === false) {
return new ChildPool();
}
else if (
(!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined)
) {
ChildPoolSingleton.instance = new ChildPool();
}

return ChildPoolSingleton.instance;
}

module.exports = ChildPoolSingleton;
9 changes: 6 additions & 3 deletions lib/queue.js
Expand Up @@ -71,6 +71,7 @@ const MINIMUM_REDIS_VERSION = '2.8.18';
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000,
drainDelay?: number = 5
isSharedChildPool?: boolean = false
}
}

Expand Down Expand Up @@ -215,7 +216,8 @@ const Queue = function Queue(name, url, opts) {
guardInterval: 5000,
retryProcessDelay: 5000,
drainDelay: 5,
backoffStrategies: {}
backoffStrategies: {},
isSharedChildPool: false,
});

this.settings.lockRenewTime =
Expand Down Expand Up @@ -662,8 +664,9 @@ Queue.prototype.setHandler = function(name, handler) {
if (!fs.existsSync(processorFile)) {
throw new Error('File ' + processorFile + ' does not exist');
}

this.childPool = this.childPool || require('./process/child-pool')();
const isSharedChildPool = this.settings.isSharedChildPool;
this.childPool =
this.childPool || require('./process/child-pool')(isSharedChildPool);

const sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler, this.childPool).bind(this);
Expand Down
18 changes: 18 additions & 0 deletions test/test_child-pool.js
Expand Up @@ -141,4 +141,22 @@ describe('Child pool', () => {
expect(children).to.include(child);
});
});

it('should returned a shared child pool is isSharedChildPool is true', () => {
expect(childPool(true)).to.be.equal(new childPool(true));
});

it('should return a different childPool if isSharedChildPool is false', () => {
expect(childPool()).to.not.be.equal(childPool());
});

it('should not overwrite the the childPool singleton when isSharedChildPool is false', () => {
const childPoolA = new childPool(true)
const childPoolB = new childPool(false)
const childPoolC = new childPool(true);

expect(childPoolA).to.be.equal(childPoolC)
expect(childPoolB).to.not.be.equal(childPoolA)
expect(childPoolB).to.not.be.equal(childPoolC)
})
});
3 changes: 2 additions & 1 deletion test/test_queue.js
Expand Up @@ -405,7 +405,7 @@ describe('Queue', () => {
});
});

describe(' a worker', () => {
describe('a worker', () => {
let queue;

beforeEach(() => {
Expand Down Expand Up @@ -2121,6 +2121,7 @@ describe('Queue', () => {
queue.on('failed', cb);
queue.on('error', done);
});

});

describe('Retries and backoffs', () => {
Expand Down
31 changes: 31 additions & 0 deletions test/test_sandboxed_process.js
Expand Up @@ -454,4 +454,35 @@ describe('sandboxed process', () => {
const jobResult = await job.finished();
expect(jobResult).to.equal(42);
});

it('should share child pool across all different queues created', async () => {
const [queueA, queueB] = await Promise.all([
utils.newQueue('queueA', { settings: { isSharedChildPool: true } }),
utils.newQueue('queueB', { settings: { isSharedChildPool: true } })
]);

const processFile = __dirname + '/fixtures/fixture_processor.js';
queueA.process(processFile)
queueB.process(processFile)

await Promise.all([queueA.add(), queueB.add()]);


expect(queueA.childPool).to.be.eql(queueB.childPool);
});

it('should not share childPool across different queues if isSharedChildPool isn\'t specified', async () => {
const [queueA, queueB] = await Promise.all([
utils.newQueue('queueA', { settings: { isSharedChildPool: false } }),
utils.newQueue('queueB')
]);

const processFile = __dirname + '/fixtures/fixture_processor.js';
queueA.process(processFile)
queueB.process(processFile)

await Promise.all([queueA.add(), queueB.add()]);

expect(queueA.childPool).to.not.be.equal(queueB.childPool);
})
});