Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: update return value of JoinSet::join_one #4726

Merged
merged 5 commits into from May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 3 additions & 5 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -418,14 +418,12 @@ where
/// [`tokio::select!`]: tokio::select
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
let (res, id) = match self.tasks.join_one_with_id().await {
Ok(task) => {
let (id, output) = task?;
(Ok(output), id)
}
Err(e) => {
Some(Ok((id, output))) => (Ok(output), id),
Some(Err(e)) => {
let id = e.id();
(Err(e), id)
}
None => return None,
};
let key = self.remove_by_id(id)?;
Some((key, res))
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/tests/loom_join_set.rs
Expand Up @@ -61,15 +61,15 @@ fn abort_all_during_completion() {
set.abort_all();

match set.join_one().await {
Ok(Some(())) => complete_happened.store(true, SeqCst),
Err(err) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
Err(err) => panic!("fail: {}", err),
Ok(None) => {
Some(Ok(())) => complete_happened.store(true, SeqCst),
Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
Some(Err(err)) => panic!("fail: {}", err),
None => {
unreachable!("Aborting the task does not remove it from the JoinSet.")
}
}

assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));
});

drop(set);
Expand Down
35 changes: 19 additions & 16 deletions tokio/src/task/join_set.rs
Expand Up @@ -43,8 +43,9 @@ use crate::util::IdleNotifiedSet;
/// }
///
/// let mut seen = [false; 10];
/// while let Some(res) = set.join_one().await.unwrap() {
/// seen[res] = true;
/// while let Some(res) = set.join_one().await {
/// let idx = res.unwrap();
/// seen[idx] = true;
/// }
///
/// for i in 0..10 {
Expand Down Expand Up @@ -203,7 +204,7 @@ impl<T: 'static> JoinSet<T> {
/// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> {
pub async fn join_one(&mut self) -> Option<Result<T, JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_one(cx))
.await
.map(|opt| opt.map(|(_, res)| res))
Expand All @@ -214,21 +215,25 @@ impl<T: 'static> JoinSet<T> {
///
/// Returns `None` if the set is empty.
///
/// When this method returns an error, then the id of the task that failed can be accessed
/// using the [`JoinError::id`] method.
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_one_with_id` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
///
/// [task ID]: crate::task::Id
pub async fn join_one_with_id(&mut self) -> Result<Option<(Id, T)>, JoinError> {
/// [`JoinError::id`]: fn@crate::task::JoinError::id
pub async fn join_one_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_one(cx)).await
}

/// Aborts all tasks and waits for them to finish shutting down.
///
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in
/// a loop until it returns `Ok(None)`.
/// a loop until it returns `None`.
///
/// This method ignores any panics in the tasks shutting down. When this call returns, the
/// `JoinSet` will be empty.
Expand All @@ -237,7 +242,7 @@ impl<T: 'static> JoinSet<T> {
/// [`join_one`]: fn@Self::join_one
pub async fn shutdown(&mut self) {
self.abort_all();
while self.join_one().await.transpose().is_some() {}
while self.join_one().await.is_some() {}
}

/// Aborts all tasks on this `JoinSet`.
Expand All @@ -258,8 +263,7 @@ impl<T: 'static> JoinSet<T> {

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some((_, Ok(_))))` or `Poll::Ready(Some((_,
/// Err(_)))`, then the task that completed is removed from the set.
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
Expand All @@ -272,26 +276,26 @@ impl<T: 'static> JoinSet<T> {
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Ok(Some((id, value)))` if one of the tasks in this `JoinSet` has completed. The
/// `value` is the return value of one of the tasks that completed, while
/// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
/// The `value` is the return value of one of the tasks that completed, and
/// `id` is the [task ID] of that task.
/// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been
/// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
/// [task ID]: crate::task::Id
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<(Id, T)>, JoinError>> {
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(Id, T), JoinError>>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Some(entry) => entry,
None => {
if self.is_empty() {
return Poll::Ready(Ok(None));
return Poll::Ready(None);
} else {
// The waker was set by `pop_notified`.
return Poll::Pending;
Expand All @@ -305,8 +309,7 @@ impl<T: 'static> JoinSet<T> {
let entry = entry.remove();
// If the task succeeded, add the task ID to the output. Otherwise, the
// `JoinError` will already have the task's ID.
let res = res.map(|output| (entry.id(), output));
Poll::Ready(Some(res).transpose())
Poll::Ready(Some(res.map(|output| (entry.id(), output))))
} else {
// A JoinHandle generally won't emit a wakeup without being ready unless
// the coop limit has been reached. We yield to the executor in this
Expand Down
30 changes: 17 additions & 13 deletions tokio/tests/task_join_set.rs
Expand Up @@ -24,7 +24,7 @@ async fn test_with_sleep() {
set.detach_all();
assert_eq!(set.len(), 0);

assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));

for i in 0..10 {
set.spawn(async move {
Expand All @@ -35,14 +35,14 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some(res) = set.join_one().await.unwrap() {
while let Some(res) = set.join_one().await.transpose().unwrap() {
seen[res] = true;
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));

// Do it again.
for i in 0..10 {
Expand All @@ -53,14 +53,14 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some(res) = set.join_one().await.unwrap() {
while let Some(res) = set.join_one().await.transpose().unwrap() {
seen[res] = true;
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));
}

#[tokio::test]
Expand Down Expand Up @@ -124,15 +124,15 @@ async fn abort_tasks() {
}
loop {
match set.join_one().await {
Ok(Some(res)) => {
Some(Ok(res)) => {
num_completed += 1;
assert_eq!(res % 2, 0);
}
Err(e) => {
Some(Err(e)) => {
assert!(e.is_cancelled());
num_canceled += 1;
}
Ok(None) => break,
None => break,
}
}

Expand All @@ -149,7 +149,11 @@ fn runtime_gone() {
drop(rt);
}

assert!(rt().block_on(set.join_one()).unwrap_err().is_cancelled());
assert!(rt()
.block_on(set.join_one())
.unwrap()
.unwrap_err()
.is_cancelled());
}

// This ensures that `join_one` works correctly when the coop budget is
Expand Down Expand Up @@ -179,14 +183,14 @@ async fn join_set_coop() {
let mut coop_count = 0;
loop {
match set.join_one().now_or_never() {
Some(Ok(Some(()))) => {}
Some(Err(err)) => panic!("failed: {}", err),
Some(Some(Ok(()))) => {}
Some(Some(Err(err))) => panic!("failed: {}", err),
None => {
coop_count += 1;
tokio::task::yield_now().await;
continue;
}
Some(Ok(None)) => break,
Some(None) => break,
}

count += 1;
Expand Down Expand Up @@ -215,7 +219,7 @@ async fn abort_all() {
assert_eq!(set.len(), 10);

let mut count = 0;
while let Some(res) = set.join_one().await.transpose() {
while let Some(res) = set.join_one().await {
if let Err(err) = res {
assert!(err.is_cancelled());
}
Expand Down