Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update priorityQueue functionality to match queue [fixes #1725] #1790

Merged
merged 1 commit into from Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/internal/queue.js
Expand Up @@ -60,12 +60,11 @@ export default function queue(worker, concurrency, payload) {
res(args)
}

var item = {
var item = q._createTaskItem(
data,
callback: rejectOnError ?
promiseCallback :
rejectOnError ? promiseCallback :
(callback || promiseCallback)
};
);

if (insertAtFront) {
q._tasks.unshift(item);
Expand Down Expand Up @@ -147,6 +146,12 @@ export default function queue(worker, concurrency, payload) {
var isProcessing = false;
var q = {
_tasks: new DLL(),
_createTaskItem (data, callback) {
return {
data,
callback
};
},
*[Symbol.iterator] () {
yield* q._tasks[Symbol.iterator]()
},
Expand Down
62 changes: 29 additions & 33 deletions lib/priorityQueue.js
@@ -1,4 +1,3 @@
import setImmediate from './setImmediate.js'
import queue from './queue.js'
import Heap from './internal/Heap.js'

Expand All @@ -19,54 +18,51 @@ import Heap from './internal/Heap.js'
* @param {number} concurrency - An `integer` for determining how many `worker`
* functions should be run in parallel. If omitted, the concurrency defaults to
* `1`. If the concurrency is `0`, an error is thrown.
* @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
* @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are three
* differences between `queue` and `priorityQueue` objects:
* * `push(task, priority, [callback])` - `priority` should be a number. If an
* array of `tasks` is given, all tasks will be assigned the same priority.
* * The `unshift` method was removed.
* * `pushAsync(task, priority, [callback])` - the same as `priorityQueue.push`,
* except this returns a promise that rejects if an error occurs.
* * The `unshift` and `unshiftAsync` methods were removed.
*/
export default function(worker, concurrency) {
// Start with a normal queue
var q = queue(worker, concurrency);
var processingScheduled = false;

var {
push,
pushAsync
} = q;

q._tasks = new Heap();
q._createTaskItem = ({data, priority}, callback) => {
return {
data,
priority,
callback
};
};

// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback = () => {}) {
if (typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => q.drain());
function createDataItems(tasks, priority) {
if (!Array.isArray(tasks)) {
return {data: tasks, priority};
}
return tasks.map(data => { return {data, priority}; });
}

for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
priority,
callback
};

q._tasks.push(item);
}
// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback) {
return push(createDataItems(data, priority), callback);
};

if (!processingScheduled) {
processingScheduled = true;
setImmediate(() => {
processingScheduled = false;
q.process();
});
}
q.pushAsync = function(data, priority = 0, callback) {
return pushAsync(createDataItems(data, priority), callback);
};

// Remove unshift function
// Remove unshift functions
delete q.unshift;
delete q.unshiftAsync;

return q;
}
60 changes: 58 additions & 2 deletions test/es2017/awaitableFunctions.js
Expand Up @@ -540,7 +540,7 @@ module.exports = function () {
it('should work with queues', async () => {
const q = async.queue(async (data) => {
if (data === 2) throw new Error('oh noes')
await new Promise(resolve => setTimeout(resolve, 10))
await new Promise(resolve => setTimeout(() => resolve(data), 10))
return data
}, 5)

Expand All @@ -561,7 +561,7 @@ module.exports = function () {
const multiP = Promise.all(q.push([9, 10]))

await q.drain()
await multiP
const res = await multiP
expect(calls.join()).to.eql([
'saturated',
'push cb 1',
Expand All @@ -581,6 +581,62 @@ module.exports = function () {
expect(emptyCalls).to.eql([
'empty',
])

expect(res).to.eql([
9,
10
])
})

it('should work with priorityQueues', async () => {
const q = async.priorityQueue(async (data) => {
if (data === 2) throw new Error('oh noes')
await new Promise(resolve => setTimeout(() => resolve(data), 10))
return data
}, 5)

const calls = []
const errorCalls = []
const emptyCalls = []
q.error().catch(d => errorCalls.push('error ' + d))
q.saturated().then(() => calls.push('saturated'))
q.unsaturated().then(() => calls.push('unsaturated'))
q.empty().then(() => emptyCalls.push('empty'))

q.push(1, 1).then(d => calls.push('push cb ' + d))
q.push(2, 1).then(d => errorCalls.push('push cb ' + d))
q.push([3, 4, 5, 6], 0).map(p => p.then(d => calls.push('push cb ' + d)))
q.push(7, 3).then(d => calls.push('push cb ' + d))
q.push(8, 3).then(d => calls.push('push cb ' + d))

const multiP = Promise.all(q.push([9, 10], 1))

await q.drain()
const res = await multiP
expect(calls.join()).to.eql([
'saturated',
'push cb 3',
'push cb 4',
'push cb 5',
'push cb 6',
'push cb 1',
'unsaturated',
'push cb 7',
'push cb 8'
].join())

expect(errorCalls).to.eql([
'push cb undefined',
'error Error: oh noes',
])
expect(emptyCalls).to.eql([
'empty',
])

expect(res).to.eql([
9,
10
])
})

/*
Expand Down