Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implementation threaded processors #689

Merged
merged 4 commits into from Sep 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 31 additions & 2 deletions README.md
Expand Up @@ -79,13 +79,13 @@ Are you developing bull sponsored by a company? Please, let us now!
- [x] Concurrency.
- [x] Pause/resume—globally or locally.
- [x] Multiple job types per queue.
- [x] Threaded (sandboxed) processing functions.
- [x] Automatic recovery from process crashes.

And coming up on the roadmap...

- [ ] Job completion acknowledgement.
- [ ] Parent-child jobs relationships.
- [ ] Threaded processing functions.

---

Expand Down Expand Up @@ -119,6 +119,7 @@ better suits your needs.
| Global events | ✓ | ✓ | | |
| Rate Limiter | ✓ | | | |
| Pause/Resume | ✓ | ✓ | | |
| Sandboxed worker| ✓ | | | |
| Repeatable jobs | ✓ | | | ✓ |
| Atomic ops | ✓ | | ✓ | |
| Persistence | ✓ | ✓ | ✓ | ✓ |
Expand Down Expand Up @@ -152,7 +153,6 @@ Definitions are currently maintained in the [DefinitelyTyped](https://github.com

---


### Quick Guide

```js
Expand Down Expand Up @@ -248,6 +248,35 @@ videoQueue.process(function(job){ // don't forget to remove the done callback!
});
```

The process function can also be run in a separate process. This has several advantages:
- The process is sandboxed so if it crashes it does not affect the worker.
- You can run blocking code without affecting the queue (jobs will not stall).
- Much better utilization of multi-core CPUs.
- Less connections to redis.

In order to use this feature just create a separate file with the processor:
```js
// processor.js
module.exports = function(job){
// Do some heavy work

return Promise.resolve(result);
}
```

And define the processor like this:

```js
// Single process:
queue.process('/path/to/my/processor.js');

// You can use concurrency as well:
queue.process(5, '/path/to/my/processor.js');

// and named processors:
queue.process('my processor', 5, '/path/to/my/processor.js');
```

A job can be added to a queue and processed repeatedly according to a cron specification:

```
Expand Down
53 changes: 53 additions & 0 deletions lib/process/child-pool.js
@@ -0,0 +1,53 @@

var fork = require('child_process').fork;
var path = require('path');
var pool = {};
var Promise = require('bluebird');

module.exports.retain = function(processFile){
return new Promise(function(resolve, rejected) {
var keys = Object.keys(pool);
for(var i=0; i<keys.length; i++){
var child = pool[keys[i]];
if(!child.retained){
child.retained = true;
return resolve(child.subprocess);
}
}

try{
var child = fork(path.join(__dirname, './master.js'));

child.on('exit', function(code, signal){
console.error('Child process exited', child.pid, code, signal);
delete pool[child.pid];
});

pool[child.pid] = {
subprocess: child,
retained: true
};

child.send({
cmd: 'init',
value: processFile
}, function() {
resolve(child);
});
}catch(err){
reject(err);
}
});
};

module.exports.release = function(child){;
pool[child.pid].retained = false;
};

module.exports.clean = function(){
var keys = Object.keys(pool);
for(var i=0; i<keys.length; i++){
pool[keys[i]].subprocess.kill();
delete pool[keys[i]];
}
};
70 changes: 70 additions & 0 deletions lib/process/master.js
@@ -0,0 +1,70 @@
/**
* Master of child processes. Handles communication between the
* processor and the main process.
*
*/
var status;
var processor;
var Promise = require('bluebird');

// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify
if (!('toJSON' in Error.prototype)){
Object.defineProperty(Error.prototype, 'toJSON', {
value: function () {
var alt = {};

Object.getOwnPropertyNames(this).forEach(function (key) {
alt[key] = this[key];
}, this);

return alt;
},
configurable: true,
writable: true
});
}

process.on('message', function(msg) {

switch(msg.cmd){
case 'init':
processor = require(msg.value);
status = 'IDLE';
break;

case 'start':
if(status !== 'IDLE'){
return process.send({
cmd: 'error',
err: new Error('cannot start a not idling child process')
});
}
status = 'STARTED';
Promise.resolve(processor(wrapJob(msg.job)) || {}).then( function(result) {
process.send({
cmd: 'completed',
value: result
});
}, function(err) {
process.send({
cmd: 'failed',
value: err
});
}).finally(function(){
status = 'IDLE';
});
break;
case 'stop':
break;
}
});

function wrapJob(job){
job.progress = function(progress){
process.send({
cmd: 'progress',
value: progress
});
};
return job;
}
39 changes: 39 additions & 0 deletions lib/process/sandbox.js
@@ -0,0 +1,39 @@
var Promise = require('bluebird');
var childPool = require('./child-pool');

module.exports = function(processFile){
return function process(job){
return childPool.retain(processFile).then(function(child){

child.send({
cmd: 'start',
job: job
});

var done = new Promise(function(resolve, reject) {
function handler(msg){
switch(msg.cmd){
case 'completed':
child.removeListener('message', handler);
resolve(msg.value);
break;
case 'failed':
case 'error':
child.removeListener('message', handler);
reject(msg.value);
break;
case 'progress':
job.progress(msg.value);
break;
}
}

child.on('message', handler);
});

return done.finally( function(){
childPool.release(child);
});
});
};
};
27 changes: 19 additions & 8 deletions lib/queue.js
Expand Up @@ -7,7 +7,6 @@ var EventEmitter = require('events');
var _ = require('lodash');

var util = require('util');
var utils = require('./utils');
var url = require('url');
var Job = require('./job');
var scripts = require('./scripts');
Expand Down Expand Up @@ -162,7 +161,7 @@ var Queue = function Queue(name, url, opts){
if (semver.lt(version, MINIMUM_REDIS_VERSION)){
_this.emit('error', new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version));
}
}).catch(function(err){
}).catch(function(/*err*/){
// Ignore this error.
});
}
Expand Down Expand Up @@ -478,6 +477,7 @@ Queue.prototype.close = function( doNotWaitJobs ){
}).then(function(){
return _this.disconnect();
}).finally(function(){
require('./process/child-pool').clean();
_this.closed = true;
});
};
Expand Down Expand Up @@ -509,6 +509,13 @@ Queue.prototype._clearTimers = function(){
@method process
*/
Queue.prototype.process = function(name, concurrency, handler){

if(arguments.length === 1){
handler = name;
concurrency = 1;
name = Job.DEFAULT_JOB_NAME;
}

if(typeof name !== 'string'){
handler = concurrency;
concurrency = name;
Expand Down Expand Up @@ -546,12 +553,17 @@ Queue.prototype.setHandler = function(name, handler){

this.setWorkerName();

handler = handler.bind(this);
if(typeof handler === 'string'){
var sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler).bind(this);
} else {
handler = handler.bind(this);

if(handler.length > 1){
this.handlers[name] = Promise.promisify(handler);
}else{
this.handlers[name] = Promise.method(handler);
if(handler.length > 1){
this.handlers[name] = Promise.promisify(handler);
}else{
this.handlers[name] = Promise.method(handler);
}
}
};

Expand Down Expand Up @@ -890,7 +902,6 @@ Queue.prototype.getNextJob = function() {
if(this.closing){
return Promise.resolve();
}

if(this.drained){
//
// Waiting for new jobs to arrive
Expand Down
2 changes: 1 addition & 1 deletion lib/scripts.js
Expand Up @@ -27,7 +27,7 @@ var scripts = {
});
},

addJob: function(client, queue, job, opts, token){
addJob: function(client, queue, job, opts){

var queueKeys = queue.keys;
var keys = [
Expand Down
12 changes: 12 additions & 0 deletions test/fixtures/fixture_processor.js
@@ -0,0 +1,12 @@
/**
* A processor file to be used in tests.
*
*/

var Promise = require('bluebird');

module.exports = function(job){
return Promise.delay(500).then(function(){
return 42;
});
};
12 changes: 12 additions & 0 deletions test/fixtures/fixture_processor_fail.js
@@ -0,0 +1,12 @@
/**
* A processor file to be used in tests.
*
*/

var Promise = require('bluebird');

module.exports = function(job){
return Promise.delay(500).then(function(){
throw new Error('Manually failed processor');
});
};
23 changes: 23 additions & 0 deletions test/fixtures/fixture_processor_progress.js
@@ -0,0 +1,23 @@
/**
* A processor file to be used in tests.
*
*/

var Promise = require('bluebird');

module.exports = function(job){
return Promise.delay(50).then(function(){
job.progress(10);
return Promise.delay(100);
}).then(function(){
job.progress(27);
return Promise.delay(150);
}).then(function(){
job.progress(78);
return Promise.delay(100);
}).then(function(){
return job.progress(100);
}).then(function(){
return 37;
});
};
13 changes: 6 additions & 7 deletions test/test_rate_limiter.js
Expand Up @@ -27,20 +27,19 @@ describe('Rate limiter', function () {

it('should obey the rate limit', function(done) {
var startTime = new Date().getTime();
var nbProcessed = 0;
var numJobs = 4;

queue.process(function() {
return Promise.resolve();
});

queue.add({});
queue.add({});
queue.add({});
queue.add({});
for(var i=0; i<numJobs; i++){
queue.add({});
}

queue.on('completed', _.after(4, function() {
queue.on('completed', _.after(numJobs, function() {
try {
expect(new Date().getTime() - startTime).to.be.above(3000);
expect(new Date().getTime() - startTime).to.be.above((numJobs - 1) * 1000);
done();
} catch (e) {
done(e);
Expand Down