Skip to content

Commit

Permalink
Merge pull request #689 from OptimalBits/threaded-process
Browse files Browse the repository at this point in the history
implementation threaded processors
  • Loading branch information
manast committed Sep 11, 2017
2 parents 5332eba + 43a90e2 commit beb418c
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 20 deletions.
33 changes: 31 additions & 2 deletions README.md
Expand Up @@ -79,13 +79,13 @@ Are you developing bull sponsored by a company? Please, let us now!
- [x] Concurrency.
- [x] Pause/resume—globally or locally.
- [x] Multiple job types per queue.
- [x] Threaded (sandboxed) processing functions.
- [x] Automatic recovery from process crashes.

And coming up on the roadmap...

- [ ] Job completion acknowledgement.
- [ ] Parent-child jobs relationships.
- [ ] Threaded processing functions.

---

Expand Down Expand Up @@ -119,6 +119,7 @@ better suits your needs.
| Global events ||| | |
| Rate Limiter || | | |
| Pause/Resume ||| | |
| Sandboxed worker|| | | |
| Repeatable jobs || | ||
| Atomic ops || || |
| Persistence |||||
Expand Down Expand Up @@ -153,7 +154,6 @@ Definitions are currently maintained in the [DefinitelyTyped](https://github.com

---


### Quick Guide

```js
Expand Down Expand Up @@ -249,6 +249,35 @@ videoQueue.process(function(job){ // don't forget to remove the done callback!
});
```

The process function can also be run in a separate process. This has several advantages:
- The process is sandboxed so if it crashes it does not affect the worker.
- You can run blocking code without affecting the queue (jobs will not stall).
- Much better utilization of multi-core CPUs.
- Less connections to redis.

In order to use this feature just create a separate file with the processor:
```js
// processor.js
module.exports = function(job){
// Do some heavy work

return Promise.resolve(result);
}
```

And define the processor like this:

```js
// Single process:
queue.process('/path/to/my/processor.js');

// You can use concurrency as well:
queue.process(5, '/path/to/my/processor.js');

// and named processors:
queue.process('my processor', 5, '/path/to/my/processor.js');
```

A job can be added to a queue and processed repeatedly according to a cron specification:

```
Expand Down
53 changes: 53 additions & 0 deletions lib/process/child-pool.js
@@ -0,0 +1,53 @@

var fork = require('child_process').fork;
var path = require('path');
var pool = {};
var Promise = require('bluebird');

module.exports.retain = function(processFile){
return new Promise(function(resolve, rejected) {
var keys = Object.keys(pool);
for(var i=0; i<keys.length; i++){
var child = pool[keys[i]];
if(!child.retained){
child.retained = true;
return resolve(child.subprocess);
}
}

try{
var child = fork(path.join(__dirname, './master.js'));

child.on('exit', function(code, signal){
console.error('Child process exited', child.pid, code, signal);
delete pool[child.pid];
});

pool[child.pid] = {
subprocess: child,
retained: true
};

child.send({
cmd: 'init',
value: processFile
}, function() {
resolve(child);
});
}catch(err){
reject(err);
}
});
};

module.exports.release = function(child){;
pool[child.pid].retained = false;
};

module.exports.clean = function(){
var keys = Object.keys(pool);
for(var i=0; i<keys.length; i++){
pool[keys[i]].subprocess.kill();
delete pool[keys[i]];
}
};
70 changes: 70 additions & 0 deletions lib/process/master.js
@@ -0,0 +1,70 @@
/**
* Master of child processes. Handles communication between the
* processor and the main process.
*
*/
var status;
var processor;
var Promise = require('bluebird');

// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify
if (!('toJSON' in Error.prototype)){
Object.defineProperty(Error.prototype, 'toJSON', {
value: function () {
var alt = {};

Object.getOwnPropertyNames(this).forEach(function (key) {
alt[key] = this[key];
}, this);

return alt;
},
configurable: true,
writable: true
});
}

process.on('message', function(msg) {

switch(msg.cmd){
case 'init':
processor = require(msg.value);
status = 'IDLE';
break;

case 'start':
if(status !== 'IDLE'){
return process.send({
cmd: 'error',
err: new Error('cannot start a not idling child process')
});
}
status = 'STARTED';
Promise.resolve(processor(wrapJob(msg.job)) || {}).then( function(result) {
process.send({
cmd: 'completed',
value: result
});
}, function(err) {
process.send({
cmd: 'failed',
value: err
});
}).finally(function(){
status = 'IDLE';
});
break;
case 'stop':
break;
}
});

function wrapJob(job){
job.progress = function(progress){
process.send({
cmd: 'progress',
value: progress
});
};
return job;
}
39 changes: 39 additions & 0 deletions lib/process/sandbox.js
@@ -0,0 +1,39 @@
var Promise = require('bluebird');
var childPool = require('./child-pool');

module.exports = function(processFile){
return function process(job){
return childPool.retain(processFile).then(function(child){

child.send({
cmd: 'start',
job: job
});

var done = new Promise(function(resolve, reject) {
function handler(msg){
switch(msg.cmd){
case 'completed':
child.removeListener('message', handler);
resolve(msg.value);
break;
case 'failed':
case 'error':
child.removeListener('message', handler);
reject(msg.value);
break;
case 'progress':
job.progress(msg.value);
break;
}
}

child.on('message', handler);
});

return done.finally( function(){
childPool.release(child);
});
});
};
};
27 changes: 19 additions & 8 deletions lib/queue.js
Expand Up @@ -7,7 +7,6 @@ var EventEmitter = require('events');
var _ = require('lodash');

var util = require('util');
var utils = require('./utils');
var url = require('url');
var Job = require('./job');
var scripts = require('./scripts');
Expand Down Expand Up @@ -162,7 +161,7 @@ var Queue = function Queue(name, url, opts){
if (semver.lt(version, MINIMUM_REDIS_VERSION)){
_this.emit('error', new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version));
}
}).catch(function(err){
}).catch(function(/*err*/){
// Ignore this error.
});
}
Expand Down Expand Up @@ -478,6 +477,7 @@ Queue.prototype.close = function( doNotWaitJobs ){
}).then(function(){
return _this.disconnect();
}).finally(function(){
require('./process/child-pool').clean();
_this.closed = true;
});
};
Expand Down Expand Up @@ -509,6 +509,13 @@ Queue.prototype._clearTimers = function(){
@method process
*/
Queue.prototype.process = function(name, concurrency, handler){

if(arguments.length === 1){
handler = name;
concurrency = 1;
name = Job.DEFAULT_JOB_NAME;
}

if(typeof name !== 'string'){
handler = concurrency;
concurrency = name;
Expand Down Expand Up @@ -546,12 +553,17 @@ Queue.prototype.setHandler = function(name, handler){

this.setWorkerName();

handler = handler.bind(this);
if(typeof handler === 'string'){
var sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler).bind(this);
} else {
handler = handler.bind(this);

if(handler.length > 1){
this.handlers[name] = Promise.promisify(handler);
}else{
this.handlers[name] = Promise.method(handler);
if(handler.length > 1){
this.handlers[name] = Promise.promisify(handler);
}else{
this.handlers[name] = Promise.method(handler);
}
}
};

Expand Down Expand Up @@ -890,7 +902,6 @@ Queue.prototype.getNextJob = function() {
if(this.closing){
return Promise.resolve();
}

if(this.drained){
//
// Waiting for new jobs to arrive
Expand Down
2 changes: 1 addition & 1 deletion lib/scripts.js
Expand Up @@ -27,7 +27,7 @@ var scripts = {
});
},

addJob: function(client, queue, job, opts, token){
addJob: function(client, queue, job, opts){

var queueKeys = queue.keys;
var keys = [
Expand Down
12 changes: 12 additions & 0 deletions test/fixtures/fixture_processor.js
@@ -0,0 +1,12 @@
/**
* A processor file to be used in tests.
*
*/

var Promise = require('bluebird');

module.exports = function(job){
return Promise.delay(500).then(function(){
return 42;
});
};
12 changes: 12 additions & 0 deletions test/fixtures/fixture_processor_fail.js
@@ -0,0 +1,12 @@
/**
* A processor file to be used in tests.
*
*/

var Promise = require('bluebird');

module.exports = function(job){
return Promise.delay(500).then(function(){
throw new Error('Manually failed processor');
});
};
23 changes: 23 additions & 0 deletions test/fixtures/fixture_processor_progress.js
@@ -0,0 +1,23 @@
/**
* A processor file to be used in tests.
*
*/

var Promise = require('bluebird');

module.exports = function(job){
return Promise.delay(50).then(function(){
job.progress(10);
return Promise.delay(100);
}).then(function(){
job.progress(27);
return Promise.delay(150);
}).then(function(){
job.progress(78);
return Promise.delay(100);
}).then(function(){
return job.progress(100);
}).then(function(){
return 37;
});
};
13 changes: 6 additions & 7 deletions test/test_rate_limiter.js
Expand Up @@ -27,20 +27,19 @@ describe('Rate limiter', function () {

it('should obey the rate limit', function(done) {
var startTime = new Date().getTime();
var nbProcessed = 0;
var numJobs = 4;

queue.process(function() {
return Promise.resolve();
});

queue.add({});
queue.add({});
queue.add({});
queue.add({});
for(var i=0; i<numJobs; i++){
queue.add({});
}

queue.on('completed', _.after(4, function() {
queue.on('completed', _.after(numJobs, function() {
try {
expect(new Date().getTime() - startTime).to.be.above(3000);
expect(new Date().getTime() - startTime).to.be.above((numJobs - 1) * 1000);
done();
} catch (e) {
done(e);
Expand Down

0 comments on commit beb418c

Please sign in to comment.