From 23d02e23f53c9b92fbad8e6ab042b9f2d85d9724 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 11 Sep 2017 15:07:03 +0200 Subject: [PATCH 1/4] initial implementation threaded processors --- README.md | 33 +++++++++++++++++++-- lib/process/child-pool.js | 40 +++++++++++++++++++++++++ lib/process/master.js | 61 +++++++++++++++++++++++++++++++++++++++ lib/process/sandbox.js | 36 +++++++++++++++++++++++ lib/queue.js | 23 +++++++++++---- 5 files changed, 185 insertions(+), 8 deletions(-) create mode 100644 lib/process/child-pool.js create mode 100644 lib/process/master.js create mode 100644 lib/process/sandbox.js diff --git a/README.md b/README.md index ed92ac63b..232ffbfb9 100644 --- a/README.md +++ b/README.md @@ -79,13 +79,13 @@ Are you developing bull sponsored by a company? Please, let us now! - [x] Concurrency. - [x] Pause/resume—globally or locally. - [x] Multiple job types per queue. +- [x] Threaded (sandboxed) processing functions. - [x] Automatic recovery from process crashes. And coming up on the roadmap... - [ ] Job completion acknowledgement. - [ ] Parent-child jobs relationships. -- [ ] Threaded processing functions. --- @@ -119,6 +119,7 @@ better suits your needs. | Global events | ✓ | ✓ | | | | Rate Limiter | ✓ | | | | | Pause/Resume | ✓ | ✓ | | | +| Sandboxed worker| ✓ | | | | | Repeatable jobs | ✓ | | | ✓ | | Atomic ops | ✓ | | ✓ | | | Persistence | ✓ | ✓ | ✓ | ✓ | @@ -152,7 +153,6 @@ Definitions are currently maintained in the [DefinitelyTyped](https://github.com --- - ### Quick Guide ```js @@ -248,6 +248,35 @@ videoQueue.process(function(job){ // don't forget to remove the done callback! }); ``` +The process function can also be run in a separate process. This has several advantages: +- The process is sandboxed so if it crashes it does not affect the worker. +- You can run blocking code without affecting the queue (jobs will not stall). +- Much better utilization of multi-core CPUs. +- Less connections to redis. + +In order to use this feature just create a separate file with the processor: +```js +// processor.js +module.exports = function(job){ + // Do some heavy work + + return Promise.resolve(result); +} +``` + +And define the processor like this: + +```js +// Single process: +queue.process('/path/to/my/processor.js'); + +// You can use concurrency as well: +queue.process(5, '/path/to/my/processor.js'); + +// and named processors: +queue.process('my processor', 5, '/path/to/my/processor.js'); +``` + A job can be added to a queue and processed repeatedly according to a cron specification: ``` diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js new file mode 100644 index 000000000..e1035e7c4 --- /dev/null +++ b/lib/process/child-pool.js @@ -0,0 +1,40 @@ + +var {fork} = require('child_process'); +var path = require('path'); +var pool = {}; +var Promise = require('bluebird'); + +module.exports.retain = function(processFile){ + return new Promise((resolve, rejected) => { + var keys = Object.keys(pool) + for(var i=0; i { + resolve(child); + }); + }catch(err){ + throw(err); + } + }); +} + +module.exports.release = function(child){; + pool[child.pid].retained = false; +} diff --git a/lib/process/master.js b/lib/process/master.js new file mode 100644 index 000000000..881482a5b --- /dev/null +++ b/lib/process/master.js @@ -0,0 +1,61 @@ +/** + * Master of child processes. Handles communication between the + * processor and the main process. + * + */ +var status; +var processor; +var Promise = require('bluebird'); + +process.on('message', (msg) => { + + switch(msg.cmd){ + case 'init': + processor = require(msg.value); + status = 'IDLE'; + break; + + case 'start': + if(status !== 'IDLE'){ + return process.send({ + cmd: 'error', + err: new Error('cannot start a not idling child process') + }); + } + status = 'STARTED'; + Promise.resolve(processor(wrapJob(msg.job)) || {}).then( (result) => { + process.send({ + cmd: 'completed', + result: result + }); + }, (err) => { + process.send({ + cmd: 'failed', + err: err + }); + }).finally(() => status = 'IDLE'); + break; + case 'stop': + break; + } +}); + +var jobHandler = { + get: function(target, name) { + if(name === 'progress'){ + return function(progress){ + process.send({ + cmd: 'progress', + value: progress + }); + } + }else{ + return target[name]; + } + } +}; + +function wrapJob(job){ + var proxy = new Proxy(job, jobHandler); + return proxy; +} diff --git a/lib/process/sandbox.js b/lib/process/sandbox.js new file mode 100644 index 000000000..57f18e9a6 --- /dev/null +++ b/lib/process/sandbox.js @@ -0,0 +1,36 @@ +var Promise = require('bluebird'); +var childPool = require('./child-pool'); + +module.exports = function(processFile){ + return function process(job){ + return childPool.retain(processFile).then(function(child){ + + child.send({ + cmd: 'start', + job: job + }); + + return (new Promise((resolve, reject) => { + function handler(msg){ + switch(msg.cmd){ + case 'completed': + child.removeListener('message', handler) + resolve(msg.value); + break; + case 'failed': + case 'error': + child.removeListener('message', handler) + reject(msg.result); + break; + case 'progress': + job.progress(msg.value); + break; + } + } + + child.on('message', handler); + })).finally( () => childPool.release(child)); + + }); + } +} diff --git a/lib/queue.js b/lib/queue.js index 1d11badb5..777965b0a 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -509,6 +509,13 @@ Queue.prototype._clearTimers = function(){ @method process */ Queue.prototype.process = function(name, concurrency, handler){ + + if(arguments.length === 1){ + handler = name; + concurrency = 1; + name = Job.DEFAULT_JOB_NAME; + } + if(typeof name !== 'string'){ handler = concurrency; concurrency = name; @@ -546,12 +553,17 @@ Queue.prototype.setHandler = function(name, handler){ this.setWorkerName(); - handler = handler.bind(this); + if(typeof handler === 'string'){ + var sandbox = require('./process/sandbox'); + this.handlers[name] = sandbox(handler).bind(this); + } else { + handler = handler.bind(this); - if(handler.length > 1){ - this.handlers[name] = Promise.promisify(handler); - }else{ - this.handlers[name] = Promise.method(handler); + if(handler.length > 1){ + this.handlers[name] = Promise.promisify(handler); + }else{ + this.handlers[name] = Promise.method(handler); + } } }; @@ -890,7 +902,6 @@ Queue.prototype.getNextJob = function() { if(this.closing){ return Promise.resolve(); } - if(this.drained){ // // Waiting for new jobs to arrive From d65493bd34e0cb0c6ccc84a216b4f162e2b63c09 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 11 Sep 2017 21:34:39 +0200 Subject: [PATCH 2/4] fixed eslint errors --- lib/process/child-pool.js | 57 +++++++++++++++++++++------------------ lib/process/master.js | 12 +++++---- lib/process/sandbox.js | 15 ++++++----- lib/queue.js | 3 +-- lib/scripts.js | 2 +- test/test_rate_limiter.js | 13 +++++---- 6 files changed, 55 insertions(+), 47 deletions(-) diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index e1035e7c4..4b9483e9c 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -1,40 +1,45 @@ -var {fork} = require('child_process'); +var fork = require('child_process').fork; var path = require('path'); var pool = {}; var Promise = require('bluebird'); module.exports.retain = function(processFile){ - return new Promise((resolve, rejected) => { - var keys = Object.keys(pool) - for(var i=0; i { - resolve(child); - }); - }catch(err){ - throw(err); - } + pool[child.pid] = { + subprocess: child, + retained: true + }; + + child.send({ + cmd: 'init', + value: processFile + }, function() { + resolve(child); + }); + }catch(err){ + reject(err); + } }); -} +}; module.exports.release = function(child){; pool[child.pid].retained = false; -} +}; diff --git a/lib/process/master.js b/lib/process/master.js index 881482a5b..a7ff13b86 100644 --- a/lib/process/master.js +++ b/lib/process/master.js @@ -7,7 +7,7 @@ var status; var processor; var Promise = require('bluebird'); -process.on('message', (msg) => { +process.on('message', function(msg) { switch(msg.cmd){ case 'init': @@ -23,17 +23,19 @@ process.on('message', (msg) => { }); } status = 'STARTED'; - Promise.resolve(processor(wrapJob(msg.job)) || {}).then( (result) => { + Promise.resolve(processor(wrapJob(msg.job)) || {}).then( function(result) { process.send({ cmd: 'completed', result: result }); - }, (err) => { + }, function(err) { process.send({ cmd: 'failed', err: err }); - }).finally(() => status = 'IDLE'); + }).finally(function(){ + status = 'IDLE'; + }); break; case 'stop': break; @@ -48,7 +50,7 @@ var jobHandler = { cmd: 'progress', value: progress }); - } + }; }else{ return target[name]; } diff --git a/lib/process/sandbox.js b/lib/process/sandbox.js index 57f18e9a6..13128d913 100644 --- a/lib/process/sandbox.js +++ b/lib/process/sandbox.js @@ -10,16 +10,16 @@ module.exports = function(processFile){ job: job }); - return (new Promise((resolve, reject) => { + var done = new Promise(function(resolve, reject) { function handler(msg){ switch(msg.cmd){ case 'completed': - child.removeListener('message', handler) + child.removeListener('message', handler); resolve(msg.value); break; case 'failed': case 'error': - child.removeListener('message', handler) + child.removeListener('message', handler); reject(msg.result); break; case 'progress': @@ -29,8 +29,11 @@ module.exports = function(processFile){ } child.on('message', handler); - })).finally( () => childPool.release(child)); + }); + return done.finally( function(){ + childPool.release(child); + }); }); - } -} + }; +}; diff --git a/lib/queue.js b/lib/queue.js index 777965b0a..f2f7b6684 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -7,7 +7,6 @@ var EventEmitter = require('events'); var _ = require('lodash'); var util = require('util'); -var utils = require('./utils'); var url = require('url'); var Job = require('./job'); var scripts = require('./scripts'); @@ -162,7 +161,7 @@ var Queue = function Queue(name, url, opts){ if (semver.lt(version, MINIMUM_REDIS_VERSION)){ _this.emit('error', new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version)); } - }).catch(function(err){ + }).catch(function(/*err*/){ // Ignore this error. }); } diff --git a/lib/scripts.js b/lib/scripts.js index fe4951a0a..a55b6725b 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -27,7 +27,7 @@ var scripts = { }); }, - addJob: function(client, queue, job, opts, token){ + addJob: function(client, queue, job, opts){ var queueKeys = queue.keys; var keys = [ diff --git a/test/test_rate_limiter.js b/test/test_rate_limiter.js index 3d85e2600..b838e961b 100644 --- a/test/test_rate_limiter.js +++ b/test/test_rate_limiter.js @@ -27,20 +27,19 @@ describe('Rate limiter', function () { it('should obey the rate limit', function(done) { var startTime = new Date().getTime(); - var nbProcessed = 0; + var numJobs = 4; queue.process(function() { return Promise.resolve(); }); - queue.add({}); - queue.add({}); - queue.add({}); - queue.add({}); + for(var i=0; i Date: Mon, 11 Sep 2017 23:01:15 +0200 Subject: [PATCH 3/4] added unit tests for sandboxed processors --- lib/process/child-pool.js | 8 +++ lib/process/master.js | 21 +++++- lib/process/sandbox.js | 2 +- lib/queue.js | 1 + test/fixtures/fixture_processor.js | 12 ++++ test/fixtures/fixture_processor_fail.js | 12 ++++ test/fixtures/fixture_processor_progress.js | 23 +++++++ test/test_repeat.js | 3 +- test/test_sandboxed_process.js | 71 +++++++++++++++++++++ 9 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 test/fixtures/fixture_processor.js create mode 100644 test/fixtures/fixture_processor_fail.js create mode 100644 test/fixtures/fixture_processor_progress.js create mode 100644 test/test_sandboxed_process.js diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 4b9483e9c..b82fd2bfa 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -43,3 +43,11 @@ module.exports.retain = function(processFile){ module.exports.release = function(child){; pool[child.pid].retained = false; }; + +module.exports.clean = function(){ + var keys = Object.keys(pool); + for(var i=0; i Date: Mon, 11 Sep 2017 23:15:38 +0200 Subject: [PATCH 4/4] simplified code to be compatible with node 4 --- lib/process/master.js | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/lib/process/master.js b/lib/process/master.js index f031389fd..519e46b71 100644 --- a/lib/process/master.js +++ b/lib/process/master.js @@ -59,22 +59,12 @@ process.on('message', function(msg) { } }); -var jobHandler = { - get: function(target, name) { - if(name === 'progress'){ - return function(progress){ - process.send({ - cmd: 'progress', - value: progress - }); - }; - }else{ - return target[name]; - } - } -}; - function wrapJob(job){ - var proxy = new Proxy(job, jobHandler); - return proxy; + job.progress = function(progress){ + process.send({ + cmd: 'progress', + value: progress + }); + }; + return job; }