Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure q.workersList() contains items being processed [fixes #1428] #1429

Merged
merged 3 commits into from Jun 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/internal/queue.js
Expand Up @@ -55,9 +55,10 @@ export default function queue(worker, concurrency, payload) {

for (var i = 0, l = tasks.length; i < l; i++) {
var task = tasks[i];

var index = indexOf(workersList, task, 0);
if (index >= 0) {
workersList.splice(index)
workersList.splice(index, 1);
}

task.callback.apply(task, arguments);
Expand Down Expand Up @@ -118,11 +119,11 @@ export default function queue(worker, concurrency, payload) {
for (var i = 0; i < l; i++) {
var node = q._tasks.shift();
tasks.push(node);
workersList.push(node);
data.push(node.data);
}

numRunning += 1;
workersList.push(tasks[0]);

if (q._tasks.length === 0) {
q.empty();
Expand Down
55 changes: 54 additions & 1 deletion mocha_test/cargo.js
Expand Up @@ -236,7 +236,7 @@ describe('cargo', function () {

it('expose payload', function (done) {
var called_once = false;
var cargo= async.cargo(function(tasks, cb) {
var cargo = async.cargo(function(tasks, cb) {
if (!called_once) {
expect(cargo.payload).to.equal(1);
assert(tasks.length === 1, 'should start with payload = 1');
Expand All @@ -261,4 +261,57 @@ describe('cargo', function () {
}, 15);
});

it('workersList', function(done) {
var called_once = false;

function getWorkersListData(cargo) {
return cargo.workersList().map(function(v) {
return v.data;
});
}

var cargo = async.cargo(function(tasks, cb) {
if (!called_once) {
expect(tasks).to.eql(['foo', 'bar']);
} else {
expect(tasks).to.eql(['baz']);
}
expect(getWorkersListData(cargo)).to.eql(tasks);
async.setImmediate(function() {
// ensure nothing has changed
expect(getWorkersListData(cargo)).to.eql(tasks);
called_once = true;
cb();
});
}, 2);

cargo.drain = function() {
expect(cargo.workersList()).to.eql([]);
expect(cargo.running()).to.equal(0);
done();
};

cargo.push('foo');
cargo.push('bar');
cargo.push('baz');
});

it('running', function(done) {
var cargo = async.cargo(function(tasks, cb) {
expect(cargo.running()).to.equal(1);
async.setImmediate(function() {
expect(cargo.running()).to.equal(1);
cb();
});
}, 2);

cargo.drain = function() {
expect(cargo.running()).to.equal(0);
done();
};

cargo.push('foo');
cargo.push('bar');
cargo.push('baz');
})
});
65 changes: 63 additions & 2 deletions mocha_test/queue.js
Expand Up @@ -656,7 +656,7 @@ describe('queue', function(){
});
});

context('q.unsaturated(): ',function() {
context('q.unsaturated(): ', function() {
it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){
var calls = [];
var q = async.queue(function(task, cb) {
Expand Down Expand Up @@ -719,6 +719,68 @@ describe('queue', function(){
});
});

context('workersList', function() {
it('should be the same length as running()', function(done) {
var q = async.queue(function(task, cb) {
async.setImmediate(function() {
expect(q.workersList().length).to.equal(q.running());
cb();
});
}, 2);

q.drain = function() {
expect(q.workersList().length).to.equal(0);
expect(q.running()).to.equal(0);
done();
};

q.push('foo');
q.push('bar');
q.push('baz');
});

it('should contain the items being processed', function(done) {
var itemsBeingProcessed = {
'foo': ['foo'],
'foo_cb': ['foo', 'bar'],
'bar': ['foo', 'bar'],
'bar_cb': ['bar', 'baz'],
'baz': ['bar', 'baz'],
'baz_cb': ['baz']
};

function getWorkersListData(q) {
return q.workersList().map(function(v) {
return v.data;
});
}

var q = async.queue(function(task, cb) {
expect(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also assert that workersList().length === running() here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, thanks!

getWorkersListData(q)
).to.eql(itemsBeingProcessed[task]);
expect(q.workersList().length).to.equal(q.running());
async.setImmediate(function() {
expect(
getWorkersListData(q)
).to.eql(itemsBeingProcessed[task+'_cb']);
expect(q.workersList().length).to.equal(q.running());
cb();
});
}, 2);

q.drain = function() {
expect(q.workersList()).to.eql([]);
expect(q.workersList().length).to.equal(q.running());
done();
};

q.push('foo');
q.push('bar');
q.push('baz');
});
})

it('remove', function(done) {
var result = [];
var q = async.queue(function(data, cb) {
Expand All @@ -738,4 +800,3 @@ describe('queue', function(){
}
});
});