Skip to content

Commit

Permalink
BREAKING CHANGE: awaitable queues (#1641)
Browse files Browse the repository at this point in the history
* BREAKING CHANGE: awaitable queues

* fix priorityQueue tests

* fix tests in firefox

* make the upgrade a bit more user-friendly

* clarify docs
  • Loading branch information
aearly committed May 20, 2019
1 parent 1d458d9 commit e044664
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 211 deletions.
37 changes: 3 additions & 34 deletions lib/cargo.js
@@ -1,35 +1,5 @@
import queue from './internal/queue';

/**
* A cargo of tasks for the worker function to complete. Cargo inherits all of
* the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}.
* @typedef {Object} CargoObject
* @memberOf module:ControlFlow
* @property {Function} length - A function returning the number of items
* waiting to be processed. Invoke like `cargo.length()`.
* @property {number} payload - An `integer` for determining how many tasks
* should be process per round. This property can be changed after a `cargo` is
* created to alter the payload on-the-fly.
* @property {Function} push - Adds `task` to the `queue`. The callback is
* called once the `worker` has finished processing the task. Instead of a
* single task, an array of `tasks` can be submitted. The respective callback is
* used for every task in the list. Invoke like `cargo.push(task, [callback])`.
* @property {Function} saturated - A callback that is called when the
* `queue.length()` hits the concurrency and further tasks will be queued.
* @property {Function} empty - A callback that is called when the last item
* from the `queue` is given to a `worker`.
* @property {Function} drain - A callback that is called when the last item
* from the `queue` has returned from the `worker`.
* @property {Function} idle - a function returning false if there are items
* waiting or being processed, or true if not. Invoke like `cargo.idle()`.
* @property {Function} pause - a function that pauses the processing of tasks
* until `resume()` is called. Invoke like `cargo.pause()`.
* @property {Function} resume - a function that resumes the processing of
* queued tasks when the queue is paused. Invoke like `cargo.resume()`.
* @property {Function} kill - a function that removes the `drain` callback and
* empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`.
*/

/**
* Creates a `cargo` object with the specified payload. Tasks added to the
* cargo will be processed altogether (up to the `payload` limit). If the
Expand All @@ -53,7 +23,7 @@ import queue from './internal/queue';
* @param {number} [payload=Infinity] - An optional `integer` for determining
* how many tasks should be processed per round; if omitted, the default is
* unlimited.
* @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can
* @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the cargo and inner queue.
* @example
Expand All @@ -73,9 +43,8 @@ import queue from './internal/queue';
* cargo.push({name: 'bar'}, function(err) {
* console.log('finished processing bar');
* });
* cargo.push({name: 'baz'}, function(err) {
* console.log('finished processing baz');
* });
* await cargo.push({name: 'baz'});
* console.log('finished processing baz');
*/
export default function cargo(worker, payload) {
return queue(worker, 1, payload);
Expand Down
136 changes: 104 additions & 32 deletions lib/internal/queue.js
Expand Up @@ -3,8 +3,6 @@ import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';
import wrapAsync from './wrapAsync';

const noop = () => {}

export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
Expand All @@ -16,32 +14,68 @@ export default function queue(worker, concurrency, payload) {
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
const events = {
error: [],
drain: [],
saturated: [],
unsaturated: [],
empty: []
}

function on (event, handler) {
events[event].push(handler)
}

function once (event, handler) {
const handleAndRemove = (...args) => {
off(event, handleAndRemove)
handler(...args)
}
events[event].push(handleAndRemove)
}

function off (event, handler) {
if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
if (!handler) return events[event] = []
events[event] = events[event].filter(ev => ev !== handler)
}

function trigger (event, ...args) {
events[event].forEach(handler => handler(...args))
}

var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => q.drain());
if (Array.isArray(data)) {
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => trigger('drain'));
}

return data.map(datum => _insert(datum, insertAtFront, callback));
}

for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
callback: callback || noop
};
var res;

if (insertAtFront) {
q._tasks.unshift(item);
} else {
q._tasks.push(item);
var item = {
data,
callback: callback || function (err, ...args) {
// we don't care about the error, let the global error handler
// deal with it
if (err) return
if (args.length <= 1) return res(args[0])
res(args)
}
};

if (insertAtFront) {
q._tasks.unshift(item);
} else {
q._tasks.push(item);
}

if (!processingScheduled) {
Expand All @@ -51,9 +85,15 @@ export default function queue(worker, concurrency, payload) {
q.process();
});
}

if (!callback) {
return new Promise((resolve) => {
res = resolve
})
}
}

function _next(tasks) {
function _createCB(tasks) {
return function (err, ...args) {
numRunning -= 1;

Expand All @@ -70,21 +110,35 @@ export default function queue(worker, concurrency, payload) {
task.callback(err, ...args);

if (err != null) {
q.error(err, task.data);
trigger('error', err, task.data);
}
}

if (numRunning <= (q.concurrency - q.buffer) ) {
q.unsaturated();
trigger('unsaturated')
}

if (q.idle()) {
q.drain();
trigger('drain')
}
q.process();
};
}

const eventMethod = (name) => (handler) => {
if (!handler) {
return new Promise((resolve, reject) => {
once(name, (err, data) => {
if (err) return reject(err)
resolve(data)
})
})
}
off(name)
on(name, handler)

}

var isProcessing = false;
var q = {
_tasks: new DLL(),
Expand All @@ -93,23 +147,18 @@ export default function queue(worker, concurrency, payload) {
},
concurrency,
payload,
saturated: noop,
unsaturated:noop,
buffer: concurrency / 4,
empty: noop,
drain: noop,
error: noop,
started: false,
paused: false,
push (data, callback) {
_insert(data, false, callback);
return _insert(data, false, callback);
},
kill () {
q.drain = noop;
off()
q._tasks.empty();
},
unshift (data, callback) {
_insert(data, true, callback);
return _insert(data, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
Expand All @@ -135,14 +184,14 @@ export default function queue(worker, concurrency, payload) {
numRunning += 1;

if (q._tasks.length === 0) {
q.empty();
trigger('empty');
}

if (numRunning === q.concurrency) {
q.saturated();
trigger('saturated');
}

var cb = onlyOnce(_next(tasks));
var cb = onlyOnce(_createCB(tasks));
_worker(data, cb);
}
isProcessing = false;
Expand All @@ -168,5 +217,28 @@ export default function queue(worker, concurrency, payload) {
setImmediate(q.process);
}
};
// define these as fixed properties, so people get useful errors when updating
Object.defineProperties(q, {
saturated: {
writable: false,
value: eventMethod('saturated')
},
unsaturated: {
writable: false,
value: eventMethod('unsaturated')
},
empty: {
writable: false,
value: eventMethod('empty')
},
drain: {
writable: false,
value: eventMethod('drain')
},
error: {
writable: false,
value: eventMethod('error')
},
})
return q;
}
54 changes: 35 additions & 19 deletions lib/queue.js
Expand Up @@ -18,6 +18,9 @@ import wrapAsync from './internal/wrapAsync';
* @property {number} concurrency - an integer for determining how many `worker`
* functions should be run in parallel. This property can be changed after a
* `queue` is created to alter the concurrency on-the-fly.
* @property {number} payload - an integer that specifies how many items are
* passed to the worker function at a time. only applies if this is a
* [cargo]{@link module:ControlFlow.cargo} object
* @property {Function} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
Expand All @@ -30,20 +33,26 @@ import wrapAsync from './internal/wrapAsync';
* [priorityQueue]{@link module:ControlFlow.priorityQueue} object.
* Invoked with `queue.remove(testFn)`, where `testFn` is of the form
* `function ({data, priority}) {}` and returns a Boolean.
* @property {Function} saturated - a callback that is called when the number of
* running workers hits the `concurrency` limit, and further tasks will be
* queued.
* @property {Function} unsaturated - a callback that is called when the number
* of running workers is less than the `concurrency` & `buffer` limits, and
* further tasks will not be queued.
* @property {Function} saturated - a function that sets a callback that is
* called when the number of running workers hits the `concurrency` limit, and
* further tasks will be queued. If the callback is omitted, `q.saturated()`
* returns a promise for the next occurrence.
* @property {Function} unsaturated - a function that sets a callback that is
* called when the number of running workers is less than the `concurrency` &
* `buffer` limits, and further tasks will not be queued. If the callback is
* omitted, `q.unsaturated()` returns a promise for the next occurrence.
* @property {number} buffer - A minimum threshold buffer in order to say that
* the `queue` is `unsaturated`.
* @property {Function} empty - a callback that is called when the last item
* from the `queue` is given to a `worker`.
* @property {Function} drain - a callback that is called when the last item
* from the `queue` has returned from the `worker`.
* @property {Function} error - a callback that is called when a task errors.
* Has the signature `function(error, task)`.
* @property {Function} empty - a function that sets a callback that is called
* when the last item from the `queue` is given to a `worker`. If the callback
* is omitted, `q.empty()` returns a promise for the next occurrence.
* @property {Function} drain - a function that sets a callback that is called
* when the last item from the `queue` has returned from the `worker`. If the
* callback is omitted, `q.drain()` returns a promise for the next occurrence.
* @property {Function} error - a function that sets a callback that is called
* when a task errors. Has the signature `function(error, task)`. If the
* callback is omitted, `error()` returns a promise that rejects on the next
* error.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
Expand All @@ -65,6 +74,12 @@ import wrapAsync from './internal/wrapAsync';
* for (let item of q) {
* console.log(item)
* }
*
* q.drain(() => {
* console.log('all done')
* })
* // or
* await q.drain()
*/

/**
Expand Down Expand Up @@ -96,22 +111,23 @@ import wrapAsync from './internal/wrapAsync';
* }, 2);
*
* // assign a callback
* q.drain = function() {
* q.drain(function() {
* console.log('all items have been processed');
* };
* });
* // or await the end
* await q.drain()
*
* // assign an error callback
* q.error = function(err, task) {
* q.error(function(err, task) {
* console.error('task experienced an error');
* };
* });
*
* // add some items to the queue
* q.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
* q.push({name: 'bar'}, function (err) {
* console.log('finished processing bar');
* });
* // callback is optional
* q.push({name: 'bar'});
*
* // add some items to the queue (batch-wise)
* q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
Expand Down

0 comments on commit e044664

Please sign in to comment.