Skip to content

Commit

Permalink
Merge pull request #1030 from ecasilla/modularization
Browse files Browse the repository at this point in the history
async.queue.unsaturated #868
  • Loading branch information
aearly committed Feb 23, 2016
2 parents 5da468f + e642157 commit 3cf46d0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 2 deletions.
5 changes: 3 additions & 2 deletions README.md
Expand Up @@ -1191,8 +1191,9 @@ methods:
the `worker` has finished processing the task. Instead of a single task, a `tasks` array
can be submitted. The respective callback is used for every task in the list.
* `unshift(task, [callback])` - add a new task to the front of the `queue`.
* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit,
and further tasks will be queued.
* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit, and further tasks will be queued.
* `unsaturated` - a callback that is called when the `queue` length is less than the `concurrency` & `buffer` limits, and further tasks will not be queued.
* `buffer` A minimum threshold buffer in order to say that the `queue` is `unsaturated`.
* `empty` - a callback that is called when the last item from the `queue` is given to a `worker`.
* `drain` - a callback that is called when the last item from the `queue` has returned from the `worker`.
* `paused` - a boolean for determining whether the queue is in a paused state
Expand Down
5 changes: 5 additions & 0 deletions lib/internal/queue.js
Expand Up @@ -45,6 +45,9 @@ export default function queue(worker, concurrency, payload) {
if (q.tasks.length === q.concurrency) {
q.saturated();
}
if (q.tasks.length <= (q.concurrency - q.buffer) ) {
q.unsaturated();
}
});
setImmediate(q.process);
}
Expand Down Expand Up @@ -78,6 +81,8 @@ export default function queue(worker, concurrency, payload) {
concurrency: concurrency,
payload: payload,
saturated: noop,
unsaturated:noop,
buffer: concurrency / 4,
empty: noop,
drain: noop,
started: false,
Expand Down
49 changes: 49 additions & 0 deletions mocha_test/queue.js
@@ -0,0 +1,49 @@
var async = require('../lib');
var expect = require('chai').expect;


describe('queue', function(){
context('q.unsaturated(): ',function() {
it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){
var q = async.queue(function(task, cb) {
// nop
calls.push('process ' + task);
async.setImmediate(cb);
}, 10);
expect(q.buffer).to.equal(2.5);
done();
});
it('should allow a user to change the buffer property', function(done){
var q = async.queue(function(task, cb) {
// nop
calls.push('process ' + task);
async.setImmediate(cb);
}, 10);
q.buffer = 4;
expect(q.buffer).to.not.equal(2.5);
expect(q.buffer).to.equal(4);
done();
});
it('should call the unsaturated callback if tasks length is less than concurrency minus buffer', function(done){
var calls = [];
var q = async.queue(function(task, cb) {
// nop
calls.push('process ' + task);
async.setImmediate(cb);
}, 10);
q.unsaturated = function() {
calls.push('unsaturated');
};
q.empty = function() {
expect(calls.indexOf('unsaturated')).to.be.above(-1);
done();
};
q.push('foo0', function () {calls.push('foo0 cb');});
q.push('foo1', function () {calls.push('foo1 cb');});
q.push('foo2', function () {calls.push('foo2 cb');});
q.push('foo3', function () {calls.push('foo3 cb');});
q.push('foo4', function () {calls.push('foo4 cb');});
});
});
});

0 comments on commit 3cf46d0

Please sign in to comment.