Skip to content

Commit

Permalink
task: update return value of JoinSet::join_one (#4726)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed May 31, 2022
1 parent 2bad98f commit 5fd1220
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 39 deletions.
8 changes: 3 additions & 5 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -418,14 +418,12 @@ where
/// [`tokio::select!`]: tokio::select
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
let (res, id) = match self.tasks.join_one_with_id().await {
Ok(task) => {
let (id, output) = task?;
(Ok(output), id)
}
Err(e) => {
Some(Ok((id, output))) => (Ok(output), id),
Some(Err(e)) => {
let id = e.id();
(Err(e), id)
}
None => return None,
};
let key = self.remove_by_id(id)?;
Some((key, res))
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/tests/loom_join_set.rs
Expand Up @@ -61,15 +61,15 @@ fn abort_all_during_completion() {
set.abort_all();

match set.join_one().await {
Ok(Some(())) => complete_happened.store(true, SeqCst),
Err(err) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
Err(err) => panic!("fail: {}", err),
Ok(None) => {
Some(Ok(())) => complete_happened.store(true, SeqCst),
Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
Some(Err(err)) => panic!("fail: {}", err),
None => {
unreachable!("Aborting the task does not remove it from the JoinSet.")
}
}

assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));
});

drop(set);
Expand Down
35 changes: 19 additions & 16 deletions tokio/src/task/join_set.rs
Expand Up @@ -43,8 +43,9 @@ use crate::util::IdleNotifiedSet;
/// }
///
/// let mut seen = [false; 10];
/// while let Some(res) = set.join_one().await.unwrap() {
/// seen[res] = true;
/// while let Some(res) = set.join_one().await {
/// let idx = res.unwrap();
/// seen[idx] = true;
/// }
///
/// for i in 0..10 {
Expand Down Expand Up @@ -203,7 +204,7 @@ impl<T: 'static> JoinSet<T> {
/// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> {
pub async fn join_one(&mut self) -> Option<Result<T, JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_one(cx))
.await
.map(|opt| opt.map(|(_, res)| res))
Expand All @@ -214,21 +215,25 @@ impl<T: 'static> JoinSet<T> {
///
/// Returns `None` if the set is empty.
///
/// When this method returns an error, then the id of the task that failed can be accessed
/// using the [`JoinError::id`] method.
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_one_with_id` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
///
/// [task ID]: crate::task::Id
pub async fn join_one_with_id(&mut self) -> Result<Option<(Id, T)>, JoinError> {
/// [`JoinError::id`]: fn@crate::task::JoinError::id
pub async fn join_one_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_one(cx)).await
}

/// Aborts all tasks and waits for them to finish shutting down.
///
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in
/// a loop until it returns `Ok(None)`.
/// a loop until it returns `None`.
///
/// This method ignores any panics in the tasks shutting down. When this call returns, the
/// `JoinSet` will be empty.
Expand All @@ -237,7 +242,7 @@ impl<T: 'static> JoinSet<T> {
/// [`join_one`]: fn@Self::join_one
pub async fn shutdown(&mut self) {
self.abort_all();
while self.join_one().await.transpose().is_some() {}
while self.join_one().await.is_some() {}
}

/// Aborts all tasks on this `JoinSet`.
Expand All @@ -258,8 +263,7 @@ impl<T: 'static> JoinSet<T> {

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some((_, Ok(_))))` or `Poll::Ready(Some((_,
/// Err(_)))`, then the task that completed is removed from the set.
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
Expand All @@ -272,26 +276,26 @@ impl<T: 'static> JoinSet<T> {
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Ok(Some((id, value)))` if one of the tasks in this `JoinSet` has completed. The
/// `value` is the return value of one of the tasks that completed, while
/// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
/// The `value` is the return value of one of the tasks that completed, and
/// `id` is the [task ID] of that task.
/// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been
/// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
/// [task ID]: crate::task::Id
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<(Id, T)>, JoinError>> {
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(Id, T), JoinError>>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Some(entry) => entry,
None => {
if self.is_empty() {
return Poll::Ready(Ok(None));
return Poll::Ready(None);
} else {
// The waker was set by `pop_notified`.
return Poll::Pending;
Expand All @@ -305,8 +309,7 @@ impl<T: 'static> JoinSet<T> {
let entry = entry.remove();
// If the task succeeded, add the task ID to the output. Otherwise, the
// `JoinError` will already have the task's ID.
let res = res.map(|output| (entry.id(), output));
Poll::Ready(Some(res).transpose())
Poll::Ready(Some(res.map(|output| (entry.id(), output))))
} else {
// A JoinHandle generally won't emit a wakeup without being ready unless
// the coop limit has been reached. We yield to the executor in this
Expand Down
30 changes: 17 additions & 13 deletions tokio/tests/task_join_set.rs
Expand Up @@ -24,7 +24,7 @@ async fn test_with_sleep() {
set.detach_all();
assert_eq!(set.len(), 0);

assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));

for i in 0..10 {
set.spawn(async move {
Expand All @@ -35,14 +35,14 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some(res) = set.join_one().await.unwrap() {
while let Some(res) = set.join_one().await.transpose().unwrap() {
seen[res] = true;
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));

// Do it again.
for i in 0..10 {
Expand All @@ -53,14 +53,14 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some(res) = set.join_one().await.unwrap() {
while let Some(res) = set.join_one().await.transpose().unwrap() {
seen[res] = true;
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(set.join_one().await, Ok(None)));
assert!(matches!(set.join_one().await, None));
}

#[tokio::test]
Expand Down Expand Up @@ -124,15 +124,15 @@ async fn abort_tasks() {
}
loop {
match set.join_one().await {
Ok(Some(res)) => {
Some(Ok(res)) => {
num_completed += 1;
assert_eq!(res % 2, 0);
}
Err(e) => {
Some(Err(e)) => {
assert!(e.is_cancelled());
num_canceled += 1;
}
Ok(None) => break,
None => break,
}
}

Expand All @@ -149,7 +149,11 @@ fn runtime_gone() {
drop(rt);
}

assert!(rt().block_on(set.join_one()).unwrap_err().is_cancelled());
assert!(rt()
.block_on(set.join_one())
.unwrap()
.unwrap_err()
.is_cancelled());
}

// This ensures that `join_one` works correctly when the coop budget is
Expand Down Expand Up @@ -179,14 +183,14 @@ async fn join_set_coop() {
let mut coop_count = 0;
loop {
match set.join_one().now_or_never() {
Some(Ok(Some(()))) => {}
Some(Err(err)) => panic!("failed: {}", err),
Some(Some(Ok(()))) => {}
Some(Some(Err(err))) => panic!("failed: {}", err),
None => {
coop_count += 1;
tokio::task::yield_now().await;
continue;
}
Some(Ok(None)) => break,
Some(None) => break,
}

count += 1;
Expand Down Expand Up @@ -215,7 +219,7 @@ async fn abort_all() {
assert_eq!(set.len(), 10);

let mut count = 0;
while let Some(res) = set.join_one().await.transpose() {
while let Some(res) = set.join_one().await {
if let Err(err) = res {
assert!(err.is_cancelled());
}
Expand Down

0 comments on commit 5fd1220

Please sign in to comment.