Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKING CHANGE: awaitable queues #1641

Merged
merged 6 commits into from May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 3 additions & 34 deletions lib/cargo.js
@@ -1,35 +1,5 @@
import queue from './internal/queue';

/**
* A cargo of tasks for the worker function to complete. Cargo inherits all of
* the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}.
* @typedef {Object} CargoObject
* @memberOf module:ControlFlow
* @property {Function} length - A function returning the number of items
* waiting to be processed. Invoke like `cargo.length()`.
* @property {number} payload - An `integer` for determining how many tasks
* should be process per round. This property can be changed after a `cargo` is
* created to alter the payload on-the-fly.
* @property {Function} push - Adds `task` to the `queue`. The callback is
* called once the `worker` has finished processing the task. Instead of a
* single task, an array of `tasks` can be submitted. The respective callback is
* used for every task in the list. Invoke like `cargo.push(task, [callback])`.
* @property {Function} saturated - A callback that is called when the
* `queue.length()` hits the concurrency and further tasks will be queued.
* @property {Function} empty - A callback that is called when the last item
* from the `queue` is given to a `worker`.
* @property {Function} drain - A callback that is called when the last item
* from the `queue` has returned from the `worker`.
* @property {Function} idle - a function returning false if there are items
* waiting or being processed, or true if not. Invoke like `cargo.idle()`.
* @property {Function} pause - a function that pauses the processing of tasks
* until `resume()` is called. Invoke like `cargo.pause()`.
* @property {Function} resume - a function that resumes the processing of
* queued tasks when the queue is paused. Invoke like `cargo.resume()`.
* @property {Function} kill - a function that removes the `drain` callback and
* empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`.
*/

/**
* Creates a `cargo` object with the specified payload. Tasks added to the
* cargo will be processed altogether (up to the `payload` limit). If the
Expand All @@ -53,7 +23,7 @@ import queue from './internal/queue';
* @param {number} [payload=Infinity] - An optional `integer` for determining
* how many tasks should be processed per round; if omitted, the default is
* unlimited.
* @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can
* @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the cargo and inner queue.
* @example
Expand All @@ -73,9 +43,8 @@ import queue from './internal/queue';
* cargo.push({name: 'bar'}, function(err) {
* console.log('finished processing bar');
* });
* cargo.push({name: 'baz'}, function(err) {
* console.log('finished processing baz');
* });
* await cargo.push({name: 'baz'});
* console.log('finished processing baz');
*/
export default function cargo(worker, payload) {
return queue(worker, 1, payload);
Expand Down
136 changes: 104 additions & 32 deletions lib/internal/queue.js
Expand Up @@ -3,8 +3,6 @@ import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';
import wrapAsync from './wrapAsync';

const noop = () => {}

export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
Expand All @@ -16,32 +14,68 @@ export default function queue(worker, concurrency, payload) {
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
const events = {
error: [],
drain: [],
saturated: [],
unsaturated: [],
empty: []
}

function on (event, handler) {
events[event].push(handler)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's quite an interesting way of handling both promises and the current API. Is the promise based API useful for any of the events besides drain?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's most useful for drain, but i can see utility coming from unsaturated, and making error reject. I'm also happy making the interface uniform for all the event-style methods.

}

function once (event, handler) {
const handleAndRemove = (...args) => {
off(event, handleAndRemove)
handler(...args)
}
events[event].push(handleAndRemove)
}

function off (event, handler) {
if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
if (!handler) return events[event] = []
events[event] = events[event].filter(ev => ev !== handler)
}

function trigger (event, ...args) {
events[event].forEach(handler => handler(...args))
}

var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => q.drain());
if (Array.isArray(data)) {
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => trigger('drain'));
}

return data.map(datum => _insert(datum, insertAtFront, callback));
}

for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
callback: callback || noop
};
var res;

if (insertAtFront) {
q._tasks.unshift(item);
} else {
q._tasks.push(item);
var item = {
data,
callback: callback || function (err, ...args) {
// we don't care about the error, let the global error handler
// deal with it
if (err) return
if (args.length <= 1) return res(args[0])
res(args)
}
};

if (insertAtFront) {
q._tasks.unshift(item);
} else {
q._tasks.push(item);
}

if (!processingScheduled) {
Expand All @@ -51,9 +85,15 @@ export default function queue(worker, concurrency, payload) {
q.process();
});
}

if (!callback) {
return new Promise((resolve) => {
res = resolve
})
}
}

function _next(tasks) {
function _createCB(tasks) {
return function (err, ...args) {
numRunning -= 1;

Expand All @@ -70,21 +110,35 @@ export default function queue(worker, concurrency, payload) {
task.callback(err, ...args);

if (err != null) {
q.error(err, task.data);
trigger('error', err, task.data);
}
}

if (numRunning <= (q.concurrency - q.buffer) ) {
q.unsaturated();
trigger('unsaturated')
}

if (q.idle()) {
q.drain();
trigger('drain')
}
q.process();
};
}

const eventMethod = (name) => (handler) => {
if (!handler) {
return new Promise((resolve, reject) => {
once(name, (err, data) => {
if (err) return reject(err)
resolve(data)
})
})
}
off(name)
on(name, handler)

}

var isProcessing = false;
var q = {
_tasks: new DLL(),
Expand All @@ -93,23 +147,18 @@ export default function queue(worker, concurrency, payload) {
},
concurrency,
payload,
saturated: noop,
unsaturated:noop,
buffer: concurrency / 4,
empty: noop,
drain: noop,
error: noop,
started: false,
paused: false,
push (data, callback) {
_insert(data, false, callback);
return _insert(data, false, callback);
},
kill () {
q.drain = noop;
off()
q._tasks.empty();
},
unshift (data, callback) {
_insert(data, true, callback);
return _insert(data, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
Expand All @@ -135,14 +184,14 @@ export default function queue(worker, concurrency, payload) {
numRunning += 1;

if (q._tasks.length === 0) {
q.empty();
trigger('empty');
}

if (numRunning === q.concurrency) {
q.saturated();
trigger('saturated');
}

var cb = onlyOnce(_next(tasks));
var cb = onlyOnce(_createCB(tasks));
_worker(data, cb);
}
isProcessing = false;
Expand All @@ -168,5 +217,28 @@ export default function queue(worker, concurrency, payload) {
setImmediate(q.process);
}
};
// define these as fixed properties, so people get useful errors when updating
Object.defineProperties(q, {
saturated: {
writable: false,
value: eventMethod('saturated')
},
unsaturated: {
writable: false,
value: eventMethod('unsaturated')
},
empty: {
writable: false,
value: eventMethod('empty')
},
drain: {
writable: false,
value: eventMethod('drain')
},
error: {
writable: false,
value: eventMethod('error')
},
})
return q;
}
54 changes: 35 additions & 19 deletions lib/queue.js
Expand Up @@ -18,6 +18,9 @@ import wrapAsync from './internal/wrapAsync';
* @property {number} concurrency - an integer for determining how many `worker`
* functions should be run in parallel. This property can be changed after a
* `queue` is created to alter the concurrency on-the-fly.
* @property {number} payload - an integer that specifies how many items are
* passed to the worker function at a time. only applies if this is a
* [cargo]{@link module:ControlFlow.cargo} object
* @property {Function} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
Expand All @@ -30,20 +33,26 @@ import wrapAsync from './internal/wrapAsync';
* [priorityQueue]{@link module:ControlFlow.priorityQueue} object.
* Invoked with `queue.remove(testFn)`, where `testFn` is of the form
* `function ({data, priority}) {}` and returns a Boolean.
* @property {Function} saturated - a callback that is called when the number of
* running workers hits the `concurrency` limit, and further tasks will be
* queued.
* @property {Function} unsaturated - a callback that is called when the number
* of running workers is less than the `concurrency` & `buffer` limits, and
* further tasks will not be queued.
* @property {Function} saturated - a function that sets a callback that is
* called when the number of running workers hits the `concurrency` limit, and
* further tasks will be queued. If the callback is omitted, `q.saturated()`
* returns a promise for the next occurrence.
* @property {Function} unsaturated - a function that sets a callback that is
* called when the number of running workers is less than the `concurrency` &
* `buffer` limits, and further tasks will not be queued. If the callback is
* omitted, `q.unsaturated()` returns a promise for the next occurrence.
* @property {number} buffer - A minimum threshold buffer in order to say that
* the `queue` is `unsaturated`.
* @property {Function} empty - a callback that is called when the last item
* from the `queue` is given to a `worker`.
* @property {Function} drain - a callback that is called when the last item
* from the `queue` has returned from the `worker`.
* @property {Function} error - a callback that is called when a task errors.
* Has the signature `function(error, task)`.
* @property {Function} empty - a function that sets a callback that is called
* when the last item from the `queue` is given to a `worker`. If the callback
* is omitted, `q.empty()` returns a promise for the next occurrence.
* @property {Function} drain - a function that sets a callback that is called
* when the last item from the `queue` has returned from the `worker`. If the
* callback is omitted, `q.drain()` returns a promise for the next occurrence.
* @property {Function} error - a function that sets a callback that is called
* when a task errors. Has the signature `function(error, task)`. If the
* callback is omitted, `error()` returns a promise that rejects on the next
* error.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
Expand All @@ -65,6 +74,12 @@ import wrapAsync from './internal/wrapAsync';
* for (let item of q) {
* console.log(item)
* }
*
* q.drain(() => {
* console.log('all done')
* })
* // or
* await q.drain()
*/

/**
Expand Down Expand Up @@ -96,22 +111,23 @@ import wrapAsync from './internal/wrapAsync';
* }, 2);
*
* // assign a callback
* q.drain = function() {
* q.drain(function() {
* console.log('all items have been processed');
* };
* });
* // or await the end
* await q.drain()
*
* // assign an error callback
* q.error = function(err, task) {
* q.error(function(err, task) {
* console.error('task experienced an error');
* };
* });
*
* // add some items to the queue
* q.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
* q.push({name: 'bar'}, function (err) {
* console.log('finished processing bar');
* });
* // callback is optional
* q.push({name: 'bar'});
*
* // add some items to the queue (batch-wise)
* q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
Expand Down