
Await-able queues #1586

Closed
aearly opened this issue Oct 1, 2018 · 2 comments · Fixed by #1641

Comments

@aearly
Collaborator

aearly commented Oct 1, 2018

I've been thinking about how we might make queues work better with async/await. It would be nice if certain operations returned promises, so they could be awaitable. However, most of the callbacks used in queues are really more event-based, rather than a single task that resolves once.

For example, q.push(task) also accepts a callback. We could make push() return a promise if you leave the callback off. However, push() also accepts an array of items to push, in which case the callback is called once per item. Promises can't resolve multiple times.

const done = await q.push([foo, bar, baz]) // how should this resolve?

We could make it so it only resolves when all the tasks complete, but this complicates internal logic.

The handling of errors in this case would also be a bit strange. The callback for push() is mainly useful for determining whether an error occurred while processing the item. If push() returned a promise, that error would become a rejection. Most people don't use the push() callback, so a lot of existing code would suddenly start producing unhandled rejections. We could instead make the promise resolve with the error object in those cases.
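One option discussed above is to resolve once all pushed tasks complete and to resolve with errors instead of rejecting. Below is a minimal sketch of that option. `createQueue` and `pushAll` are hypothetical names, not the async library's API. The tiny callback queue is only there to make the example self-contained.

```javascript
// Hypothetical minimal callback queue standing in for async.queue:
// worker(task, callback) calls callback(err) once per task, and push()
// calls its callback once per pushed item, like the array form of q.push().
function createQueue(worker, concurrency = 1) {
  const pending = [];
  let running = 0;

  function next() {
    while (running < concurrency && pending.length > 0) {
      const { task, callback } = pending.shift();
      running++;
      worker(task, (err) => {
        running--;
        callback(err || null);
        next();
      });
    }
  }

  return {
    push(tasks, callback = () => {}) {
      for (const task of Array.isArray(tasks) ? tasks : [tasks]) {
        pending.push({ task, callback });
      }
      setTimeout(next, 0); // start processing asynchronously
    },
  };
}

// Hypothetical promise wrapper: resolves once every task has finished, with
// one entry per task (null on success, the Error on failure). It never
// rejects, so ignoring the returned promise cannot cause an unhandled rejection.
function pushAll(queue, tasks) {
  return Promise.all(
    tasks.map((task) => new Promise((resolve) => queue.push(task, resolve)))
  );
}

const q = createQueue((n, cb) => {
  setTimeout(() => cb(n < 0 ? new Error(`negative input: ${n}`) : null), 5);
}, 2);

pushAll(q, [1, -2, 3]).then((errors) => {
  errors.forEach((err, i) => console.log(i, err ? err.message : "ok"));
});
```

The trade-off is visible in the wrapper. Callers must inspect the resolved array for errors rather than relying on `try/catch`.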

There are several event callbacks you can register: empty, saturated, error, etc. It seems really difficult to make these awaitable in a useful way. I've thought about changing the QueueObject API so that event handlers are methods rather than bare properties.

q.error((err, task) => console.log(`Queue error: ${err.message} for ${task}`))

//rather than

q.error = (err, task) => console.log(`Queue error: ${err.message} for ${task}`)

In this case, we could make it so calling the method without passing a function returns a promise that resolves the next time the event triggers.

const [err, task] = await q.error()

This is a bit clunky. To repeatedly await errors, you'd have to use an infinite loop:

var q = async.queue(taskFn)

const errorLoop = async () => {
  while(q.length) { // not a good condition to test for
    const [err, task] = await q.error()  // if no error occurs, this waits here forever
    console.log(err)
    q.push(task)
  }
}
errorLoop() // fire and forget, hopefully it exits when the queue drains.

It's also a bit complicated to implement internally. We would have to create a promise for each event, keep a reference to it, and resolve it the next time that event fires.
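Below is a minimal sketch of the method-style event handler described above, including the per-event promise bookkeeping. It assumes a hypothetical `createEvent()` helper; it is not how async implements its QueueObject. Calling the method with a function registers a persistent handler. Calling it with no argument returns a promise that resolves with the arguments of the next firing.

```javascript
// Hypothetical helper for a single queue event such as "error" or "drain".
// event(fn) registers a handler that runs on every firing; event() with no
// argument returns a promise that resolves with the arguments of the next firing.
function createEvent() {
  const handlers = [];
  let waiters = []; // resolvers for pending "next occurrence" promises

  function event(fn) {
    if (typeof fn === "function") {
      handlers.push(fn);
      return undefined;
    }
    return new Promise((resolve) => waiters.push(resolve));
  }

  // Called by the queue internals when the event occurs.
  event.emit = (...args) => {
    handlers.forEach((fn) => fn(...args));
    // Each promise settles once, so swap in a fresh list before resolving:
    // anything awaited after this point waits for the following firing.
    const current = waiters;
    waiters = [];
    current.forEach((resolve) => resolve(args));
  };

  return event;
}

// Usage with a hypothetical queue object exposing q.error as a method.
const q = { error: createEvent() };
q.error((err, task) => console.log(`Queue error: ${err.message} for ${task}`));

(async () => {
  const [err, task] = await q.error(); // waits for the next error only
  console.log(`awaited error: ${err.message} (${task})`);
})();

q.error.emit(new Error("boom"), "task-1");
```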

Returning a promise on the next firing of the event would be the most useful for the drain callback. It makes the most common use of the queue work nicely in the context of an async function:

async function processItems(items) {
  const q = async.queue(processAsync, 5)
  q.push(items)
  await q.drain()
}
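To show that the processItems pattern above works end to end, here is a small self-contained sketch. It uses a hypothetical minimal queue (`createQueue`) whose `drain()` returns a promise for the next time the queue empties. This is an assumption for illustration, not async's implementation. `processAsync` is a stand-in worker.

```javascript
// Hypothetical minimal queue (not async's implementation). The worker is an
// async function; drain() returns a promise for the next time the queue empties.
function createQueue(worker, concurrency = 1) {
  const pending = [];
  let running = 0;
  let drainWaiters = [];

  function next() {
    while (running < concurrency && pending.length > 0) {
      const task = pending.shift();
      running++;
      Promise.resolve()
        .then(() => worker(task))
        .catch(() => {}) // a real queue would route this to its error handler
        .finally(() => {
          running--;
          if (running === 0 && pending.length === 0) {
            // Queue is idle: resolve everyone waiting on this drain.
            const waiters = drainWaiters;
            drainWaiters = [];
            waiters.forEach((resolve) => resolve());
          } else {
            next();
          }
        });
    }
  }

  return {
    push(items) {
      pending.push(...(Array.isArray(items) ? items : [items]));
      next();
    },
    // Resolves on the next drain. If nothing is ever pushed, it never resolves.
    drain() {
      return new Promise((resolve) => drainWaiters.push(resolve));
    },
  };
}

// Hypothetical stand-in for processAsync.
const processAsync = (item) =>
  new Promise((resolve) => setTimeout(resolve, 10, item));

async function processItems(items) {
  const q = createQueue(processAsync, 5);
  q.push(items);
  await q.drain();
}

processItems([1, 2, 3, 4, 5, 6, 7]).then(() => console.log("all items processed"));
```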

These changes add slightly more utility to the queue in async/await code, but might be more trouble than they're worth.


Another idea for the event-style callbacks is to return an Async Iterator:

const iterator = q.error()

for await (const [err, item] of iterator) {
  //...
}

These would still have to be wrapped in a fire-and-forget async function, unless you used one of the for await loops to drive the lifecycle of the queue within the main async function. I think we would also have to assume that a drain event means the queue has ended, thus also ending all the async iterators.

Lots of implementation complexity here as well.
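Below is a rough sketch of the async-iterator idea, assuming a hypothetical `createEventIterator()` helper. The queue internals would call `push()` on each event and `end()` on drain, treating drain as the end of the queue as suggested above. The sketch supports a single consumer and buffers events that fire before they are awaited.

```javascript
// Hypothetical helper: turns repeated event firings into an async iterator.
// push(value) is called on each event; end() is called when the queue drains.
function createEventIterator() {
  const buffered = []; // events that fired before anyone awaited them
  const waiting = []; // resolvers for a consumer awaiting the next event
  let ended = false;

  return {
    push(value) {
      if (ended) return;
      const resolve = waiting.shift();
      if (resolve) resolve({ value, done: false });
      else buffered.push(value);
    },
    end() {
      ended = true;
      for (const resolve of waiting.splice(0)) resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() {
      return {
        next: () => {
          if (buffered.length > 0) {
            return Promise.resolve({ value: buffered.shift(), done: false });
          }
          if (ended) return Promise.resolve({ value: undefined, done: true });
          return new Promise((resolve) => waiting.push(resolve));
        },
      };
    },
  };
}

const errors = createEventIterator();
// Hypothetical queue internals would make these calls:
setTimeout(() => errors.push([new Error("boom"), "task-1"]), 5);
setTimeout(() => errors.end(), 10); // treat drain as the end of the queue

(async () => {
  for await (const [err, item] of errors) {
    console.log(`error for ${item}: ${err.message}`);
  }
  console.log("queue drained, iterator finished");
})();
```

Even in this reduced form, the helper has to track buffering, waiting consumers, and termination, which illustrates the implementation complexity mentioned above.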

aearly added this to the 3.0 milestone Oct 1, 2018
@aearly
Collaborator Author

aearly commented Mar 13, 2019

Node is considering making eventEmitter.once() return a promise, so you can do things like:

const data = await emitter.once('data')

This is similar to the first idea above, having the event-style methods return a promise for the next occurrence of the event.
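For comparison, the promise-based API Node actually provides (since Node 11.13.0) is the static `events.once(emitter, name)` function rather than a promise-returning `emitter.once()`. It resolves with an array of all arguments passed to `emit()`, and it rejects if the emitter emits `'error'` while waiting. A small example:

```javascript
const { EventEmitter, once } = require("events");

async function main() {
  const emitter = new EventEmitter();
  // Emit asynchronously so the listener registered by once() is in place first.
  setTimeout(() => emitter.emit("data", "hello", 42), 10);

  // events.once() resolves with an array of all arguments passed to emit().
  const [message, count] = await once(emitter, "data");
  console.log(message, count); // hello 42
  return [message, count];
}

main();
```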

@Morikko

Morikko commented Mar 20, 2019

For the push API

You could return an array of promises:

const all_tasks_promise = q.push([foo, bar, baz]) // an array of promises, one per task

await Promise.all(all_tasks_promise) // Wait all

const [, task_2_promise] = all_tasks_promise

try {
  await task_2_promise // wait only one task
} catch (err) {
  // Do a specific error handling
}

Add an automatic error handler for each promise in the queue:

// In the queue
task_promise.catch(() => {}) // avoid an unhandled rejection

// For the user
// optional try/catch
try {
  await q.push(task)
} catch(err) {
  // Do a specific error handling
}
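The "array of promises plus an automatic no-op catch" idea above can be sketched as follows. `pushAll` and `runTask` are hypothetical names. The sketch also skips the queue's concurrency handling, so it only shows the promise handling. Attaching `.catch(() => {})` marks each promise as handled, so ignoring it does not raise an unhandled rejection, while awaiting it still throws.

```javascript
// Hypothetical promise-based push, sketching the "array of promises" idea.
// runTask stands in for the queue's worker; it is not part of async's API.
function pushAll(runTask, tasks) {
  return tasks.map((task) => {
    const taskPromise = Promise.resolve().then(() => runTask(task));
    // Mark the promise as handled so callers who ignore it do not get an
    // unhandled rejection; awaiting taskPromise still rethrows the error.
    taskPromise.catch(() => {});
    return taskPromise;
  });
}

const runTask = async (n) => {
  if (n === 2) throw new Error(`task ${n} failed`);
  return n * 10;
};

async function main() {
  const allTaskPromises = pushAll(runTask, [1, 2, 3]); // a plain array, no await needed
  const [, task2Promise] = allTaskPromises;
  try {
    await task2Promise; // still rejects for callers who await it
  } catch (err) {
    console.log("specific handling:", err.message);
  }
  // Promise.allSettled reports every outcome without throwing.
  return Promise.allSettled(allTaskPromises);
}

main();
```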

For the event-based API

I definitely prefer the method style because you can pass more than one function at different times. It is more of an observer pattern than a single hard-coded callback.

  1. Using a method to add observers is good (event-style programming)
  2. Adding an API for a one-shot call that returns a promise resolving the next time something happens is useful, especially for the empty and drain cases (like you showed)

However, the two APIs should be distinguished more explicitly than by just omitting the callback, because their behavior is not the same. In case 1, the callback is called for every event. In case 2, only the next event is targeted.

Maybe:

  1. Observers
q.drain(() => console.log('Queue drained'))
q.empty(() => console.log('Queue empty'))
q.error(() => console.log('Queue error'))
  2. Next event
q.nextDrain() // return a promise
q.nextDrain(() => console.log('Queue drained')) // only the next event but with the callback style

// and so on...
q.nextEmpty()
q.nextError()
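Below is a minimal sketch of the split API proposed above, shown for the drain event only. `createDrainEvent` and its `emit()` method are hypothetical. `drain(fn)` adds a persistent observer. `nextDrain(fn?)` targets only the next occurrence: it uses callback style when `fn` is given and returns a promise otherwise.

```javascript
// Hypothetical sketch of the proposed split API for one event type:
// drain(fn) adds a persistent observer, nextDrain(fn?) targets only the
// next occurrence (callback style if fn is given, promise style otherwise).
function createDrainEvent() {
  const observers = [];
  let nextOnce = [];

  return {
    drain(fn) {
      observers.push(fn);
    },
    nextDrain(fn) {
      if (typeof fn === "function") {
        nextOnce.push(fn);
        return undefined;
      }
      return new Promise((resolve) => nextOnce.push(resolve));
    },
    // Called by the queue internals whenever the queue drains.
    emit() {
      observers.forEach((fn) => fn());
      const once = nextOnce;
      nextOnce = []; // one-shot listeners are dropped after this firing
      once.forEach((fn) => fn());
    },
  };
}

const q = createDrainEvent();
let drains = 0;
q.drain(() => { drains++; });
q.nextDrain(() => console.log("first drain (callback style)"));
const nextPromise = q.nextDrain();
q.emit();
q.emit();
nextPromise.then(() => console.log(`drain observer ran ${drains} times`));
```

Keeping the two behaviors under different names means a missing callback never silently changes the semantics from "every event" to "next event only".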

Adding an API to get an async generator sounds like overkill and is more complicated than using event-style programming (running a task each time something happens).
But if someone comes up with a real use case, the iterator would make sense. In my opinion, it should be handled in a separate issue later on.
