Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): enabled queues to share childPool instance #2237

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions REFERENCE.md
Expand Up @@ -107,6 +107,7 @@ interface AdvancedSettings {
retryProcessDelay: number = 5000; // delay before processing next job in case of internal error.
backoffStrategies: {}; // A set of custom backoff strategies keyed by name.
drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs).
isSharedChildPool: boolean = false; // enables multiple queues on the same instance of child pool to share the same instance.
}
```

Expand Down Expand Up @@ -592,8 +593,8 @@ for the job when it was added.
removeRepeatableByKey(key: string): Promise<void>
```

Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this
particular configuration.
Removes a given Repeatable Job configuration by its key so that no more repeatable jobs will be processed for this
particular configuration.

There are currently two ways to get the "key" of a repeatable job.

Expand Down Expand Up @@ -691,7 +692,7 @@ Returns a promise that will return the waiting job counts for the given queue.

### Queue#getPausedCount

*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same
*DEPRECATED* Since only the queue can be paused, getWaitingCount gives the same
result.

```ts
Expand Down Expand Up @@ -970,7 +971,7 @@ A queue emits also some useful events:
})

.on('lock-extension-failed', function (job, err) {
// A job failed to extend lock. This will be useful to debug redis
// A job failed to extend lock. This will be useful to debug redis
// connection issues and jobs getting restarted because workers
// are not able to extend locks.
});
Expand Down
13 changes: 12 additions & 1 deletion lib/process/child-pool.js
Expand Up @@ -121,4 +121,15 @@ async function initChild(child, processFile) {
);
await onComplete;
}
module.exports = ChildPool;
function ChildPoolSingleton(isSharedChildPool = false) {
if (
isSharedChildPool === false ||
(!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined)
) {
ChildPoolSingleton.instance = new ChildPool();
}

return ChildPoolSingleton.instance;
}

module.exports = ChildPoolSingleton;
9 changes: 6 additions & 3 deletions lib/queue.js
Expand Up @@ -71,6 +71,7 @@ const MINIMUM_REDIS_VERSION = '2.8.18';
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000,
drainDelay?: number = 5
isSharedChildPool?: boolean = false
}
}

Expand Down Expand Up @@ -215,7 +216,8 @@ const Queue = function Queue(name, url, opts) {
guardInterval: 5000,
retryProcessDelay: 5000,
drainDelay: 5,
backoffStrategies: {}
backoffStrategies: {},
isSharedChildPool: false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use trailing commas

});

this.settings.lockRenewTime =
Expand Down Expand Up @@ -662,8 +664,9 @@ Queue.prototype.setHandler = function(name, handler) {
if (!fs.existsSync(processorFile)) {
throw new Error('File ' + processorFile + ' does not exist');
}

this.childPool = this.childPool || require('./process/child-pool')();
const isSharedChildPool = this.settings.isSharedChildPool;
this.childPool =
this.childPool || require('./process/child-pool')(isSharedChildPool);

const sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler, this.childPool).bind(this);
Expand Down
8 changes: 2 additions & 6 deletions lib/scripts.js
Expand Up @@ -326,17 +326,13 @@ const scripts = {
return allRemoved;
},

extendLock(
queue,
jobId,
duration,
) {
extendLock(queue, jobId, duration) {
return queue.client.extendLock([
queue.toKey(jobId) + ':lock',
queue.keys.stalled,
queue.token,
duration,
jobId,
jobId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not following the current format standard, we use trailing commas (as the default in "prettier"): https://stackoverflow.com/questions/61370583/trailing-comma-after-last-line-in-object

]);
},

Expand Down
8 changes: 8 additions & 0 deletions test/test_child-pool.js
Expand Up @@ -141,4 +141,12 @@ describe('Child pool', () => {
expect(children).to.include(child);
});
});

it('should returned a shared child pool is isSharedChildPool is true', () => {
expect(childPool(true)).to.be.equal(new childPool(true));
});

it('should return a different childPool if isSharedChildPool is false', () => {
expect(childPool()).to.not.be.equal(childPool());
});
});
22 changes: 18 additions & 4 deletions test/test_queue.js
Expand Up @@ -405,7 +405,7 @@ describe('Queue', () => {
});
});

describe(' a worker', () => {
describe('a worker', () => {
let queue;

beforeEach(() => {
Expand Down Expand Up @@ -2121,6 +2121,17 @@ describe('Queue', () => {
queue.on('failed', cb);
queue.on('error', done);
});

it('should share child pool across all different queues created', async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to also test the opposite behavior, i.e. that the pools are different if we are not specifying the isSharedChildPool.

const [queueA, queueB] = await Promise.all([
utils.newQueue('queueA', { settings: { isSharedChildPool: true } }),
utils.newQueue('queueB', { settings: { isSharedChildPool: true } })
]);

await Promise.all([queueA.add(), queueB.add()]);

expect(queueA.childPool).to.be.eql(queueB.childPool);
});
});

describe('Retries and backoffs', () => {
Expand Down Expand Up @@ -2837,11 +2848,14 @@ describe('Queue', () => {
});
});

it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => {
it("should clean the number of jobs requested even if first jobs timestamp doesn't match", async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use single quotes instead of double.

// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });
// This job should get cleaned since 10000 > 5000 grace
const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 });
const jobToClean = await queue.add(
{ some: 'data' },
{ timestamp: Date.now() - 10000 }
);
// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });

Expand All @@ -2853,7 +2867,7 @@ describe('Queue', () => {
expect(len).to.be.eql(2);
});

it('shouldn\'t clean anything if all jobs are in grace period', async () => {
it("shouldn't clean anything if all jobs are in grace period", async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use single quotes

await queue.add({ some: 'data' });
await queue.add({ some: 'data' });

Expand Down