diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a0df1094..5f57a1462 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +v.3.0.0-alpha.2 +=============== + +- Eliminated possible memory leak #503 + v.3.0.0-alpha.1 =============== diff --git a/README.md b/README.md index 3f222cff2..14d1f39a8 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Bull Job Manager -The fastest, most reliable redis based queue for nodejs. +The fastest, most reliable Redis based queue for nodejs. Carefully written for rock solid stability and atomicity. @@ -50,8 +50,6 @@ There are a few third party UIs that can be used for easier administration of th * [react-bull](https://github.com/kfatehi/react-bull) * [toureiro](https://github.com/Epharmix/Toureiro) -We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui) - Roadmap: -------- @@ -66,7 +64,7 @@ Install: npm install bull@2.x --save -Note that you need a redis version higher or equal than 2.8.11 for bull to work properly. +Note that you need a Redis version higher or equal than 2.8.11 for bull to work properly. **IMPORTANT** @@ -78,10 +76,10 @@ Quick Guide ```javascript var Queue = require('bull'); -var videoQueue = Queue('video transcoding', 6379, '127.0.0.1'); -var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1'); -var imageQueue = Queue('image transcoding', 6379, '127.0.0.1'); -var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1'); +var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379'); +var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1'}}); // Specify Redis connection using object +var imageQueue = new Queue('image transcoding'); +var pdfQueue = new Queue('pdf transcoding'); videoQueue.process(function(job, done){ @@ -101,7 +99,7 @@ videoQueue.process(function(job, done){ done(null, { framerate: 29.5 /* etc... */ }); // If the job throws an unhandled exception it is also handled correctly - throw (Error('some unexpected error')); + throw new Error('some unexpected error'); }); audioQueue.process(function(job, done){ @@ -118,7 +116,7 @@ audioQueue.process(function(job, done){ done(null, { samplerate: 48000 /* etc... */ }); // If the job throws an unhandled exception it is also handled correctly - throw (Error('some unexpected error')); + throw new Error('some unexpected error'); }); imageQueue.process(function(job, done){ @@ -135,7 +133,7 @@ imageQueue.process(function(job, done){ done(null, { width: 1280, height: 720 /* etc... */ }); // If the job throws an unhandled exception it is also handled correctly - throw (Error('some unexpected error')); + throw new Error('some unexpected error'); }); pdfQueue.process(function(job){ @@ -231,8 +229,8 @@ queue.on('global:completed', listener); Queues are cheap, so if you need many of them just create new ones with different names: ```javascript -var userJohn = Queue('john'); -var userLisa = Queue('lisa'); +var userJohn = new Queue('john'); +var userLisa = new Queue('lisa'); . . . @@ -247,7 +245,7 @@ var cluster = require('cluster'); var numWorkers = 8; -var queue = Queue("test concurrent queue", 6379, '127.0.0.1'); +var queue = new Queue("test concurrent queue"); if(cluster.isMaster){ for (var i = 0; i < numWorkers; i++) { @@ -275,7 +273,15 @@ if(cluster.isMaster){ Important Notes --------------- -The queue aims for "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. 
However, it is important that the worker does not lock the event loop too long, otherwise other workers could pick the job believing that the worker processing it has been stalled.
+The queue aims for an "at most once" working strategy. When a worker is processing a job, it will keep the job "locked" so other workers can't process it.
+
+It's important to understand how locking works to prevent your jobs from losing their lock - becoming _stalled_ - and being double processed as a result. Locking is implemented internally by creating a lock for `lockDuration` and renewing it on an interval of `lockRenewTime` (which is usually half `lockDuration`). If `lockDuration` elapses before the lock can be renewed, the job will be considered stalled and will be automatically restarted, or __double processed__. This can happen when:
+1. The Node process unexpectedly terminates.
+2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). In this case, the still-running job is then started again by another worker. You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to consider the job stalled in valid case #1).
+
+As such, you should always listen for the `stalled` event and log this to your error monitoring system, as this means your jobs were likely double-processed.
+
+As a safeguard so problematic jobs don't get reprocessed indefinitely (e.g. if the job processor always crashes the Node process), jobs will be recovered from a stalled state a maximum of `maxStalledCount` times (default: `1`).
 
Reusing Redis connections
-------------------------
@@ -290,19 +296,15 @@ This can be achieved using the "createClient" option in the queue constructor:
   subscriber = new redis();
 
   var opts = {
-    redis: {
-      opts: {
-        createClient: function(type){
-          switch(type){
-            case 'client':
-              return client;
-            case 'subscriber':
-              return subscriber;
-            default:
-              return new redis();
-          }
+    createClient: function(type, opts){
+      switch(type){
+        case 'client':
+          return client;
+        case 'subscriber':
+          return subscriber;
+        default:
+          return new redis(opts);
         }
-      }
     }
   }
   var queueFoo = new Queue('foobar', opts);
@@ -325,8 +327,8 @@ Server A:
 
 ```javascript
 var Queue = require('bull');
 
-var sendQueue = Queue("Server B");
-var receiveQueue = Queue("Server A");
+var sendQueue = new Queue("Server B");
+var receiveQueue = new Queue("Server A");
 
 receiveQueue.process(function(job, done){
   console.log("Received message", job.data.msg);
@@ -340,8 +342,8 @@ Server B:
 
 ```javascript
 var Queue = require('bull');
 
-var sendQueue = Queue("Server A");
-var receiveQueue = Queue("Server B");
+var sendQueue = new Queue("Server A");
+var receiveQueue = new Queue("Server B");
 
 receiveQueue.process(function(job, done){
   console.log("Received message", job.data.msg);
@@ -390,35 +392,58 @@ listened by some other service that stores the results in a database.
 
### Queue
 
-```ts
-Queue(queueName: string, redisPort: number, redisHost: string, redisOpts?: RedisOpts): Queue
-```
-```ts
-Queue(queueName: string, redisConnectionString: string, redisOpts? RedisOpts): Queue
+```typescript
+new Queue(queueName: string, redisConnectionString?: string, opts?: QueueOptions): Queue
 ```
 
This is the Queue constructor. It creates a new Queue that is persisted in Redis. 
Everytime the same queue is instantiated it tries to process all the old jobs that may exist from a previous unfinished session.
+If no connection string or options are passed, the queue will use the ioredis default connection
+settings.
+
__Arguments__
 
-```javascript
-  queueName {String} A unique name for this Queue.
-  redisPort {Number} A port where redis server is running.
-  redisHost {String} A host specified as IP or domain where redis is running.
-  redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
+```typescript
+  queueName: string, // A unique name for this Queue.
+  redisConnectionString?: string, // A connection string containing the redis server host, port and (optional) authentication.
+  opts?: QueueOptions, // Queue options, including the options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
 ```
 
-Alternatively, it's possible to pass a connection string to create a new queue.
+```typescript
+  interface QueueOptions {
+    prefix?: string = 'bull',
+    redis : RedisOpts, // ioredis defaults
+    createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient,
+
+    // Advanced settings (see below)
+    settings?: QueueSettings {
+      lockDuration?: number = 5000,
+      lockRenewTime?: number = lockDuration / 2,
+      stalledInterval?: number = 5000,
+      maxStalledCount?: number = 1,
+      guardInterval?: number = 5000,
+      retryProcessDelay?: number = 5000,
+    }
+  }
+```
 
-__Arguments__
+__Advanced Settings__
 
-```javascript
-  queueName {String} A unique name for this Queue.
-  redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication.
-  redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
-```
+__Warning:__ Do not override these advanced settings unless you understand the internals of the queue.
+
+`lockDuration`: Duration of the job lock in milliseconds. Set this to a higher value if you find that your jobs are being stalled because your job processor is CPU-intensive and blocking the event loop (see note below about stalled jobs). Set this to a lower value if your jobs are extremely time-sensitive and it might be OK if they get double-processed (due to them being falsely considered stalled).
+
+`lockRenewTime`: Interval in milliseconds at which the job lock is renewed. It is set to `lockDuration / 2` by default to give enough buffer to renew the lock each time before the job lock expires. It should never be set to a value larger than `lockDuration`. Set this to a lower value if you're finding that jobs are becoming stalled due to a CPU-intensive job processor function. Generally, you shouldn't need to change this.
+
+`stalledInterval`: Interval in milliseconds on which each worker will check for stalled jobs (i.e. unlocked jobs in the `active` state). See note below about stalled jobs. Set this to a lower value if your jobs are extremely time-sensitive. Set this to a higher value if your Redis CPU usage is high, as this check can be expensive. Note that because each worker runs this on its own interval and checks the entire queue, the stalled job check actually runs much more frequently than this value would imply.
+
+`maxStalledCount`: The maximum number of times a job can be restarted before it will be permanently failed with the error `job stalled more than allowable limit`. This is set to a default of `1` with the assumption that stalled jobs should be very rare (only due to process crashes) and you want to be on the safer side of not restarting jobs. Set this higher if stalled jobs are common (e.g. processes crash a lot) and it's generally OK to double process jobs.
+
+`guardInterval`: Interval in milliseconds on which the delayed job watchdog will run. This watchdog is only in place for unstable Redis connections, which can cause delayed jobs to not be processed. Set to a lower value if your Redis connection is unstable and delayed jobs aren't being processed in time.
+
+`retryProcessDelay`: Time in milliseconds to wait before trying to process jobs again after a Redis error. Set to a lower value on an unstable Redis connection.
 
---------------------------------------
 
@@ -614,7 +639,7 @@ shutdown.
 
 ```javascript
 var Queue = require('bull');
-var queue = Queue('example');
+var queue = new Queue('example');
 
 var after100 = _.after(100, function () {
   queue.close().then(function () { console.log('done') })
@@ -830,7 +855,7 @@ The priority queue will process more often higher priority jobs than lower.
   });
 ```
 
-Warning!!: Priority queue use 5 times more redis connections than a normal queue.
+Warning!!: Priority queues use 5 times more Redis connections than a normal queue.
 
#### Debugging
 
diff --git a/lib/queue.js b/lib/queue.js
index 01ff1410f..ce3ac4cba 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -50,85 +50,76 @@ var commands = require('./commands/');
 */
 
 var MINIMUM_REDIS_VERSION = '2.8.11';
-var LOCK_DURATION = 5000; // 5 seconds is the duration of the lock.
-
-// The interval for which to check for stalled jobs.
-var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the renew time.
+var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
 
-// The maximum number of times a job can be recovered from the 'stalled' state
-// (moved back to 'wait'), before it is failed.
-var MAX_STALLED_JOB_COUNT = 1;
+/*
+  interface QueueOptions {
+    prefix?: string = 'bull',
+    redis : RedisOpts, // ioredis defaults
+    createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient,
+
+    // Advanced settings
+    settings?: QueueSettings {
+      lockDuration?: number = 5000,
+      stalledInterval?: number = 5000,
+      maxStalledCount?: number = 1, // The maximum number of times a job can be recovered from the 'stalled' state
+      guardInterval?: number = 5000,
+      retryProcessDelay?: number = 5000
+    }
+  }
+*/
 
-var POLLING_INTERVAL = 5000;
+// Queue(name: string, url?, opts?)
+var Queue = function Queue(name, url, opts){
+  var _this = this;
+  if(!(this instanceof Queue)){
+    return new Queue(name, url, opts);
+  }
 
-var REDLOCK_DRIFT_FACTOR = 0.01;
-var REDLOCK_RETRY_COUNT = 0;
-var REDLOCK_RETRY_DELAY = 200;
+  if(_.isString(url)){
+    opts = _.defaults({ redis: redisOptsFromUrl(url) }, opts);
+  }else{
+    opts = url;
+  }
 
-var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
+  opts = opts || {};
 
-var Queue = function Queue(name, redisPort, redisHost, redisOptions){
-  if(!(this instanceof Queue)){
-    return new Queue(name, redisPort, redisHost, redisOptions);
+  if(opts && !_.isObject(opts)){
+    throw new Error('Options must be a valid object');
   }
 
-  if(_.isObject(redisPort)) {
-    var opts = redisPort;
-    var redisOpts = opts.redis || {};
-    redisPort = redisOpts.port;
-    redisHost = redisOpts.host;
-    redisOptions = redisOpts.opts || {};
-    redisOptions.db = redisOpts.DB || redisOpts.DB;
-  } else if(parseInt(redisPort) == redisPort) {
-    redisPort = parseInt(redisPort);
-    redisOptions = redisOptions || {};
-  } else if(_.isString(redisPort)) {
-    try {
-      var redisUrl = url.parse(redisPort);
-      assert(_.isObject(redisHost) || _.isUndefined(redisHost),
-        'Expected an object as redis option');
-      redisOptions = redisHost || {};
-      redisPort = redisUrl.port;
-      redisHost = redisUrl.hostname;
-      if (redisUrl.auth) {
-        redisOptions.password = redisUrl.auth.split(':')[1];
-      }
-    } catch (e) {
-      throw new Error(e.message);
-    }
-  }
+  var redisOpts = opts.redis || {};
 
-  redisOptions = redisOptions || {};
+  _.defaults(redisOpts, {
+    port: 6379,
+    host: '127.0.0.1',
+    db: redisOpts.db || redisOpts.DB
+  });
 
-  function createClient(type) {
+  function createClient(type, redisOpts) {
     var client;
-    if(_.isFunction(redisOptions.createClient)){
-      client = redisOptions.createClient(type);
+    if(_.isFunction(opts.createClient)){
+      client = opts.createClient(type, redisOpts);
     }else{
-      client = new redis(redisPort, redisHost, redisOptions);
+      client = new redis(redisOpts);
     }
     return client;
   }
 
-  redisPort = redisPort || 6379;
-  redisHost = redisHost || '127.0.0.1';
-
-  var _this = this;
-
   this.name = name;
-  this.keyPrefix = redisOptions.keyPrefix || 'bull';
+  this.keyPrefix = redisOpts.keyPrefix || opts.prefix || 'bull';
   this.token = uuid();
 
   //
-  // We cannot use ioredis keyPrefix feature until we
-  // stop creating keys dynamically in lua scripts.
+  // We cannot use ioredis keyPrefix feature since we
+  // create keys dynamically in lua scripts.
   //
-  delete redisOptions.keyPrefix;
+  delete redisOpts.keyPrefix;
 
   //
   // Create queue client (used to add jobs, pause queues, etc);
   //
-  this.client = createClient('client');
+  this.client = createClient('client', redisOpts);
 
   getRedisVersion(this.client).then(function(version){
     if(semver.lt(version, MINIMUM_REDIS_VERSION)){
@@ -138,34 +129,25 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){
     _this.emit('error', err);
   });
 
-  //
-  // Keep track of cluster clients for redlock
-  // (Redlock is not used ATM.)
- this.clients = [this.client]; - if (redisOptions.clients) { - this.clients.push.apply(this.clients, redisOptions.clients); - } - this.redlock = { - driftFactor: REDLOCK_DRIFT_FACTOR, - retryCount: REDLOCK_RETRY_COUNT, - retryDelay: REDLOCK_RETRY_DELAY - }; - _.extend(this.redlock, redisOptions.redlock || {}); - // // Create event subscriber client (receive messages from other instance of the queue) // - this.eclient = createClient('subscriber'); + this.eclient = createClient('subscriber', redisOpts); this.handlers = {}; this.delayTimer = null; this.processing = []; this.retrieving = 0; - this.LOCK_DURATION = LOCK_DURATION; - this.LOCK_RENEW_TIME = LOCK_DURATION / 2; - this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; - this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; + this.settings = _.defaults(opts.settings, { + lockDuration: 5000, + stalledInterval: 5000, + maxStalledCount: 1, + guardInterval: 5000, + retryProcessDelay: 5000 + }); + + this.settings.lockRenewTime = this.settings.lockRenewTime || this.settings.lockDuration / 2; // bubble up Redis error events [this.client, this.eclient].forEach(function (client) { @@ -250,47 +232,68 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ }, true); } - // - // Init delay timestamp. - // this.delayedTimestamp = Number.MAX_VALUE; this.isReady().then(function(){ + // + // Init delay timestamp. + // scripts.updateDelaySet(_this, Date.now()).then(function(timestamp){ if(timestamp){ _this.updateDelayTimer(timestamp); } }); - }); - // - // Create a guardian timer to revive delayTimer if necessary - // This is necessary when redis connection is unstable, which can cause the pub/sub to fail - // - this.guardianTimer = setInterval(function() { - if(_this.delayedTimestamp < Date.now() || _this.delayedTimestamp - Date.now() > POLLING_INTERVAL){ - scripts.updateDelaySet(_this, Date.now()).then(function(timestamp){ - if(timestamp){ - _this.updateDelayTimer(timestamp); - } - }).catch(function(err){ - _this.emit('error', err); - }); - } // - // Trigger a getNextJob (if worker is idling) + // Create a guardian timer to revive delayTimer if necessary + // This is necessary when redis connection is unstable, which can cause the pub/sub to fail // - _this.emit('added'); - }, POLLING_INTERVAL); + _this.guardianTimer = setGuardianTimer(_this); + }); + + this.errorRetryTimer = {}; // Bind these methods to avoid constant rebinding and/or creating closures // in processJobs etc. 
this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this); this.processJob = this.processJob.bind(this); this.getJobFromId = Job.fromId.bind(null, this); -}; +} + +function redisOptsFromUrl(urlString){ + var redisOpts = {}; + try { + var redisUrl = url.parse(urlString); + redisOpts.port = redisUrl.port; + redisOpts.host = redisUrl.hostname; + if (redisUrl.auth) { + redisOpts.password = redisUrl.auth.split(':')[1]; + } + } catch (e) { + throw new Error(e.message); + } + return redisOpts; +} util.inherits(Queue, Disturbed); +function setGuardianTimer(queue){ + return setInterval(function() { + if(queue.delayedTimestamp < Date.now() || queue.delayedTimestamp - Date.now() > queue.settings.guardInterval){ + scripts.updateDelaySet(queue, Date.now()).then(function(timestamp){ + if(timestamp){ + queue.updateDelayTimer(timestamp); + } + }).catch(function(err){ + queue.emit('error', err); + }); + } + // + // Trigger a getNextJob (if worker is idling) + // + queue.emit('added'); + }, queue.settings.guardInterval); +} + Queue.ErrorMessages = errors.Messages; Queue.prototype.isReady = function(){ @@ -359,6 +362,9 @@ Queue.prototype.close = function( doNotWaitJobs ){ } return this.closing = this._initializing.then(function(){ + _.each(_this.errorRetryTimer, function(timer){ + clearTimeout(timer); + }); clearTimeout(_this.delayTimer); clearInterval(_this.guardianTimer); clearInterval(_this.moveUnlockedJobsToWaitInterval); @@ -566,8 +572,8 @@ Queue.prototype.run = function(concurrency){ return this.moveUnlockedJobsToWait().then(function(){ while(concurrency--){ - promises.push(new Promise(function(resolve, reject){ - _this.processJobs(concurrency, resolve, reject); + promises.push(new Promise(function(resolve){ + _this.processJobs(concurrency, resolve); })); } @@ -643,25 +649,32 @@ Queue.prototype.moveUnlockedJobsToWait = function(){ Queue.prototype.startMoveUnlockedJobsToWait = function() { clearInterval(this.moveUnlockedJobsToWaitInterval); - if (this.STALLED_JOB_CHECK_INTERVAL > 0){ + if (this.settings.stalledInterval > 0){ this.moveUnlockedJobsToWaitInterval = - setInterval(this.moveUnlockedJobsToWait, this.STALLED_JOB_CHECK_INTERVAL); + setInterval(this.moveUnlockedJobsToWait, this.settings.stalledInterval); } }; -Queue.prototype.processJobs = function(index, resolve, reject){ +Queue.prototype.processJobs = function(index, resolve){ var _this = this; - var processJobs = this.processJobs.bind(this, index, resolve, reject); + var processJobs = this.processJobs.bind(this, index, resolve); process.nextTick(function(){ if(!_this.closing){ (_this.paused || Promise.resolve()).then(function(){ return _this.processing[index] = _this.getNextJob() .then(_this.processJob) .then(processJobs, function(err){ - _this.emit('error', err, 'Error processing job'); - processJobs(); + // + // Wait before trying to process again. + // + clearTimeout(_this.errorRetryTimer[index]); + _this.errorRetryTimer[index] = setTimeout(function(){ + processJobs(); + }, _this.settings.retryProcessDelay); }); - }).catch(reject); // Not sure this catch is correct here. + }).catch(function(err){ + _this.emit('error', err, 'Error processing job'); + }); }else{ resolve(_this.closing); } @@ -686,11 +699,13 @@ Queue.prototype.processJob = function(job){ // by another worker. 
See #308 // var lockExtender = function(){ - lockRenewId = _this.timers.set('lockExtender', _this.LOCK_RENEW_TIME, function(){ + lockRenewId = _this.timers.set('lockExtender', _this.settings.lockRenewTime, function(){ scripts.extendLock(_this, job.id).then(function(lock){ if(lock && !timerStopped){ lockExtender(); } + }).catch(function(err){ + // Somehow tell the worker this job should stop processing... }); }); }; @@ -760,30 +775,31 @@ Queue.prototype.getNextJob = function() { // // Listen for new jobs, during moveToActive or after. // - var resolver; - var newJobs = new Promise(function(resolve){ - resolver = function(){ - removeListeners(); - resolve(); - }; - _this.on('added', resolver); - _this.on('resumed', resolver); - _this.on('wait-finished', resolver); + var resolve; + var newJobs = new Promise(function(_resolve){ + // Needs to wrap to ignore the emitted value, or the promise will not resolve. + resolve = function(){ + _resolve(); + } + _this.on('added', resolve); + _this.on('resumed', resolve); + _this.on('wait-finished', resolve); }); var removeListeners = function(){ - _this.removeListener('added', resolver); - _this.removeListener('resumed', resolver); - _this.removeListener('wait-finished', resolver); + _this.removeListener('added', resolve); + _this.removeListener('resumed', resolve); + _this.removeListener('wait-finished', resolve); } return scripts.moveToActive(this).spread(function(jobData, jobId){ if(jobData){ - removeListeners(); return Job.fromData(_this, jobData, jobId); }else{ return newJobs; } + }).finally(function(){ + removeListeners(); }); }; diff --git a/lib/scripts.js b/lib/scripts.js index 7a023a160..0de534b76 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -82,7 +82,7 @@ var scripts = { var args = [ queue.toKey(''), queue.token, - queue.LOCK_DURATION + queue.settings.lockDuration ]; return queue.client.moveToActive(keys.concat(args)).then(function(result){ @@ -125,6 +125,7 @@ var scripts = { return job.queue.client.moveToFinished(args); }, + // TODO: add a retention argument for completed and finished jobs (in time). moveToCompleted: function(job, returnvalue, removeOnComplete, ignoreLock){ return scripts.moveToFinished(job, returnvalue, 'returnvalue', removeOnComplete, 'completed', ignoreLock); }, @@ -199,7 +200,7 @@ var scripts = { }, extendLock: function(queue, jobId){ - return queue.client.extendLock([queue.toKey(jobId) + ':lock', queue.token, queue.LOCK_DURATION]); + return queue.client.extendLock([queue.toKey(jobId) + ':lock', queue.token, queue.settings.lockDuration]); }, releaseLock: function(queue, jobId){ @@ -207,7 +208,7 @@ var scripts = { }, takeLock: function(queue, job){ - return queue.client.takeLock([job.lockKey(), queue.token, queue.LOCK_DURATION]); + return queue.client.takeLock([job.lockKey(), queue.token, queue.settings.lockDuration]); }, /** @@ -234,13 +235,13 @@ var scripts = { * The job was being worked on, but the worker process died and it failed to renew the lock. * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait, - * (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to MAX_STALLED_JOB_COUNT. + * (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to settings.maxStalledCount. 
*/ moveUnlockedJobsToWait: function(queue){ var keys = _.map(['active', 'wait', 'failed', 'added'], function(key){ return queue.toKey(key); }); - var args = [queue.MAX_STALLED_JOB_COUNT, queue.toKey(''), Date.now()]; + var args = [queue.settings.maxStalledCount, queue.toKey(''), Date.now()]; return queue.client.moveUnlockedJobsToWait(keys.concat(args)); }, diff --git a/package.json b/package.json index e01e2aa16..f34baf314 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bull", - "version": "3.0.0-alpha.1", + "version": "3.0.0-alpha.2", "description": "Job manager", "main": "index.js", "repository": { diff --git a/test/test_job.js b/test/test_job.js index 8d7ba57e3..a79fcd965 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -18,11 +18,11 @@ describe('Job', function(){ }); beforeEach(function(){ - queue = new Queue('test-' + uuid(), 6379, '127.0.0.1'); + queue = new Queue('test-' + uuid(), {redis: {port: 6379, host: '127.0.0.1'}}); }); afterEach(function(){ - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); + this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount)); return queue.close(); }); diff --git a/test/test_queue.js b/test/test_queue.js index eeca9e307..c3df5791e 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -70,10 +70,14 @@ describe('Queue', function () { return closePromise; }); - it('should close if the job expires after the LOCK_RENEW_TIME', function (done) { - this.timeout(testQueue.STALLED_JOB_CHECK_INTERVAL * 2); - testQueue.LOCK_DURATION = 15; - testQueue.LOCK_RENEW_TIME = 5; + it('should close if the job expires after the lockRenewTime', function (done) { + this.timeout(testQueue.settings.stalledInterval * 2, { + settings: { + lockDuration: 15, + lockRenewTime: 5 + } + }); + testQueue.process(function () { return Promise.delay(100); }); @@ -244,18 +248,14 @@ describe('Queue', function () { subscriber = new redis(); var opts = { - redis: { - opts: { - createClient: function(type){ - switch(type){ - case 'client': - return client; - case 'subscriber': - return subscriber; - default: - return new redis(); - } - } + createClient: function(type, opts){ + switch(type){ + case 'client': + return client; + case 'subscriber': + return subscriber; + default: + return new redis(opts); } } } @@ -316,7 +316,7 @@ describe('Queue', function () { }); afterEach(function () { - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); + this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount)); return utils.cleanupQueues(); }); @@ -586,9 +586,12 @@ describe('Queue', function () { it('process stalled jobs when starting a queue', function (done) { this.timeout(6000); - utils.newQueue('test queue stalled').then(function (queueStalled) { - queueStalled.LOCK_DURATION = 15; - queueStalled.LOCK_RENEW_TIME = 5 + utils.newQueue('test queue stalled', { + settings: { + lockDuration: 15, + lockRenewTime: 5 + } + }).then(function (queueStalled) { var jobs = [ queueStalled.add({ bar: 'baz' }), queueStalled.add({ bar1: 'baz1' }), @@ -632,8 +635,11 @@ describe('Queue', function () { }); it('processes jobs that were added before the queue backend started', function () { - return utils.newQueue('test queue added before').then(function (queueStalled) { - queueStalled.LOCK_RENEW_TIME = 10; + return utils.newQueue('test queue added before', { + settings: { + lockRenewTime: 10 + } + }).then(function (queueStalled) { var jobs = [ queueStalled.add({ bar: 'baz' 
}), queueStalled.add({ bar1: 'baz1' }), @@ -735,16 +741,21 @@ describe('Queue', function () { it('processes several stalled jobs when starting several queues', function (done) { this.timeout(50000); - var NUM_QUEUES = 5; + var NUM_QUEUES = 10; var NUM_JOBS_PER_QUEUE = 10; var stalledQueues = []; var jobs = []; + var redisOpts = {port: 6379, host: '127.0.0.1'}; for (var i = 0; i < NUM_QUEUES; i++) { - var queueStalled2 = new Queue('test queue stalled 2', 6379, '127.0.0.1'); - queueStalled2.LOCK_DURATION = 30; - queueStalled2.LOCK_RENEW_TIME = 10; - + var queueStalled2 = new Queue('test queue stalled 2', { + redis: redisOpts, + settings: { + lockDuration: 30, + lockRenewTime: 10 + } + }); + for (var j = 0; j < NUM_JOBS_PER_QUEUE; j++) { jobs.push(queueStalled2.add({ job: j })); } @@ -766,7 +777,10 @@ describe('Queue', function () { processed++; if (processed === stalledQueues.length) { setTimeout(function () { - var queue2 = new Queue('test queue stalled 2', 6379, '127.0.0.1'); + var queue2 = new Queue('test queue stalled 2', redisOpts); + queue2.on('error', function(){ + + }) queue2.process(function (job2, jobDone) { jobDone(); }); @@ -833,16 +847,18 @@ describe('Queue', function () { it('process stalled jobs without requiring a queue restart', function (done) { this.timeout(12000); - var queue2 = utils.buildQueue('running-stalled-job-' + uuid()); + var queue2 = utils.buildQueue('running-stalled-job-' + uuid(), { + settings: { + lockRenewTime: 5000, + lockDuration: 500, + stalledInterval: 1000 + } + }); var collect = _.after(2, function () { queue2.close().then(done); }); - queue2.LOCK_RENEW_TIME = 5000; - queue2.LOCK_DURATION = 500; - queue2.STALLED_JOB_CHECK_INTERVAL = 1000; - queue2.on('completed', function () { var client = new redis(); client.multi() @@ -1369,12 +1385,10 @@ describe('Queue', function () { }); }); - queue.on('ready', function () { - queue.add({ delayed: 'foobar' }, { delay: delay }).then(function (job) { - expect(job.id).to.be.ok(); - expect(job.data.delayed).to.be('foobar'); - expect(job.delay).to.be(delay); - }); + queue.add({ delayed: 'foobar' }, { delay: delay }).then(function (job) { + expect(job.id).to.be.ok(); + expect(job.data.delayed).to.be('foobar'); + expect(job.delay).to.be(delay); }); }); @@ -1496,7 +1510,7 @@ describe('Queue', function () { }); afterEach(function () { - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); + this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount)); return queue.close(); }); @@ -1602,7 +1616,7 @@ describe('Queue', function () { var queue; afterEach(function () { - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); + this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount)); return queue.close(); }); @@ -1860,7 +1874,7 @@ describe('Queue', function () { }); afterEach(function () { - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); + this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount)); return queue.close(); }); @@ -1991,7 +2005,7 @@ describe('Queue', function () { }); afterEach(function () { - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); + this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount)); return queue.close(); }); @@ -2105,7 +2119,7 @@ describe('Queue', function () { }); afterEach(function () { - this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + 
queue.MAX_STALLED_JOB_COUNT));
+      this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount));
       return queue.close();
     });
 
diff --git a/test/utils.js b/test/utils.js
index f4bfeb809..cb575f515 100644
--- a/test/utils.js
+++ b/test/utils.js
@@ -13,7 +13,7 @@ function simulateDisconnect(queue){
 }
 
-function buildQueue(name) {
-  var queue = new Queue(name || STD_QUEUE_NAME, 6379, '127.0.0.1');
+function buildQueue(name, options) {
+  var queue = new Queue(name || STD_QUEUE_NAME, options || {redis: {port: 6379, host: '127.0.0.1'}});
   queues.push(queue);
   return queue;
 }
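
For orientation, here is a minimal usage sketch of the reworked constructor described in the README changes above. It is not part of the patch itself; the queue names come from the Quick Guide, while the setting values are illustrative examples only, not recommendations:

```javascript
var Queue = require('bull');
var Redis = require('ioredis');

// A connection string is parsed into Redis connection options (see redisOptsFromUrl above).
var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');

// Explicit Redis options plus the new advanced settings.
var audioQueue = new Queue('audio transcoding', {
  redis: { port: 6379, host: '127.0.0.1' },
  settings: {
    lockDuration: 10000,    // hold each job lock for 10 seconds
    stalledInterval: 5000   // check for stalled jobs every 5 seconds
  }
});

// Reuse existing ioredis connections via the createClient option.
var client = new Redis();
var subscriber = new Redis();

var imageQueue = new Queue('image transcoding', {
  createClient: function(type, redisOpts){
    switch(type){
      case 'client':
        return client;
      case 'subscriber':
        return subscriber;
      default:
        return new Redis(redisOpts);
    }
  }
});
```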