Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async.queue start up performance #1448

Merged
merged 4 commits into from Jul 10, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion lib/internal/queue.js
Expand Up @@ -18,6 +18,7 @@ export default function queue(worker, concurrency, payload) {
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
var isWaitingForProcessing = false;

function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
Expand Down Expand Up @@ -46,7 +47,11 @@ export default function queue(worker, concurrency, payload) {
q._tasks.push(item);
}
}
setImmediate(q.process);

if (!isWaitingForProcessing) {
isWaitingForProcessing = true;
setImmediate(q.process);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to implement this would be to add a separate scheduleProcessing function that sets up q.process to run once on the next tick. That way the logic isn't spread across _insert and process

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hargasinski @aearly

if (!isWaitingForProcessing) {
  isWaitingForProcessing = true;
  setImmediate(function () {
    isWaitingForProcessing = false;
    q.process();
  });
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aearly I update the PR with @ex1st's suggestion (thanks @ex1st). The scheduling is now completely contained within the _insert function, so there shouldn't be a way to permanently pause the queue.

One minor change is that if q.process is updated between calling q.push and the next tick, the new q.process will run as opposed to the old one. Previously, the old q.process would run. The new style follows the q.drain style, so it could technically be considered a bug fix. I'm probably just bikeshedding though, as q.process isn't mentioned anywhere in our docs, and there would probably be a lot of other issues if someone reassigns it. However, if we want to keep the previous behaviour, it's a small change:

if (!isWaitingForProcessing) {
  isWaitingForProcessing = true;
  var _process = q.process;
  setImmediate(function () {
    isWaitingForProcessing = false;
    _process();
  });
}

The updated benchmark:

$ ./perf/benchmark.js --grep queue
Latest tag is  v2.5.0
Comparing v2.5.0 with current on Node v7.10.0
--------------------------------------
queue(10) v2.5.0 x 37,864 ops/sec ±1.87% (29 runs sampled), 0.0264ms per run
queue(10) current x 41,483 ops/sec ±0.71% (31 runs sampled), 0.0241ms per run
current is faster
--------------------------------------
queue(100) v2.5.0 x 5,647 ops/sec ±1.57% (26 runs sampled), 0.177ms per run
queue(100) current x 6,111 ops/sec ±0.77% (32 runs sampled), 0.164ms per run
current is faster
--------------------------------------
queue(1000) v2.5.0 x 616 ops/sec ±1.71% (32 runs sampled), 1.62ms per run
queue(1000) current x 685 ops/sec ±0.57% (33 runs sampled), 1.46ms per run
current is faster
--------------------------------------
queue(30000) v2.5.0 x 17.56 ops/sec ±1.73% (31 runs sampled), 56.9ms per run
queue(30000) current x 20.09 ops/sec ±2.08% (20 runs sampled), 49.8ms per run
current is faster
--------------------------------------
queue(100000) v2.5.0 x 4.18 ops/sec ±5.79% (9 runs sampled), 240ms per run
queue(100000) current x 5.98 ops/sec ±2.86% (12 runs sampled), 167ms per run
current is faster
--------------------------------------
queue(200000) v2.5.0 x 1.96 ops/sec ±15.76% (4 runs sampled), 511ms per run
queue(200000) current x 2.99 ops/sec ±4.47% (6 runs sampled), 335ms per run
current is faster
--------------------------------------
current faster overall (554ms total vs. 810ms total)
current won more benchmarks (6 vs. 0)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't worry about the q.process change. It's a footgun if a user messes with it.

}

function _next(tasks) {
Expand Down Expand Up @@ -106,6 +111,8 @@ export default function queue(worker, concurrency, payload) {
q._tasks.remove(testFn);
},
process: function () {
isWaitingForProcessing = false;

// Avoid trying to start too many processing operations. This can occur
// when callbacks resolve synchronously (#1267).
if (isProcessing) {
Expand Down
2 changes: 2 additions & 0 deletions perf/suites.js
Expand Up @@ -276,6 +276,8 @@ module.exports = [{
}, {
name: "queue",
args: [
[10],
[100],
[1000],
[30000],
[100000],
Expand Down