Skip to content

Commit

Permalink
fix: update priorityQueue functionality to match queue (#1790)
Browse files Browse the repository at this point in the history
  • Loading branch information
hargasinski committed Apr 15, 2022
1 parent 576ba74 commit 6927a81
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 50 deletions.
13 changes: 9 additions & 4 deletions lib/internal/queue.js
Expand Up @@ -60,12 +60,11 @@ export default function queue(worker, concurrency, payload) {
res(args)
}

var item = {
var item = q._createTaskItem(
data,
callback: rejectOnError ?
promiseCallback :
rejectOnError ? promiseCallback :
(callback || promiseCallback)
};
);

if (insertAtFront) {
q._tasks.unshift(item);
Expand Down Expand Up @@ -147,6 +146,12 @@ export default function queue(worker, concurrency, payload) {
var isProcessing = false;
var q = {
_tasks: new DLL(),
_createTaskItem (data, callback) {
return {
data,
callback
};
},
*[Symbol.iterator] () {
yield* q._tasks[Symbol.iterator]()
},
Expand Down
62 changes: 29 additions & 33 deletions lib/priorityQueue.js
@@ -1,4 +1,3 @@
import setImmediate from './setImmediate.js'
import queue from './queue.js'
import Heap from './internal/Heap.js'

Expand All @@ -19,54 +18,51 @@ import Heap from './internal/Heap.js'
* @param {number} concurrency - An `integer` for determining how many `worker`
* functions should be run in parallel. If omitted, the concurrency defaults to
* `1`. If the concurrency is `0`, an error is thrown.
* @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
* @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are three
* differences between `queue` and `priorityQueue` objects:
* * `push(task, priority, [callback])` - `priority` should be a number. If an
* array of `tasks` is given, all tasks will be assigned the same priority.
* * The `unshift` method was removed.
* * `pushAsync(task, priority, [callback])` - the same as `priorityQueue.push`,
* except this returns a promise that rejects if an error occurs.
* * The `unshift` and `unshiftAsync` methods were removed.
*/
export default function(worker, concurrency) {
// Start with a normal queue
var q = queue(worker, concurrency);
var processingScheduled = false;

var {
push,
pushAsync
} = q;

q._tasks = new Heap();
q._createTaskItem = ({data, priority}, callback) => {
return {
data,
priority,
callback
};
};

// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback = () => {}) {
if (typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
if (!Array.isArray(data)) {
data = [data];
}
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => q.drain());
function createDataItems(tasks, priority) {
if (!Array.isArray(tasks)) {
return {data: tasks, priority};
}
return tasks.map(data => { return {data, priority}; });
}

for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
priority,
callback
};

q._tasks.push(item);
}
// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback) {
return push(createDataItems(data, priority), callback);
};

if (!processingScheduled) {
processingScheduled = true;
setImmediate(() => {
processingScheduled = false;
q.process();
});
}
q.pushAsync = function(data, priority = 0, callback) {
return pushAsync(createDataItems(data, priority), callback);
};

// Remove unshift function
// Remove unshift functions
delete q.unshift;
delete q.unshiftAsync;

return q;
}
60 changes: 58 additions & 2 deletions test/es2017/awaitableFunctions.js
Expand Up @@ -540,7 +540,7 @@ module.exports = function () {
it('should work with queues', async () => {
const q = async.queue(async (data) => {
if (data === 2) throw new Error('oh noes')
await new Promise(resolve => setTimeout(resolve, 10))
await new Promise(resolve => setTimeout(() => resolve(data), 10))
return data
}, 5)

Expand All @@ -561,7 +561,7 @@ module.exports = function () {
const multiP = Promise.all(q.push([9, 10]))

await q.drain()
await multiP
const res = await multiP
expect(calls.join()).to.eql([
'saturated',
'push cb 1',
Expand All @@ -581,6 +581,62 @@ module.exports = function () {
expect(emptyCalls).to.eql([
'empty',
])

expect(res).to.eql([
9,
10
])
})

it('should work with priorityQueues', async () => {
const q = async.priorityQueue(async (data) => {
if (data === 2) throw new Error('oh noes')
await new Promise(resolve => setTimeout(() => resolve(data), 10))
return data
}, 5)

const calls = []
const errorCalls = []
const emptyCalls = []
q.error().catch(d => errorCalls.push('error ' + d))
q.saturated().then(() => calls.push('saturated'))
q.unsaturated().then(() => calls.push('unsaturated'))
q.empty().then(() => emptyCalls.push('empty'))

q.push(1, 1).then(d => calls.push('push cb ' + d))
q.push(2, 1).then(d => errorCalls.push('push cb ' + d))
q.push([3, 4, 5, 6], 0).map(p => p.then(d => calls.push('push cb ' + d)))
q.push(7, 3).then(d => calls.push('push cb ' + d))
q.push(8, 3).then(d => calls.push('push cb ' + d))

const multiP = Promise.all(q.push([9, 10], 1))

await q.drain()
const res = await multiP
expect(calls.join()).to.eql([
'saturated',
'push cb 3',
'push cb 4',
'push cb 5',
'push cb 6',
'push cb 1',
'unsaturated',
'push cb 7',
'push cb 8'
].join())

expect(errorCalls).to.eql([
'push cb undefined',
'error Error: oh noes',
])
expect(emptyCalls).to.eql([
'empty',
])

expect(res).to.eql([
9,
10
])
})

/*
Expand Down

0 comments on commit 6927a81

Please sign in to comment.