task: stabilize JoinSet #4535

Closed
2 tasks done
carllerche opened this issue Feb 24, 2022 · 19 comments · Fixed by #4920
Labels: A-tokio (Area: The main tokio crate), M-runtime (Module: tokio/runtime), M-task (Module: tokio/task)

Comments

@carllerche
Member

carllerche commented Feb 24, 2022

This is a meta issue tracking the stabilization of JoinSet. Outstanding issues:

@carllerche carllerche added A-tokio Area: The main tokio crate M-runtime Module: tokio/runtime M-task Module: tokio/task labels Feb 24, 2022
@hawkw
Member

hawkw commented Feb 24, 2022

  • How is JoinMap implemented? A layer on top of JoinSet or a completely separate type?

I'm prototyping a JoinMap impl in tokio-util, should be up for review shortly :)

@hawkw hawkw self-assigned this Feb 24, 2022
@hawkw
Member

hawkw commented Feb 24, 2022

JoinMap sketch: #4538

@hawkw
Member

hawkw commented Mar 2, 2022

One major deficiency of the current JoinSet (and JoinMap as of #4538) APIs is that there isn't any practical way to use them in a polling-based context.

I noticed this while attempting to rewrite tower's ReadyCache type, which currently uses FuturesUnordered, to use JoinMap as an experiment to test out that API. ReadyCache needs a way to poll the set of pending futures in a poll-based context (poll_ready). The current JoinMap and JoinSet APIs don't expose poll_join_one directly, I think because we didn't want to commit to providing it as a public API. Unfortunately --- unlike a lot of other Tokio types --- there isn't really any way to use JoinMap/JoinSet's join_one methods with the ReusableBoxFuture type from tokio-util, either. join_one mutably borrows the JoinSet/JoinMap, so it can't be used with ReusableBoxFuture, because the future's lifetime may outlive that of the JoinSet/JoinMap.

This means that if you want to use JoinSet/JoinMap in a poll function, currently the only way to do so is to call join_one and Box::pin the returned future on every poll. This is really not great at all.

Personally I think we should just expose the poll_join_one methods. It seems pretty unlikely that we won't be able to provide them in the future. Alternatively, we could try to provide an API that's ReusableBoxFuture-friendly, but we can't just make a join_one_owned(self: Arc<JoinSet<T>>), because join_one mutates the JoinSet...so I'm not sure if there's a nice way to do that.

@Darksonn
Contributor

Darksonn commented Mar 2, 2022

I am in favor of exposing the poll methods, though I will note that you can definitely use it with ReusableBoxFuture. See the implementation of BroadcastStream for how the same problem was solved for another collection.

@hawkw
Member

hawkw commented Mar 3, 2022

> See the implementation of BroadcastStream for how the same problem was solved for another collection.

Yeah, I considered an approach like that, where the JoinSet/JoinMap is moved in and out of the ReusableBoxFuture. However, this prevents calling other methods (such as cancelling tasks) after polling the JoinSet and getting back a Pending. I think it might be possible to work around that with some additional gymnastics, but it gets ugly pretty fast.
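
For readers unfamiliar with the move-in/move-out pattern being discussed, here is a minimal std-only sketch. Everything in it is a hypothetical stand-in: ToySet plays the role of a collection with an async `&mut self` method, and a plain `Pin<Box<dyn Future>>` is used where the real code would likely use ReusableBoxFuture (so this version re-allocates each time an item is produced). It is not how BroadcastStream or JoinSet is actually implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical collection whose only way to get an item is an async `&mut self` method.
struct ToySet {
    items: Vec<u32>,
}

impl ToySet {
    async fn next(&mut self) -> Option<u32> {
        self.items.pop()
    }
}

/// Takes the set by value and hands it back together with the result,
/// so the returned future borrows nothing and can be stored in a struct.
async fn next_owned(mut set: ToySet) -> (Option<u32>, ToySet) {
    let item = set.next().await;
    (item, set)
}

type NextFuture = Pin<Box<dyn Future<Output = (Option<u32>, ToySet)>>>;

/// Poll-friendly wrapper: the set always lives inside the in-flight future.
struct PollableSet {
    in_flight: NextFuture,
}

impl PollableSet {
    fn new(set: ToySet) -> Self {
        Self { in_flight: Box::pin(next_owned(set)) }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        match self.in_flight.as_mut().poll(cx) {
            Poll::Ready((item, set)) => {
                // The set came back out; move it into a fresh future right away.
                self.in_flight = Box::pin(next_owned(set));
                Poll::Ready(item)
            }
            // While pending, the set is owned by the future and is unreachable
            // from here, which is the limitation described above.
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough to drive this toy example by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the wrapper until it reports `None` (or `Pending`) and collects the items.
fn drain(items: Vec<u32>) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut set = PollableSet::new(ToySet { items });
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = set.poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

Note that PollableSet has no way to offer something like an abort method while a poll has returned Pending, because the ToySet is inside `in_flight` at that point.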

@hawkw
Member

hawkw commented Apr 26, 2022

Merged an implementation of a JoinMap type in tokio-util in #4640.

We should probably do a release of both tokio (with the latest JoinSet changes) and tokio-util (with JoinMap) with both flagged as unstable APIs, and then start to think about stability for JoinSet?

@Noah-Kennedy
Contributor

@hawkw I'd agree with this

@abonander
Contributor

abonander commented May 11, 2022

I would like to see a method on JoinSet for either adding a manually spawned task, or spawning a task with a given name (so it can be identified in tokio-console).

It seems like the easiest way might be to just make JoinSet::insert() public. I don't see any reason why it shouldn't be, since JoinSet::spawn() just calls self.insert(crate::spawn(task)).

@abonander
Contributor

I also want to note that the signature of JoinSet::join_one() makes it annoying and potentially error-prone if you want to use it with tokio::select!{} but ignore the None condition.

For example, say you have a task that manages a queue of downloads; the first solution you might think of could look something like this:

async fn download_manager(
    client: reqwest::Client,
    notify: Arc<Notify>,
    results: broadcast::Sender<DownloadResult>,
    queue_size: usize,
) {
    let mut pending = VecDeque::new();
    let mut transfers = JoinSet::new();

    fill_pending_queue(&mut pending).await;

    loop {
        while transfers.len() < queue_size {
            if let Some(request) = pending.pop_front() {
                let client = client.clone();
                transfers.spawn(async move { do_download(&client, request).await });
            } else {
                break;
            }
        }

        // We want to wait either for a new request to come in, or for a queued download to complete.
        tokio::select! {
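            res = transfers.join_one() => match res {
                // Assuming `broadcast` is `tokio::sync::broadcast`: `send` is synchronous, so no `.await`.
                Ok(Some(result)) => { let _ = results.send(result); }
                // `join_one()` resolves to `Ok(None)` as soon as it is polled on an empty set.
                Ok(None) => (),
                Err(join_err) => { let _ = results.send(DownloadResult::from_join_err(join_err)); }
            },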
            _ = notify.notified() => {
                fill_pending_queue(&mut pending).await;
            }
        };
    }
}

Except, surprise! When transfers is empty, this runs in a hot loop because .join_one() will constantly yield Ok(None).

The fix is to map with Result::transpose() (via FutureExt::map from the futures crate) so that you can ignore None in the pattern match:

tokio::select! {
    Some(res) = transfers.join_one().map(Result::transpose) => match res { ... },
    ...
}

Which is pretty simple, but isn't necessarily obvious, and I could see someone spending a long time trying to figure out why their process runs at 100% on one core periodically.
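
To make the transpose trick concrete, here is a small std-only sketch of what Result::transpose does to each of the three possible results. JoinError below is a hypothetical unit struct standing in for Tokio's real error type, and u32 stands in for the task output. The relevant tokio::select! behaviour is that a branch whose pattern does not match is disabled rather than run, so `Some(res) = ...` skips the empty-set case instead of spinning on it.

```rust
/// Hypothetical stand-in for `tokio::task::JoinError`.
#[derive(Debug, PartialEq)]
struct JoinError;

/// Converts the shape returned by `join_one()` in this thread,
/// `Result<Option<T>, E>`, into `Option<Result<T, E>>`.
fn as_select_pattern(res: Result<Option<u32>, JoinError>) -> Option<Result<u32, JoinError>> {
    res.transpose()
}
```

With this shape, `Ok(None)` becomes `None`, a finished task becomes `Some(Ok(value))`, and a failed task becomes `Some(Err(error))`, so only the empty-set case fails to match `Some(res)`.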

I see the discussion here: #4335 (review) but I feel like a potential performance footgun probably trumps a minor ergonomic improvement.

@Darksonn
Contributor

Darksonn commented May 12, 2022

The tokio::select! argument is convincing to me. I am fine with changing it. Regarding making JoinSet::insert public, there are some concerns that we might want to change how the future is allocated when it is inserted into a JoinSet, which we wouldn't be able to do if you can spawn them separately. Adding a function for giving them names seems good to me.

@hawkw
Member

hawkw commented May 12, 2022

> Regarding making JoinSet::insert public, there are some concerns that we might want to change how the future is allocated when it is inserted into a JoinSet, which we wouldn't be able to do if you can spawn them separately.

Yeah, I agree with @Darksonn on this one...making the insert method public would certainly make the API much simpler, but it's a compatibility risk: it could rule out some changes that the current API permits. Having the JoinSet control spawning means we need a bunch of different spawn methods, which is unfortunate, but it leaves the JoinSet free to spawn, in the future, through a mechanism other than the public spawning APIs.

> Adding a function for giving them names seems good to me.

IMO it seems like the best approach for this is to add spawning functions to JoinSet taking a task and a builder, rather than one specifically for spawning a named task. That way, if we add other settings to the builder API in the future, we don't have to add more spawning APIs to JoinSet (it already has a lot).

Edit: Thinking about that a bit more, maybe the best thing to do is add a builder method to "attach" the builder to a JoinSet, and then use the builder's existing spawn methods --- if we had an API like this

let mut joinset = JoinSet::new();

tokio::task::Builder::new()
    .with_join_set(joinset)
    .name("whatever")
    .spawn(some_task());

instead of

let mut joinset = JoinSet::new();

let builder = tokio::task::Builder::new()
    .name("whatever");
joinset.spawn_builder(builder, task);

then we could just have the builder continue to provide spawn, spawn_local, and spawn_blocking methods.

If we added JoinSet methods to take a builder, we would have to add a sort of combinatorial explosion of

  • JoinSet::spawn_builder
  • JoinSet::spawn_builder_on
  • JoinSet::spawn_builder_local
  • JoinSet::spawn_builder_local_on
  • JoinSet::spawn_builder_blocking
  • JoinSet::spawn_builder_blocking_on

which feels kind of overwhelming...

The issue is that adding a JoinSet to a builder changes the return type of spawn from JoinHandle to AbortHandle...maybe we have something where the builder does a type state transition from Builder<JoinHandle> to Builder<AbortHandle>, or we have a BuilderWithJoinSet type that has different return types...hmm...
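
As a rough illustration of the type-state idea floated above, here is a std-only sketch in which attaching a set to the builder changes the builder's type, and with it the return type of spawn. All of the types are hypothetical toys (a "task" is just a u64 id), and none of the names reflect Tokio's actual Builder API or the design that was eventually adopted.

```rust
/// Hypothetical stand-ins; these are not Tokio's real handle types.
#[derive(Debug, PartialEq)]
struct JoinHandle(u64);
#[derive(Debug, PartialEq)]
struct AbortHandle(u64);

/// Toy set that only records the ids of the tasks spawned into it.
#[derive(Default)]
struct JoinSet {
    ids: Vec<u64>,
}

/// Type states: a detached builder, and one attached to a set.
struct Detached;
struct InSet<'s>(&'s mut JoinSet);

struct Builder<'a, State> {
    name: Option<&'a str>,
    state: State,
}

impl<'a> Builder<'a, Detached> {
    fn new() -> Self {
        Builder { name: None, state: Detached }
    }

    /// Transition to the attached state; the builder's type changes here.
    fn with_join_set<'s>(self, set: &'s mut JoinSet) -> Builder<'a, InSet<'s>> {
        Builder { name: self.name, state: InSet(set) }
    }

    /// Detached builders hand back a `JoinHandle`.
    fn spawn(self, task_id: u64) -> JoinHandle {
        JoinHandle(task_id)
    }
}

impl<'a, State> Builder<'a, State> {
    /// Settings shared by every state live in the generic impl.
    fn name(mut self, name: &'a str) -> Self {
        self.name = Some(name);
        self
    }
}

impl<'a, 's> Builder<'a, InSet<'s>> {
    /// Attached builders register the task and hand back an `AbortHandle`.
    fn spawn(self, task_id: u64) -> AbortHandle {
        let InSet(set) = self.state;
        set.ids.push(task_id);
        AbortHandle(task_id)
    }
}

/// Spawns one detached and one tracked task; returns both handles and the set size.
fn demo() -> (JoinHandle, AbortHandle, usize) {
    let mut set = JoinSet::default();
    let detached = Builder::new().name("detached").spawn(1);
    let tracked = Builder::new().name("tracked").with_join_set(&mut set).spawn(2);
    (detached, tracked, set.ids.len())
}
```

The trade-off is the one raised above: a single spawn name is kept, at the cost of a second type parameter on the builder.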

@abonander
Contributor

abonander commented May 12, 2022

I think for one, task::Builder doesn't have a way to set the runtime handle or LocalSet that the task should be spawned on, so maybe that option should go there and then get removed from JoinSet.

I think adding typestate to Builder is a good idea. It could look something like this:

pub mod builder {
    // Just putting these in a submodule to avoid polluting the `task` module namespace

    // For both of these types, if `None` then fetch the current applicable context on call to `.spawn[_into]()`

    pub struct WithRuntime<'a>(Option<&'a Handle>);

    pub struct WithLocalSet<'a>(Option<&'a LocalSet>);
}

pub struct Builder<'a, Target = WithRuntime<'static>> {
    // ...
}

impl Builder<'_> {
    pub fn new() -> Self { ... }
}

impl Builder<'_, WithLocalSet<'static>> {
    pub fn new_local() -> Self { ... }
}

impl<'a, Target> Builder<'a, Target> {
    pub fn name(&self, name: &'a str) -> Self { ... }

    pub fn with_runtime<'rt>(&self, runtime: &'rt Handle) -> Builder<'a, WithRuntime<'rt>> { ... }

    pub fn with_local_set<'ls>(&self, local_set: &'ls LocalSet) -> Builder<'a, WithLocalSet<'ls>> { ... }
}

impl<'a, 'rt> Builder<'a, WithRuntime<'rt>> {
    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
        where F: Future + Send + 'static, F::Output: Send + 'static
    { ... }

    pub fn spawn_into<F>(&self, join_set: &mut JoinSet<F::Output>, task: F) -> AbortHandle
        where F: Future + Send + 'static, F::Output: Send + 'static
    { ... }
}

impl<'a, 'ls> Builder<'a, WithLocalSet<'ls>> {
    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
        where F: Future + 'static, F::Output: 'static
    { ... }

    pub fn spawn_into<F>(&self, join_set: &mut JoinSet<F::Output>, task: F) -> AbortHandle
        where F: Future + 'static, F::Output: 'static
    { ... }
}

I'm not sure how this could integrate with JoinMap, but that doesn't seem to have been released yet anyway.

@hawkw
Member

hawkw commented May 13, 2022

Here's my proposal for making JoinSet play nice with the task builder API: #4687

What do others think?

@winterqt

winterqt commented Jul 2, 2022

Is there any interest in having JoinSet implement Stream?

@Darksonn
Contributor

Darksonn commented Jul 3, 2022

We don't want 0.x crates in our public API, so we can't use Stream in the main Tokio crate. Adding a wrapper to tokio-stream would be fine.

@winterqt

winterqt commented Jul 3, 2022

> Adding a wrapper to tokio-stream would be fine.

This looks like it would require exposing poll_join_one. I think I can do what's done in BroadcastStream and continuously poll join_one (calling it again if needed)?

@Darksonn
Contributor

Darksonn commented Jul 4, 2022

We can expose poll_join_one.

@winterqt

winterqt commented Jul 4, 2022

Should I include that (as a separate commit) in my PR, or would you prefer me making a separate PR for that?

@Darksonn
Contributor

Darksonn commented Jul 4, 2022

I guess making a separate PR would be good.

Noah-Kennedy added a commit that referenced this issue Aug 17, 2022
Closes #4535.

This leaves the ID-related APIs unstable.
Noah-Kennedy added a commit that referenced this issue Aug 19, 2022
Closes #4535.

This leaves the ID-related APIs unstable.