diff --git a/lib/internal/queue.js b/lib/internal/queue.js index e357447f9..5e8d4d7f9 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -55,9 +55,10 @@ export default function queue(worker, concurrency, payload) { for (var i = 0, l = tasks.length; i < l; i++) { var task = tasks[i]; + var index = indexOf(workersList, task, 0); if (index >= 0) { - workersList.splice(index) + workersList.splice(index, 1); } task.callback.apply(task, arguments); @@ -118,11 +119,11 @@ export default function queue(worker, concurrency, payload) { for (var i = 0; i < l; i++) { var node = q._tasks.shift(); tasks.push(node); + workersList.push(node); data.push(node.data); } numRunning += 1; - workersList.push(tasks[0]); if (q._tasks.length === 0) { q.empty(); diff --git a/mocha_test/cargo.js b/mocha_test/cargo.js index 235b9a287..55e6e9686 100644 --- a/mocha_test/cargo.js +++ b/mocha_test/cargo.js @@ -236,7 +236,7 @@ describe('cargo', function () { it('expose payload', function (done) { var called_once = false; - var cargo= async.cargo(function(tasks, cb) { + var cargo = async.cargo(function(tasks, cb) { if (!called_once) { expect(cargo.payload).to.equal(1); assert(tasks.length === 1, 'should start with payload = 1'); @@ -261,4 +261,57 @@ describe('cargo', function () { }, 15); }); + it('workersList', function(done) { + var called_once = false; + + function getWorkersListData(cargo) { + return cargo.workersList().map(function(v) { + return v.data; + }); + } + + var cargo = async.cargo(function(tasks, cb) { + if (!called_once) { + expect(tasks).to.eql(['foo', 'bar']); + } else { + expect(tasks).to.eql(['baz']); + } + expect(getWorkersListData(cargo)).to.eql(tasks); + async.setImmediate(function() { + // ensure nothing has changed + expect(getWorkersListData(cargo)).to.eql(tasks); + called_once = true; + cb(); + }); + }, 2); + + cargo.drain = function() { + expect(cargo.workersList()).to.eql([]); + expect(cargo.running()).to.equal(0); + done(); + }; + + cargo.push('foo'); + cargo.push('bar'); + cargo.push('baz'); + }); + + it('running', function(done) { + var cargo = async.cargo(function(tasks, cb) { + expect(cargo.running()).to.equal(1); + async.setImmediate(function() { + expect(cargo.running()).to.equal(1); + cb(); + }); + }, 2); + + cargo.drain = function() { + expect(cargo.running()).to.equal(0); + done(); + }; + + cargo.push('foo'); + cargo.push('bar'); + cargo.push('baz'); + }) }); diff --git a/mocha_test/queue.js b/mocha_test/queue.js index 7525c3056..4539c14be 100644 --- a/mocha_test/queue.js +++ b/mocha_test/queue.js @@ -656,7 +656,7 @@ describe('queue', function(){ }); }); - context('q.unsaturated(): ',function() { + context('q.unsaturated(): ', function() { it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){ var calls = []; var q = async.queue(function(task, cb) { @@ -719,6 +719,68 @@ describe('queue', function(){ }); }); + context('workersList', function() { + it('should be the same length as running()', function(done) { + var q = async.queue(function(task, cb) { + async.setImmediate(function() { + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain = function() { + expect(q.workersList().length).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }; + + q.push('foo'); + q.push('bar'); + q.push('baz'); + }); + + it('should contain the items being processed', function(done) { + var itemsBeingProcessed = { + 'foo': ['foo'], + 'foo_cb': ['foo', 'bar'], + 'bar': ['foo', 'bar'], + 'bar_cb': ['bar', 'baz'], + 'baz': ['bar', 'baz'], + 'baz_cb': ['baz'] + }; + + function getWorkersListData(q) { + return q.workersList().map(function(v) { + return v.data; + }); + } + + var q = async.queue(function(task, cb) { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task]); + expect(q.workersList().length).to.equal(q.running()); + async.setImmediate(function() { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task+'_cb']); + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain = function() { + expect(q.workersList()).to.eql([]); + expect(q.workersList().length).to.equal(q.running()); + done(); + }; + + q.push('foo'); + q.push('bar'); + q.push('baz'); + }); + }) + it('remove', function(done) { var result = []; var q = async.queue(function(data, cb) { @@ -738,4 +800,3 @@ describe('queue', function(){ } }); }); -