Add JoinSet #4335

Merged
merged 21 commits into from Feb 1, 2022

Contributor

@Darksonn Darksonn commented Dec 21, 2021

This PR adds a new TaskSet type that uses Tokio's linked_list utility to implement polling in a way that runs in constant time.

Closes: #3903

@Darksonn Darksonn added the A-tokio (Area: The main tokio crate), M-task (Module: tokio/task), and R-loom (Run loom tests on this PR) labels Dec 21, 2021
That was a previous iteration.
Comment on lines 178 to 187
Some(Poll::Ready(Ok(res))) => {
    self.inner.with_entry_value(&entry, |jh| jh.take());
    self.inner.remove(&entry);
    return Poll::Ready(Ok(Some(res)));
}
Some(Poll::Ready(Err(err))) => {
    self.inner.with_entry_value(&entry, |jh| jh.take());
    self.inner.remove(&entry);
    return Poll::Ready(Err(err));
}
Member

Looks like the code could be deduped here. I don't think we need to re-map it to Result<Option<T>, ...>; Option<Result<...>> should be fine?

/// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `TaskSet`.
pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> {
Member

I would probably keep this as Option<Result<...>>.

Contributor Author

Well, it's a pain to use in a while let loop if you do that though.

Contributor

I am with Alice on this, Result is far more ergonomic for most expected use cases.

Member

Is it worse? It seems like returning Option means you can loop until all tasks are finished, whereas Result makes it easier for one task error to blow up the loop.

while let Some(result) = set.join_one().await {

}
// or
while let Ok(Some(val)) = set.join_one().await {
    // loop canceled on first `Err`?
}

The Option does make it more annoying if you want to use ? (like set.join_one().await?) in the condition. I have an opinion otherwise about which is better or worse.

Contributor Author

I was mostly thinking of the while let Some(result) = set.join_one().await? use-case.
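
The trade-off discussed in this thread can be sketched without Tokio. The block below is only an illustration: `MockSet`, `sample`, `sum_until_error`, and `drain_all` are hypothetical names, and the mock `join_one` is synchronous, whereas the method under review is async. It shows that the `Result<Option<T>, E>` shape works with `?` in a `while let` condition and stops at the first error, while `transpose()` recovers the `Option<Result<T, E>>` shape that drains every task.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the set: a queue of already-finished task results.
/// The real `join_one` is async; this mock is synchronous to stay std-only.
struct MockSet {
    finished: VecDeque<Result<u32, String>>,
}

impl MockSet {
    /// Same shape as the signature under review: `Result<Option<T>, E>`.
    fn join_one(&mut self) -> Result<Option<u32>, String> {
        // `Option<Result<T, E>>::transpose` yields `Result<Option<T>, E>`.
        self.finished.pop_front().transpose()
    }
}

/// One success, one failure, then another success.
fn sample() -> MockSet {
    MockSet {
        finished: VecDeque::from(vec![Ok(1), Err("boom".to_string()), Ok(2)]),
    }
}

/// `Result<Option<T>, E>` composes with `?`: the first error ends the loop
/// and is returned to the caller.
fn sum_until_error(set: &mut MockSet) -> Result<u32, String> {
    let mut sum = 0;
    while let Some(value) = set.join_one()? {
        sum += value;
    }
    Ok(sum)
}

/// `transpose()` converts back to `Option<Result<T, E>>`, so the loop keeps
/// going until the set is empty and each error is handled individually.
fn drain_all(set: &mut MockSet) -> (u32, usize) {
    let (mut sum, mut errors) = (0, 0);
    while let Some(result) = set.join_one().transpose() {
        match result {
            Ok(value) => sum += value,
            Err(_) => errors += 1,
        }
    }
    (sum, errors)
}
```

With the sample data, `sum_until_error` returns the error from the second task and leaves the third result in the queue, while `drain_all` visits all three results.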

Member

I think I agree w/ @Darksonn but what is the precedent in std? I'm not sure what the best way to search is. I can't think of any methods that return Result<Option<_>, _> and the only methods that return Option<Result<...>> would be Iterator, which doesn't really count.

Contributor Author

I do not think that std has a precedent for this.

Member

@carllerche carllerche left a comment

The impl looks fine to me. I had a minor API suggestion. I would also suggest we initially release this as an unstable API. We probably should document unstable APIs better though.

@carllerche
Member

@Darksonn How would we add an API in the future for spawning tasks that don't abort when the set is dropped?

@Noah-Kennedy
Contributor

Considering that task set is going to be used a lot for resource cleanup and management, we probably want something along the lines of a fn shutdown(self) -> impl Future<Output = ()> that aborts all of the tasks and waits for them to finish, so that users can await the termination of the tasks on the set.

@Darksonn
Contributor Author

How should we handle panics in the sub-tasks when the user has called shutdown?

@carllerche
Member

"clean shutdown" is a good call.

What I would probably do is have a call to shutdown() abort the remaining tasks, but there may be some tasks that have since completed (either successfully or w/ a panic). The user of TaskSet would then continue to drain results until None. Once None is reached, no more tasks will complete and the TaskSet can be shut down.

This is a similar strategy as the mpsc channel, which maps pretty well to this case.
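
The "close, then drain until empty" pattern from the channel analogy can be sketched as follows. This is an assumption-laden illustration: it uses `std::sync::mpsc` instead of Tokio's channel so that it stays self-contained, and `shutdown_and_drain` and `demo` are hypothetical names. Dropping the last sender plays the role of `shutdown()`, and the `Err` from `recv` plays the role of `None`.

```rust
use std::sync::mpsc;

/// Closes the sending side, then drains whatever was already queued.
/// Returns the values that were still buffered at shutdown time.
fn shutdown_and_drain(tx: mpsc::Sender<u32>, rx: mpsc::Receiver<u32>) -> Vec<u32> {
    // "Shutdown": once the last sender is dropped, no new values can arrive.
    drop(tx);

    let mut drained = Vec::new();
    // `recv` keeps returning buffered values, then reports disconnection.
    // That `Err` is the end-of-stream signal, like `None` in the discussion.
    while let Ok(value) = rx.recv() {
        drained.push(value);
    }
    drained
}

/// Queues two values before shutting down, then drains them.
fn demo() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    shutdown_and_drain(tx, rx)
}
```

Results that were produced before the shutdown are still observable afterwards, which is the property the proposal relies on.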

@Noah-Kennedy
Contributor

I'm in agreement with Carl here. It would be best not to propagate the panics.

@Darksonn
Contributor Author

Darksonn commented Jan 24, 2022

I have reworked the IdleNotifiedSet to guarantee that the value stored in an entry is dropped when the entry is removed from the set. If a value isn't destroyed when the entry is dropped, then that's a ref-cycle because the entry holds a JoinHandle to the task, but the task holds a waker to the entry.

I'm also adding a test for the coop budget as I realized that join_one didn't correctly handle it before I reworked IdleNotifiedSet.
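
The reference cycle described in this comment can be illustrated with a single-threaded analogue. This is a sketch only, not the `IdleNotifiedSet` implementation: `Entry`, `Handle`, and the helper functions are hypothetical, and `Rc` stands in for the shared ownership between the entry and the task's waker.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a set entry that stores a handle to its task.
struct Entry {
    value: RefCell<Option<Handle>>,
}

/// Hypothetical stand-in for a `JoinHandle`. The back-pointer models the
/// waker that the task holds to the entry.
struct Handle {
    _waker_target: Rc<Entry>,
}

/// Builds an entry whose stored handle points back at the entry (a cycle).
fn make_cycle() -> Rc<Entry> {
    let entry = Rc::new(Entry { value: RefCell::new(None) });
    let handle = Handle { _waker_target: Rc::clone(&entry) };
    *entry.value.borrow_mut() = Some(handle);
    entry
}

/// Removing the entry without dropping its value: the stored handle keeps
/// the entry alive, so it is never freed. Returns true if it leaked.
fn leaks_without_drop() -> bool {
    let entry = make_cycle();
    let weak = Rc::downgrade(&entry);
    drop(entry);
    weak.upgrade().is_some()
}

/// Dropping the stored value on removal breaks the cycle.
/// Returns true if the entry was freed.
fn freed_with_drop() -> bool {
    let entry = make_cycle();
    let weak = Rc::downgrade(&entry);
    drop(entry.value.borrow_mut().take());
    drop(entry);
    weak.upgrade().is_none()
}
```

Both functions return `true`: the first because the cycle keeps the entry alive, the second because taking the value out first lets the last strong reference free it.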

/// [`join_one`]: fn@Self::join_one
pub async fn shutdown(&mut self) {
    self.abort_all();
    while self.join_one().await.transpose().is_some() {}
Contributor

I think it would possibly be advantageous to bifurcate this operation, and have the function return impl Future and be non-async. That way, we could abort the tasks, then return a future that awaits all tasks. This does add complexity though.

Contributor

@Darksonn if we made this change in a future PR, would it be considered breaking?

Contributor Author

No, I don't think it would be considered a breaking change.

@Noah-Kennedy
Contributor

This seems about ready. I left a few questions and comments, but I don't think they block a merge here.

Member

@hawkw hawkw left a comment

Okay, I gave the IdleNotifiedSet code a closer look, and it seems correct to me. I had some minor, non-blocking style suggestions; feel free to ignore any you disagree with.

tokio/src/util/idle_notified_set.rs
Comment on lines 159 to 162
{
    let mut lock = self.lists.inner.lock();
    lock.idle.push_front(entry.clone());
}
Member

nit: can't this just be

Suggested change
- {
-     let mut lock = self.lists.inner.lock();
-     lock.idle.push_front(entry.clone());
- }
+ self.lists.inner.lock().idle.push_front(entry.clone());

Contributor

I see that I am not the only one afflicted by this lifetime/scoping paranoia. I tend to follow the same pattern even though it isn't necessary here I believe.
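
Why the one-line form is equivalent here can be checked with a small std-only sketch. It assumes `std::sync::Mutex` (hence the `unwrap()` calls, which the lock in the PR does not need), and `Lists` and `demo` are hypothetical names. A guard created as a temporary is dropped at the end of the statement, so the lock is already free on the next line.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical stand-in for the lists guarded by the set's mutex.
struct Lists {
    idle: VecDeque<u32>,
}

/// Pushes once with each style, then reports whether the mutex is free
/// and what the list contains.
fn demo() -> (bool, Vec<u32>) {
    let lists = Mutex::new(Lists { idle: VecDeque::new() });

    // Explicit block: the named guard is dropped at the closing brace.
    {
        let mut lock = lists.lock().unwrap();
        lock.idle.push_front(1);
    }

    // One-liner: the guard is a temporary, dropped at the end of the statement.
    lists.lock().unwrap().idle.push_front(2);

    // If either guard were still alive, `try_lock` would fail here.
    let unlocked = lists.try_lock().is_ok();
    let contents: Vec<u32> = lists.lock().unwrap().idle.iter().copied().collect();
    (unlocked, contents)
}
```

The explicit block still earns its keep when the guard is bound to a name and more code follows in the same scope, which is the situation the "scoping paranoia" guards against.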

@Darksonn Darksonn changed the title Add TaskSet Add JoinSet Jan 27, 2022
Member

@hawkw hawkw left a comment

okay, this looks great to me! thanks @Darksonn for addressing my various nitpicks. :)

@carllerche carllerche merged commit 1bb4d23 into master Feb 1, 2022
@carllerche carllerche deleted the task-set branch February 1, 2022 22:17
ellenhp pushed a commit to killyourphone/tokio that referenced this pull request Feb 6, 2022
Adds `JoinSet` for managing multiple spawned tasks and joining them
in completion order.

Closes: tokio-rs#3903
This was referenced Feb 8, 2022
hawkw added a commit that referenced this pull request Feb 15, 2022
# 1.16.2 (February 15, 2022)

This release updates the minimum supported Rust version (MSRV) to 1.49,
the `mio` dependency to v0.8, and the (optional) `parking_lot`
dependency to v0.12. Additionally, it contains several bug fixes, as
well as internal refactoring and performance improvements.

### Fixed

- time: prevent panicking in `sleep` with large durations ([#4495])
- time: eliminate potential panics in `Instant` arithmetic on platforms
  where `Instant::now` is not monotonic ([#4461])
- io: fix `DuplexStream` not participating in cooperative yielding
  ([#4478])
- rt: fix potential double panic when dropping a `JoinHandle` ([#4430])

### Changed

- update minimum supported Rust version to 1.49 ([#4457])
- update `parking_lot` dependency to v0.12.0 ([#4459])
- update `mio` dependency to v0.8 ([#4449])
- rt: remove an unnecessary lock in the blocking pool ([#4436])
- rt: remove an unnecessary enum in the basic scheduler ([#4462])
- time: use bit manipulation instead of modulo to improve performance
  ([#4480])
- net: use `std::future::Ready` instead of our own `Ready` future
  ([#4271])
- replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop`
  ([#4491])
- fix miri failures in intrusive linked lists ([#4397])

### Documented

- io: add an example for `tokio::process::ChildStdin` ([#4479])

### Unstable

The following changes only apply when building with `--cfg
tokio_unstable`:

- task: fix missing location information in `tracing` spans generated by
  `spawn_local` ([#4483])
- task: add `JoinSet` for managing sets of tasks ([#4335])
- metrics: fix compilation error on MIPS ([#4475])
- metrics: fix compilation error on arm32v7 ([#4453])

[#4495]: #4495
[#4461]: #4461
[#4478]: #4478
[#4430]: #4430
[#4457]: #4457
[#4459]: #4459
[#4449]: #4449
[#4462]: #4462
[#4436]: #4436
[#4480]: #4480
[#4271]: #4271
[#4491]: #4491
[#4397]: #4397
[#4479]: #4479
[#4483]: #4483
[#4335]: #4335
[#4475]: #4475
[#4453]: #4453
hawkw added a commit that referenced this pull request Feb 16, 2022
# 1.17.0 (February 16, 2022)

hawkw added a commit that referenced this pull request Apr 20, 2022
## Motivation

PR #4538 adds a prototype implementation of a `JoinMap` API in
`tokio::task`. In [this comment][1] on that PR, @carllerche pointed out
that a much simpler `JoinMap` type could be implemented outside of
`tokio` (either in `tokio-util` or in user code) if we just modified
`JoinSet` to return a task ID type when spawning new tasks, and when
tasks complete. This seems like a better approach for the following
reasons:

* A `JoinMap`-like type need not become a permanent part of `tokio`'s
  stable API
* Task IDs seem like something that could be generally useful outside of
  a `JoinMap` implementation

## Solution

This branch adds a `tokio::task::Id` type that uniquely identifies a
task relative to all currently spawned tasks. The ID is internally
represented as a `NonZeroUsize` that's based on the address of the
task's header. I thought that it was better to use addresses to generate
IDs than assigning sequential IDs to tasks, because a sequential ID
would mean adding an additional usize field to the task data
somewhere, making it a word bigger. I've added methods to `JoinHandle`
and `AbortHandle` for returning a task's ID.
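
A minimal sketch of the address-based scheme described above, under stated assumptions: `Header`, `Id`, `id_of`, and `demo` are hypothetical stand-ins rather than Tokio's types. It shows why such an ID fits in a `NonZeroUsize` and needs no extra field, and why it is only unique among allocations that are alive at the same time.

```rust
use std::num::NonZeroUsize;

/// Hypothetical stand-in for a heap-allocated task header.
struct Header {
    _state: usize,
}

/// An ID derived from the header's address. An address can be reused once
/// the allocation is freed, so uniqueness only holds among live headers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Id(NonZeroUsize);

/// Derives the ID from the address; no extra storage in the header is needed.
fn id_of(header: &Header) -> Id {
    let addr = header as *const Header as usize;
    Id(NonZeroUsize::new(addr).expect("a reference is never null"))
}

/// Returns (same header gives same ID, two live headers give different IDs).
fn demo() -> (bool, bool) {
    let a = Box::new(Header { _state: 0 });
    let b = Box::new(Header { _state: 0 });
    (id_of(&a) == id_of(&a), id_of(&a) != id_of(&b))
}
```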

In addition, I modified `JoinSet` to add a `join_with_id` method that
behaves identically to `join_one` but also returns an ID. This can be
used to implement a `JoinMap` type.

Note that because `join_with_id` must return a task ID regardless of
whether the task completes successfully or returns a `JoinError` (in
order to ensure that dead tasks are always cleaned up from a map), it
inverts the ordering of the `Option` and `Result` returned by `join_one`
--- which we've bikeshedded about a bit [here][2]. This makes the
method's return type inconsistent with the existing `join_one` method,
which feels not great. As I see it, there are three possible solutions
to this:

* change the existing `join_one` method to also swap the `Option` and
  `Result` nesting for consistency.
* change `join_with_id` to return `Result<Option<(Id, T)>, (Id,
  JoinError)>`, which feels gross...
* add a task ID to `JoinError` as well.

[1]: #4538 (comment)
[2]: #4335 (comment)
@hawkw hawkw mentioned this pull request Apr 20, 2022
hawkw added a commit that referenced this pull request Apr 22, 2022
hawkw added a commit that referenced this pull request Apr 25, 2022
## Motivation

PR #4538 adds a prototype implementation of a `JoinMap` API in
`tokio::task`. In [this comment][1] on that PR, @carllerche pointed out
that a much simpler `JoinMap` type could be implemented outside of
`tokio` (either in `tokio-util` or in user code) if we just modified
`JoinSet` to return a task ID type when spawning new tasks, and when
tasks complete. This seems like a better approach for the following
reasons:

* A `JoinMap`-like type need not become a permanent part of `tokio`'s
  stable API
* Task IDs seem like something that could be generally useful outside of
  a `JoinMap` implementation

## Solution

This branch adds a `tokio::task::Id` type that uniquely identifies a
task relative to all other spawned tasks. Task IDs are assigned
sequentially based on an atomic `usize` counter of spawned tasks.
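
A minimal sketch of sequential ID assignment from an atomic counter, as the paragraph above describes. The names `Id` and `next_id`, the starting value of 1, and the `Relaxed` ordering are assumptions made for illustration, not details taken from the commit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical task ID backed by a plain counter value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Id(usize);

/// Hands out the next ID. `fetch_add` returns the previous value, so each
/// caller observes a distinct number even when called from several threads.
fn next_id() -> Id {
    // Assumption: start at 1; the commit message does not state the start.
    static NEXT: AtomicUsize = AtomicUsize::new(1);
    Id(NEXT.fetch_add(1, Ordering::Relaxed))
}
```

Unlike an address-based ID, a counter value is not reused when a task is freed, at the cost of storing the ID with the task.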

In addition, I modified `JoinSet` to add a `join_with_id` method that
behaves identically to `join_one` but also returns an ID. This can be
used to implement a `JoinMap` type.

Note that because `join_with_id` must return a task ID regardless of
whether the task completes successfully or returns a `JoinError`, I've
also changed `JoinError` to carry the ID of the task that errored, and 
added a `JoinError::id` method for accessing it. Alternatively, we could
have done one of the following:

* have `join_with_id` return `Option<(Id, Result<T, JoinError>)>`, which
  would be inconsistent with the return type of `join_one` (which we've
  [already bikeshedded over once][2]...)
* have `join_with_id` return `Result<Option<(Id, T)>, (Id, JoinError)>`,
  which just feels gross.

I thought adding the task ID to `JoinError` was the nicest option, and
is potentially useful for other stuff as well, so it's probably a good API to
have anyway.
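
To illustrate why an ID on the error is enough for a `JoinMap`-like wrapper, here is a std-only sketch. Everything in it is hypothetical: `MockSet`, `MockJoinError`, and `drain` are stand-ins, and the mock `join_with_id` is synchronous. The point is that the caller can remove a finished task's key from its map on both the success path and the error path.

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Id(usize);

/// Hypothetical error type that remembers which task failed.
#[derive(Debug)]
struct MockJoinError {
    id: Id,
}

impl MockJoinError {
    fn id(&self) -> Id {
        self.id
    }
}

/// Hypothetical set holding already-finished results tagged with task IDs.
struct MockSet {
    finished: VecDeque<Result<(Id, u32), MockJoinError>>,
}

impl MockSet {
    /// Keeps the `Result<Option<_>, _>` nesting; the error carries the ID.
    fn join_with_id(&mut self) -> Result<Option<(Id, u32)>, MockJoinError> {
        self.finished.pop_front().transpose()
    }
}

/// Drains the set and removes every finished task's key from the map,
/// whether the task succeeded or failed.
fn drain(
    set: &mut MockSet,
    keys: &mut HashMap<Id, &'static str>,
) -> Vec<(&'static str, Option<u32>)> {
    let mut out = Vec::new();
    loop {
        let (id, value) = match set.join_with_id() {
            Ok(Some((id, value))) => (id, Some(value)),
            Ok(None) => break,
            Err(err) => (err.id(), None),
        };
        if let Some(key) = keys.remove(&id) {
            out.push((key, value));
        }
    }
    out
}

/// One successful task keyed "a" and one failed task keyed "b".
fn demo() -> (Vec<(&'static str, Option<u32>)>, usize) {
    let mut set = MockSet {
        finished: VecDeque::from(vec![Ok((Id(1), 10)), Err(MockJoinError { id: Id(2) })]),
    };
    let mut keys = HashMap::from([(Id(1), "a"), (Id(2), "b")]);
    let out = drain(&mut set, &mut keys);
    (out, keys.len())
}
```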

[1]: #4538 (comment)
[2]: #4335 (comment)

Closes #4538

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
@abonander abonander mentioned this pull request May 12, 2022
2 tasks
Development

Successfully merging this pull request may close these issues.

Add a TaskSet
6 participants