Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implment queues using DLLs #1205

Merged
merged 2 commits into from Jul 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/auto.js
Expand Up @@ -4,9 +4,9 @@ import indexOf from 'lodash/_baseIndexOf';
import isArray from 'lodash/isArray';
import okeys from 'lodash/keys';
import noop from 'lodash/noop';
import once from './internal/once';
import rest from 'lodash/rest';

import once from './internal/once';
import onlyOnce from './internal/onlyOnce';

/**
Expand Down
62 changes: 62 additions & 0 deletions lib/internal/DoublyLinkedList.js
@@ -0,0 +1,62 @@
// Simple doubly linked list (https://en.wikipedia.org/wiki/Doubly_linked_list) implementation
// used for queues. This implementation assumes that the node provided by the user can be modified
// to adjust the next and last properties. We implement only the minimal functionality
// for queue support.
export default function DLL() {
this.head = this.tail = null;
this.length = 0;
}

function setInitial(dll, node) {
dll.length = 1;
dll.head = dll.tail = node;
}

DLL.prototype.removeLink = function(node) {
if (node.prev) node.prev.next = node.next;
else this.head = node.next
if (node.next) node.next.prev = node.prev;
else this.tail = node.prev;

node.prev = node.next = null;
this.length -= 1;
return node;
}

DLL.prototype.empty = DLL;

DLL.prototype.insertAfter = function(node, newNode) {
newNode.prev = node;
newNode.next = node.next;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this add prev and next properties to items people push to queues? Perhaps the list should store {prev, next, data} objects, where data points to the original object.

Copy link
Collaborator Author

@megawac megawac Jun 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, push wraps the item in an internal object already, {data, callback} for queue and {data, callback, priority} for priority queue

if (node.next) node.next.prev = newNode;
else this.tail = newNode;
node.next = newNode;
this.length += 1;
}

DLL.prototype.insertBefore = function(node, newNode) {
newNode.prev = node.prev;
newNode.next = node;
if (node.prev) node.prev.next = newNode;
else this.head = newNode;
node.prev = newNode;
this.length += 1;
}

DLL.prototype.unshift = function(node) {
if (this.head) this.insertBefore(this.head, node);
else setInitial(this, node);
};

DLL.prototype.push = function(node) {
if (this.tail) this.insertAfter(this.tail, node);
else setInitial(this, node);
};

DLL.prototype.shift = function() {
return this.head && this.removeLink(this.head);
};

DLL.prototype.pop = function() {
return this.tail && this.removeLink(this.tail);
};
52 changes: 26 additions & 26 deletions lib/internal/queue.js
@@ -1,11 +1,10 @@
import arrayEach from 'lodash/_arrayEach';
import arrayMap from 'lodash/_arrayMap';
import isArray from 'lodash/isArray';
import noop from 'lodash/noop';
import property from 'lodash/_baseProperty';

import onlyOnce from './onlyOnce';
import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';

export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
Expand All @@ -14,7 +13,8 @@ export default function queue(worker, concurrency, payload) {
else if(concurrency === 0) {
throw new Error('Concurrency must not be zero');
}
function _insert(q, data, pos, callback) {

function _insert(data, pos, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
Expand All @@ -35,19 +35,19 @@ export default function queue(worker, concurrency, payload) {
};

if (pos) {
q.tasks.unshift(item);
q._tasks.unshift(item);
} else {
q.tasks.push(item);
q._tasks.push(item);
}

});
setImmediate(q.process);
}
function _next(q, tasks) {

function _next(tasks) {
return function(){
workers -= 1;


var removed = false;
var args = arguments;
arrayEach(tasks, function (task) {
Expand All @@ -69,7 +69,7 @@ export default function queue(worker, concurrency, payload) {
q.unsaturated();
}

if (q.tasks.length + workers === 0) {
if (q._tasks.length + workers === 0) {
q.drain();
}
q.process();
Expand All @@ -79,7 +79,7 @@ export default function queue(worker, concurrency, payload) {
var workers = 0;
var workersList = [];
var q = {
tasks: [],
_tasks: new DLL(),
concurrency: concurrency,
payload: payload,
saturated: noop,
Expand All @@ -91,25 +91,27 @@ export default function queue(worker, concurrency, payload) {
started: false,
paused: false,
push: function (data, callback) {
_insert(q, data, false, callback);
_insert(data, false, callback);
},
kill: function () {
q.drain = noop;
q.tasks = [];
q._tasks.empty();
},
unshift: function (data, callback) {
_insert(q, data, true, callback);
_insert(data, true, callback);
},
process: function () {
while(!q.paused && workers < q.concurrency && q.tasks.length){

var tasks = q.payload ?
q.tasks.splice(0, q.payload) :
q.tasks.splice(0, q.tasks.length);

var data = arrayMap(tasks, property('data'));
while(!q.paused && workers < q.concurrency && q._tasks.length){
var tasks = [], data = [];
var l = q._tasks.length;
if (q.payload) l = Math.min(l, q.payload);
for (var i = 0; i < l; i++) {
var node = q._tasks.shift();
tasks.push(node);
data.push(node.data);
}

if (q.tasks.length === 0) {
if (q._tasks.length === 0) {
q.empty();
}
workers += 1;
Expand All @@ -119,14 +121,12 @@ export default function queue(worker, concurrency, payload) {
q.saturated();
}

var cb = onlyOnce(_next(q, tasks));
var cb = onlyOnce(_next(tasks));
worker(data, cb);


}
},
length: function () {
return q.tasks.length;
return q._tasks.length;
},
running: function () {
return workers;
Expand All @@ -135,15 +135,15 @@ export default function queue(worker, concurrency, payload) {
return workersList;
},
idle: function() {
return q.tasks.length + workers === 0;
return q._tasks.length + workers === 0;
},
pause: function () {
q.paused = true;
},
resume: function () {
if (q.paused === false) { return; }
q.paused = false;
var resumeCount = Math.min(q.concurrency, q.tasks.length);
var resumeCount = Math.min(q.concurrency, q._tasks.length);
// Need to call q.process once per concurrent
// worker to preserve full concurrency after pause
for (var w = 1; w <= resumeCount; w++) {
Expand Down
45 changes: 16 additions & 29 deletions lib/priorityQueue.js
Expand Up @@ -30,25 +30,11 @@ import queue from './queue';
* * The `unshift` method was removed.
*/
export default function(worker, concurrency) {
function _compareTasks(a, b) {
return a.priority - b.priority;
}

function _binarySearch(sequence, item, compare) {
var beg = -1,
end = sequence.length - 1;
while (beg < end) {
var mid = beg + ((end - beg + 1) >>> 1);
if (compare(item, sequence[mid]) >= 0) {
beg = mid;
} else {
end = mid - 1;
}
}
return beg;
}
// Start with a normal queue
var q = queue(worker, concurrency);

function _insert(q, data, priority, callback) {
// Override push to accept second parameter representing priority
q.push = function(data, priority, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
Expand All @@ -62,25 +48,26 @@ export default function(worker, concurrency) {
q.drain();
});
}

var nextNode = q._tasks.head;
while (nextNode && priority >= nextNode.priority) {
nextNode = nextNode.next;
}

arrayEach(data, function(task) {
var item = {
data: task,
priority: priority,
callback: typeof callback === 'function' ? callback : noop
};

q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);

setImmediate(q.process);
if (nextNode) {
q._tasks.insertBefore(nextNode, item);
} else {
q._tasks.push(item);
}
});
}

// Start with a normal queue
var q = queue(worker, concurrency);

// Override push to accept second parameter representing priority
q.push = function(data, priority, callback) {
_insert(q, data, priority, callback);
setImmediate(q.process);
};

// Remove unshift function
Expand Down
2 changes: 1 addition & 1 deletion mocha_test/queue.js
Expand Up @@ -496,7 +496,7 @@ describe('queue', function(){
}, 5);

setTimeout(function () {
expect(q.tasks.length).to.equal(1);
expect(q._tasks.length).to.equal(1);
expect(q.running()).to.equal(2);
q.resume();
}, 15);
Expand Down