From 2ae2517736222b74a3d58ebab17cfa6f93b1e42e Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 30 May 2022 17:11:03 +0200 Subject: [PATCH 1/4] task: update return value of `JoinSet::join_one` --- tokio/src/task/join_set.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 42f55a034cc..3d9197aeb7a 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -158,7 +158,7 @@ impl JoinSet { /// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!` /// statement and some other branch completes first, it is guaranteed that no tasks were /// removed from this `JoinSet`. - pub async fn join_one(&mut self) -> Result, JoinError> { + pub async fn join_one(&mut self) -> Option> { crate::future::poll_fn(|cx| self.poll_join_one(cx)) .await .map(|opt| opt.map(|(_, res)| res)) @@ -169,6 +169,9 @@ impl JoinSet { /// /// Returns `None` if the set is empty. /// + /// When this method returns an error, then the id of the task that failed can be accessed + /// using the [`JoinError::id`] method. + /// /// # Cancel Safety /// /// This method is cancel safe. If `join_one_with_id` is used as the event in a `tokio::select!` @@ -176,14 +179,15 @@ impl JoinSet { /// removed from this `JoinSet`. /// /// [task ID]: crate::task::Id - pub async fn join_one_with_id(&mut self) -> Result, JoinError> { + /// [`JoinError::id`]: fn@crate::task::JoinError::id + pub async fn join_one_with_id(&mut self) -> Option> { crate::future::poll_fn(|cx| self.poll_join_one(cx)).await } /// Aborts all tasks and waits for them to finish shutting down. /// /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in - /// a loop until it returns `Ok(None)`. + /// a loop until it returns `None`. /// /// This method ignores any panics in the tasks shutting down. When this call returns, the /// `JoinSet` will be empty. @@ -192,7 +196,7 @@ impl JoinSet { /// [`join_one`]: fn@Self::join_one pub async fn shutdown(&mut self) { self.abort_all(); - while self.join_one().await.transpose().is_some() {} + while self.join_one().await.is_some() {} } /// Aborts all tasks on this `JoinSet`. @@ -213,8 +217,7 @@ impl JoinSet { /// Polls for one of the tasks in the set to complete. /// - /// If this returns `Poll::Ready(Some((_, Ok(_))))` or `Poll::Ready(Some((_, - /// Err(_)))`, then the task that completed is removed from the set. + /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. /// /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to @@ -227,26 +230,26 @@ impl JoinSet { /// /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is /// available right now. - /// * `Poll::Ready(Ok(Some((id, value)))` if one of the tasks in this `JoinSet` has completed. The - /// `value` is the return value of one of the tasks that completed, while + /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed. + /// The `value` is the return value of one of the tasks that completed, and /// `id` is the [task ID] of that task. - /// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been + /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been /// aborted. The `err` is the `JoinError` from the panicked/aborted task. - /// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty. + /// * `Poll::Ready(None)` if the `JoinSet` is empty. /// /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. /// This can happen if the [coop budget] is reached. /// /// [coop budget]: crate::task#cooperative-scheduling /// [task ID]: crate::task::Id - fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll, JoinError>> { + fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll>> { // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to // the `notified` list if the waker is notified in the `poll` call below. let mut entry = match self.inner.pop_notified(cx.waker()) { Some(entry) => entry, None => { if self.is_empty() { - return Poll::Ready(Ok(None)); + return Poll::Ready(None); } else { // The waker was set by `pop_notified`. return Poll::Pending; @@ -260,8 +263,7 @@ impl JoinSet { let entry = entry.remove(); // If the task succeeded, add the task ID to the output. Otherwise, the // `JoinError` will already have the task's ID. - let res = res.map(|output| (entry.id(), output)); - Poll::Ready(Some(res).transpose()) + Poll::Ready(Some(res.map(|output| (entry.id(), output)))) } else { // A JoinHandle generally won't emit a wakeup without being ready unless // the coop limit has been reached. We yield to the executor in this From f947ff6c7669afbb6b8256d6e27af566e25addc4 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 30 May 2022 17:47:03 +0200 Subject: [PATCH 2/4] Update tests too --- tokio-util/src/task/join_map.rs | 8 +++----- tokio/tests/task_join_set.rs | 30 +++++++++++++++++------------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 299c92faed3..02084604280 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -418,14 +418,12 @@ where /// [`tokio::select!`]: tokio::select pub async fn join_one(&mut self) -> Option<(K, Result)> { let (res, id) = match self.tasks.join_one_with_id().await { - Ok(task) => { - let (id, output) = task?; - (Ok(output), id) - } - Err(e) => { + Some(Ok((id, output))) => (Ok(output), id), + Some(Err(e)) => { let id = e.id(); (Err(e), id) } + None => return None, }; let key = self.remove_by_id(id)?; Some((key, res)) diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 470f861fe9b..d016e0038cf 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -24,7 +24,7 @@ async fn test_with_sleep() { set.detach_all(); assert_eq!(set.len(), 0); - assert!(matches!(set.join_one().await, Ok(None))); + assert!(matches!(set.join_one().await, None)); for i in 0..10 { set.spawn(async move { @@ -35,14 +35,14 @@ async fn test_with_sleep() { } let mut seen = [false; 10]; - while let Some(res) = set.join_one().await.unwrap() { + while let Some(res) = set.join_one().await.transpose().unwrap() { seen[res] = true; } for was_seen in &seen { assert!(was_seen); } - assert!(matches!(set.join_one().await, Ok(None))); + assert!(matches!(set.join_one().await, None)); // Do it again. for i in 0..10 { @@ -53,14 +53,14 @@ async fn test_with_sleep() { } let mut seen = [false; 10]; - while let Some(res) = set.join_one().await.unwrap() { + while let Some(res) = set.join_one().await.transpose().unwrap() { seen[res] = true; } for was_seen in &seen { assert!(was_seen); } - assert!(matches!(set.join_one().await, Ok(None))); + assert!(matches!(set.join_one().await, None)); } #[tokio::test] @@ -124,15 +124,15 @@ async fn abort_tasks() { } loop { match set.join_one().await { - Ok(Some(res)) => { + Some(Ok(res)) => { num_completed += 1; assert_eq!(res % 2, 0); } - Err(e) => { + Some(Err(e)) => { assert!(e.is_cancelled()); num_canceled += 1; } - Ok(None) => break, + None => break, } } @@ -149,7 +149,11 @@ fn runtime_gone() { drop(rt); } - assert!(rt().block_on(set.join_one()).unwrap_err().is_cancelled()); + assert!(rt() + .block_on(set.join_one()) + .unwrap() + .unwrap_err() + .is_cancelled()); } // This ensures that `join_one` works correctly when the coop budget is @@ -179,14 +183,14 @@ async fn join_set_coop() { let mut coop_count = 0; loop { match set.join_one().now_or_never() { - Some(Ok(Some(()))) => {} - Some(Err(err)) => panic!("failed: {}", err), + Some(Some(Ok(()))) => {} + Some(Some(Err(err))) => panic!("failed: {}", err), None => { coop_count += 1; tokio::task::yield_now().await; continue; } - Some(Ok(None)) => break, + Some(None) => break, } count += 1; @@ -215,7 +219,7 @@ async fn abort_all() { assert_eq!(set.len(), 10); let mut count = 0; - while let Some(res) = set.join_one().await.transpose() { + while let Some(res) = set.join_one().await { if let Err(err) = res { assert!(err.is_cancelled()); } From 1a8e9ceaeb21c8e23cd525bf0ea44f3a09f45820 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 30 May 2022 18:04:17 +0200 Subject: [PATCH 3/4] Fix loom tests --- tokio/src/runtime/tests/loom_join_set.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/tests/loom_join_set.rs b/tokio/src/runtime/tests/loom_join_set.rs index e87ddb01406..392b5bf29f8 100644 --- a/tokio/src/runtime/tests/loom_join_set.rs +++ b/tokio/src/runtime/tests/loom_join_set.rs @@ -61,15 +61,15 @@ fn abort_all_during_completion() { set.abort_all(); match set.join_one().await { - Ok(Some(())) => complete_happened.store(true, SeqCst), - Err(err) if err.is_cancelled() => cancel_happened.store(true, SeqCst), - Err(err) => panic!("fail: {}", err), - Ok(None) => { + Some(Ok(())) => complete_happened.store(true, SeqCst), + Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst), + Some(Err(err)) => panic!("fail: {}", err), + None => { unreachable!("Aborting the task does not remove it from the JoinSet.") } } - assert!(matches!(set.join_one().await, Ok(None))); + assert!(matches!(set.join_one().await, None)); }); drop(set); From 442ce0dc58b709334245a21f014df459d58ddafd Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 30 May 2022 18:25:57 +0200 Subject: [PATCH 4/4] Fix doc tests --- tokio/src/task/join_set.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 3d9197aeb7a..43cd78bf873 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -37,8 +37,9 @@ use crate::util::IdleNotifiedSet; /// } /// /// let mut seen = [false; 10]; -/// while let Some(res) = set.join_one().await.unwrap() { -/// seen[res] = true; +/// while let Some(res) = set.join_one().await { +/// let idx = res.unwrap(); +/// seen[idx] = true; /// } /// /// for i in 0..10 {