Skip to content

Commit

Permalink
Merge pull request #504 from OptimalBits/sanitized-options
Browse files Browse the repository at this point in the history
Sanitized options
  • Loading branch information
manast committed Apr 23, 2017
2 parents ffde743 + 616249c commit 450f530
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 222 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
v.3.0.0-alpha.2
===============

- Eliminated possible memory leak #503

v.3.0.0-alpha.1
===============

Expand Down
125 changes: 75 additions & 50 deletions README.md
Expand Up @@ -11,7 +11,7 @@ Bull Job Manager

<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg" width="200" />

The fastest, most reliable redis based queue for nodejs.
The fastest, most reliable Redis based queue for nodejs.
Carefully written for rock solid stability and atomicity.


Expand Down Expand Up @@ -50,8 +50,6 @@ There are a few third party UIs that can be used for easier administration of th
* [react-bull](https://github.com/kfatehi/react-bull)
* [toureiro](https://github.com/Epharmix/Toureiro)

We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui)

Roadmap:
--------

Expand All @@ -66,7 +64,7 @@ Install:

npm install bull@2.x --save

Note that you need a redis version higher or equal than 2.8.11 for bull to work properly.
Note that you need a Redis version higher or equal than 2.8.11 for bull to work properly.

**IMPORTANT**

Expand All @@ -78,10 +76,10 @@ Quick Guide
```javascript
var Queue = require('bull');

var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');
var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1'}}); // Specify Redis connection using object
var imageQueue = new Queue('image transcoding');
var pdfQueue = new Queue('pdf transcoding');

videoQueue.process(function(job, done){

Expand All @@ -101,7 +99,7 @@ videoQueue.process(function(job, done){
done(null, { framerate: 29.5 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
throw new Error('some unexpected error');
});

audioQueue.process(function(job, done){
Expand All @@ -118,7 +116,7 @@ audioQueue.process(function(job, done){
done(null, { samplerate: 48000 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
throw new Error('some unexpected error');
});

imageQueue.process(function(job, done){
Expand All @@ -135,7 +133,7 @@ imageQueue.process(function(job, done){
done(null, { width: 1280, height: 720 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
throw new Error('some unexpected error');
});

pdfQueue.process(function(job){
Expand Down Expand Up @@ -231,8 +229,8 @@ queue.on('global:completed', listener);
Queues are cheap, so if you need many of them just create new ones with different
names:
```javascript
var userJohn = Queue('john');
var userLisa = Queue('lisa');
var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.
.
.
Expand All @@ -247,7 +245,7 @@ var
cluster = require('cluster');

var numWorkers = 8;
var queue = Queue("test concurrent queue", 6379, '127.0.0.1');
var queue = new Queue("test concurrent queue");

if(cluster.isMaster){
for (var i = 0; i < numWorkers; i++) {
Expand Down Expand Up @@ -275,7 +273,15 @@ if(cluster.isMaster){
Important Notes
---------------

The queue aims for "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. However, it is important that the worker does not lock the event loop too long, otherwise other workers could pick the job believing that the worker processing it has been stalled.
The queue aims for "at most once" working strategy. When a worker is processing a job it will keep the job "locked" so other workers can't process it.

It's important to understand how locking works to prevent your jobs from losing their lock - becoming _stalled_ - and being double processed as a result. Locking is implemented internally by creating a lock for `lockDuration` on interval `lockRenewTime` (which is usually half `lockDuration`). If `lockDuration` elapses before the lock can be renewed, the job will be considered stalled and is automatically restarted, or __double processed__. This can happen when:
1. The Node process unexpectedly terminates.
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). In this case, the still-running job is then started again by another worker. You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to consider the job stalled in valid case #1).

As such, you should always listen for the `stalled` event and log this to your error monitoring system, as this means your jobs were likely double-processed.

As a safeguard so problematic jobs don't get reprocessed indefinitely (e.g. if the job processor aways crashes the Node process), jobs will be recovered from a stalled state a maximum of `maxStalledCount` times (default: `1`).

Reusing Redis connections
-------------------------
Expand All @@ -290,19 +296,15 @@ This can be achieved using the "createClient" option in the queue constructor:
subscriber = new redis();

var opts = {
redis: {
opts: {
createClient: function(type){
switch(type){
case 'client':
return client;
case 'subscriber':
return subscriber;
default:
return new redis();
}
createClient: function(type, opts){
switch(type){
case 'client':
return client;
case 'subscriber':
return subscriber;
default:
return new redis(opts);
}
}
}
}
var queueFoo = new Queue('foobar', opts);
Expand All @@ -325,8 +327,8 @@ Server A:
```javascript
var Queue = require('bull');

var sendQueue = Queue("Server B");
var receiveQueue = Queue("Server A");
var sendQueue = new Queue("Server B");
var receiveQueue = new Queue("Server A");

receiveQueue.process(function(job, done){
console.log("Received message", job.data.msg);
Expand All @@ -340,8 +342,8 @@ Server B:
```javascript
var Queue = require('bull');

var sendQueue = Queue("Server A");
var receiveQueue = Queue("Server B");
var sendQueue = new Queue("Server A");
var receiveQueue = new Queue("Server B");

receiveQueue.process(function(job, done){
console.log("Received message", job.data.msg);
Expand Down Expand Up @@ -390,35 +392,58 @@ listened by some other service that stores the results in a database.

### Queue

```ts
Queue(queueName: string, redisPort: number, redisHost: string, redisOpts?: RedisOpts): Queue
```
```ts
Queue(queueName: string, redisConnectionString: string, redisOpts? RedisOpts): Queue
```typescript
new Queue(queueName: string, redisConnectionString?: string, opts: QueueOptions): Queue
```

This is the Queue constructor. It creates a new Queue that is persisted in
Redis. Everytime the same queue is instantiated it tries to process all the
old jobs that may exist from a previous unfinished session.

If no connection string or options passed, the queue will use ioredis default connection
settings.

__Arguments__

```javascript
queueName {String} A unique name for this Queue.
redisPort {Number} A port where redis server is running.
redisHost {String} A host specified as IP or domain where redis is running.
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```typescript
queueName: string, // A unique name for this Queue.
redisConnectionString?: string, // string A connection string containing the redis server host, port and (optional) authentication.
opts?: QueueOptions, // Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```

Alternatively, it's possible to pass a connection string to create a new queue.
```typescript
interface QueueOptions {
prefix?: string = 'bull',
redis : RedisOpts, // ioredis defaults
createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient,

// Advanced settings (see below)
settings?: QueueSettings {
lockDuration?: number = 5000,
lockRenewTime?: number = lockDuration / 2,
stalledInterval?: number = 5000,
maxStalledCount?: number = 1,
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000,
}
}
```

__Arguments__
__Advanced Settings__

```javascript
queueName {String} A unique name for this Queue.
redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication.
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```
__Warning:__ Do not override these advanced settings unless you understand the internals of the queue.

`lockDuration`: Time in milliseconds to acquire the job lock. Set this to a higher value if you find that your jobs are being stalled because your job processor is CPU-intensive and blocking the event loop (see note below about stalled jobs). Set this to a lower value if your jobs are extremely time-sensitive and it might be OK if they get double-processed (due to them be falsly considered stalled).

`lockRenewTime`: Interval in milliseconds on which to acquire the job lock. It is set to `lockDuration / 2` by default to give enough buffer to renew the lock each time before the job lock expires. It should never be set to a value larger than `lockDuration`. Set this to a lower value if you're finding that jobs are becoming stalled due to a CPU-intensive job processor function. Generally you shouldn't change this though.

`stalledInterval`: Interval in milliseconds on which each worker will check for stalled jobs (i.e. unlocked jobs in the `active` state). See note below about stalled jobs. Set this to a lower value if your jobs are extremely time-sensitive. Set this to a higher value if your Redis CPU usage is high as this check can be expensive. Note that because each worker runs this on its own interval and checks the entire queue, the stalled job actually run much more frequently than this value would imply.

`maxStalledCount`: The maximum number of times a job can be restarted before it will be permamently failed with the error `job stalled more than allowable limit`. This is set to a default of `1` with the assumption that stalled jobs should be very rare (only due to process crashes) and you want to be on the safer side of not restarting jobs. Set this higher if stalled jobs are common (e.g. processes crash a lot) and it's generally OK to double process jobs.

`guardInterval`: Interval in milliseconds on which the delayed job watchdog will run. This watchdog is only in place for unstable Redis connections which can caused delayed jobs to not be processed. Set to a lower value if your Redis connection is unstable and delayed jobs aren't being processed in time.

`retryProcessDelay`: Time in milliseconds in which to wait before trying to process jobs, in case of a Redis error. Set to a lower value on an unstable Redis connection.

---------------------------------------

Expand Down Expand Up @@ -614,7 +639,7 @@ shutdown.

```javascript
var Queue = require('bull');
var queue = Queue('example');
var queue = new Queue('example');

var after100 = _.after(100, function () {
queue.close().then(function () { console.log('done') })
Expand Down Expand Up @@ -830,7 +855,7 @@ The priority queue will process more often higher priority jobs than lower.
});
```

Warning!!: Priority queue use 5 times more redis connections than a normal queue.
Warning!!: Priority queue use 5 times more Redis connections than a normal queue.


#### Debugging
Expand Down

0 comments on commit 450f530

Please sign in to comment.