Skip to content

Commit

Permalink
joinset: rename join_one to join_next (#4755)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbarsky committed Jun 9, 2022
1 parent 340c4dc commit 08d4953
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 68 deletions.
57 changes: 32 additions & 25 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -50,9 +50,9 @@ use tokio::task::{AbortHandle, Id, JoinError, JoinSet, LocalSet};
///
/// let mut seen = [false; 10];
///
/// // When a task completes, `join_one` returns the task's key along
/// // When a task completes, `join_next` returns the task's key along
/// // with its output.
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// seen[key] = true;
/// assert!(res.is_ok(), "task {} completed successfully!", key);
/// }
Expand Down Expand Up @@ -82,7 +82,7 @@ use tokio::task::{AbortHandle, Id, JoinError, JoinSet, LocalSet};
/// // provided key.
/// assert!(aborted);
///
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// if key == "goodbye world" {
/// // The aborted task should complete with a cancelled `JoinError`.
/// assert!(res.unwrap_err().is_cancelled());
Expand Down Expand Up @@ -277,14 +277,14 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn<F>(&mut self, key: K, task: F)
where
Expand All @@ -301,10 +301,10 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_on<F>(&mut self, key: K, task: F, handle: &Handle)
where
Expand All @@ -321,15 +321,15 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// # Panics
///
/// This method panics if it is called outside of a `LocalSet`.
///
/// [`LocalSet`]: tokio::task::LocalSet
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_local<F>(&mut self, key: K, task: F)
where
Expand All @@ -345,11 +345,11 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// [`LocalSet`]: tokio::task::LocalSet
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_local_on<F>(&mut self, key: K, task: F, local_set: &LocalSet)
where
Expand Down Expand Up @@ -399,7 +399,7 @@ where
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_one` is used as the event in a [`tokio::select!`]
/// This method is cancel safe. If `join_next` is used as the event in a [`tokio::select!`]
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinMap`.
///
Expand All @@ -416,8 +416,9 @@ where
/// * `None` if the `JoinMap` is empty.
///
/// [`tokio::select!`]: tokio::select
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
let (res, id) = match self.tasks.join_one_with_id().await {
#[doc(alias = "join_one")]
pub async fn join_next(&mut self) -> Option<(K, Result<V, JoinError>)> {
let (res, id) = match self.tasks.join_next_with_id().await {
Some(Ok((id, output))) => (Ok(output), id),
Some(Err(e)) => {
let id = e.id();
Expand All @@ -429,19 +430,25 @@ where
Some((key, res))
}

#[doc(hidden)]
#[deprecated(since = "0.7.4", note = "renamed to `JoinMap::join_next`.")]
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
self.join_next().await
}

/// Aborts all tasks and waits for them to finish shutting down.
///
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
/// a loop until it returns `None`.
///
/// This method ignores any panics in the tasks shutting down. When this call returns, the
/// `JoinMap` will be empty.
///
/// [`abort_all`]: fn@Self::abort_all
/// [`join_one`]: fn@Self::join_one
/// [`join_next`]: fn@Self::join_next
pub async fn shutdown(&mut self) {
self.abort_all();
while self.join_one().await.is_some() {}
while self.join_next().await.is_some() {}
}

/// Abort the task corresponding to the provided `key`.
Expand All @@ -467,7 +474,7 @@ where
/// // Look up the "goodbye world" task in the map and abort it.
/// map.abort("goodbye world");
///
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// if key == "goodbye world" {
/// // The aborted task should complete with a cancelled `JoinError`.
/// assert!(res.unwrap_err().is_cancelled());
Expand Down Expand Up @@ -548,7 +555,7 @@ where
/// map.abort_matching(|key| key.starts_with("goodbye"));
///
/// let mut seen = 0;
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// seen += 1;
/// if key.starts_with("goodbye") {
/// // The aborted task should complete with a cancelled `JoinError`.
Expand All @@ -567,7 +574,7 @@ where
pub fn abort_matching(&mut self, mut predicate: impl FnMut(&K) -> bool) {
// Note: this method iterates over the tasks and keys *without* removing
// any entries, so that the keys from aborted tasks can still be
// returned when calling `join_one` in the future.
// returned when calling `join_next` in the future.
for (Key { ref key, .. }, task) in &self.tasks_by_key {
if predicate(key) {
task.abort();
Expand All @@ -578,9 +585,9 @@ where
/// Returns `true` if this `JoinMap` contains a task for the provided key.
///
/// If the task has completed, but its output hasn't yet been consumed by a
/// call to [`join_one`], this method will still return `true`.
/// call to [`join_next`], this method will still return `true`.
///
/// [`join_one`]: fn@Self::join_one
/// [`join_next`]: fn@Self::join_next
pub fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
where
Q: Hash + Eq,
Expand All @@ -593,9 +600,9 @@ where
/// [task ID].
///
/// If the task has completed, but its output hasn't yet been consumed by a
/// call to [`join_one`], this method will still return `true`.
/// call to [`join_next`], this method will still return `true`.
///
/// [`join_one`]: fn@Self::join_one
/// [`join_next`]: fn@Self::join_next
/// [task ID]: tokio::task::Id
pub fn contains_task(&self, task: &Id) -> bool {
self.get_by_id(task).is_some()
Expand Down Expand Up @@ -739,7 +746,7 @@ where
/// Aborts all tasks on this `JoinMap`.
///
/// This does not remove the tasks from the `JoinMap`. To wait for the tasks to complete
/// cancellation, you should call `join_one` in a loop until the `JoinMap` is empty.
/// cancellation, you should call `join_next` in a loop until the `JoinMap` is empty.
pub fn abort_all(&mut self) {
self.tasks.abort_all()
}
Expand Down
24 changes: 12 additions & 12 deletions tokio-util/tests/task_join_map.rs
Expand Up @@ -24,7 +24,7 @@ async fn test_with_sleep() {
map.detach_all();
assert_eq!(map.len(), 0);

assert!(matches!(map.join_one().await, None));
assert!(matches!(map.join_next().await, None));

for i in 0..10 {
map.spawn(i, async move {
Expand All @@ -35,15 +35,15 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some((k, res)) = map.join_one().await {
while let Some((k, res)) = map.join_next().await {
seen[k] = true;
assert_eq!(res.expect("task should have completed successfully"), k);
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(map.join_one().await, None));
assert!(matches!(map.join_next().await, None));

// Do it again.
for i in 0..10 {
Expand All @@ -54,15 +54,15 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some((k, res)) = map.join_one().await {
while let Some((k, res)) = map.join_next().await {
seen[k] = true;
assert_eq!(res.expect("task should have completed successfully"), k);
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(map.join_one().await, None));
assert!(matches!(map.join_next().await, None));
}

#[tokio::test]
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn alternating() {
assert_eq!(map.len(), 2);

for i in 0..16 {
let (_, res) = map.join_one().await.unwrap();
let (_, res) = map.join_next().await.unwrap();
assert!(res.is_ok());
assert_eq!(map.len(), 1);
map.spawn(i, async {});
Expand All @@ -127,7 +127,7 @@ async fn abort_by_key() {
}
}

while let Some((key, res)) = map.join_one().await {
while let Some((key, res)) = map.join_next().await {
match res {
Ok(()) => {
num_completed += 1;
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn abort_by_predicate() {
// abort odd-numbered tasks.
map.abort_matching(|key| key % 2 != 0);

while let Some((key, res)) = map.join_one().await {
while let Some((key, res)) = map.join_next().await {
match res {
Ok(()) => {
num_completed += 1;
Expand Down Expand Up @@ -190,12 +190,12 @@ fn runtime_gone() {
drop(rt);
}

let (key, res) = rt().block_on(map.join_one()).unwrap();
let (key, res) = rt().block_on(map.join_next()).unwrap();
assert_eq!(key, "key");
assert!(res.unwrap_err().is_cancelled());
}

// This ensures that `join_one` works correctly when the coop budget is
// This ensures that `join_next` works correctly when the coop budget is
// exhausted.
#[tokio::test(flavor = "current_thread")]
async fn join_map_coop() {
Expand All @@ -222,7 +222,7 @@ async fn join_map_coop() {
let mut count = 0;
let mut coop_count = 0;
loop {
match map.join_one().now_or_never() {
match map.join_next().now_or_never() {
Some(Some((key, Ok(i)))) => assert_eq!(key, i),
Some(Some((key, Err(err)))) => panic!("failed[{}]: {}", key, err),
None => {
Expand Down Expand Up @@ -260,7 +260,7 @@ async fn abort_all() {

let mut count = 0;
let mut seen = [false; 10];
while let Some((k, res)) = map.join_one().await {
while let Some((k, res)) = map.join_next().await {
seen[k] = true;
if let Err(err) = res {
assert!(err.is_cancelled());
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/tests/loom_join_set.rs
Expand Up @@ -16,13 +16,13 @@ fn test_join_set() {
assert_eq!(set.len(), 1);
set.spawn(async { () });
assert_eq!(set.len(), 2);
let () = set.join_one().await.unwrap().unwrap();
let () = set.join_next().await.unwrap().unwrap();
assert_eq!(set.len(), 1);
set.spawn(async { () });
assert_eq!(set.len(), 2);
let () = set.join_one().await.unwrap().unwrap();
let () = set.join_next().await.unwrap().unwrap();
assert_eq!(set.len(), 1);
let () = set.join_one().await.unwrap().unwrap();
let () = set.join_next().await.unwrap().unwrap();
assert_eq!(set.len(), 0);
set.spawn(async { () });
assert_eq!(set.len(), 1);
Expand Down Expand Up @@ -60,7 +60,7 @@ fn abort_all_during_completion() {
set.spawn(async { () });
set.abort_all();

match set.join_one().await {
match set.join_next().await {
Some(Ok(())) => complete_happened.store(true, SeqCst),
Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
Some(Err(err)) => panic!("fail: {}", err),
Expand All @@ -69,7 +69,7 @@ fn abort_all_during_completion() {
}
}

assert!(matches!(set.join_one().await, None));
assert!(matches!(set.join_next().await, None));
});

drop(set);
Expand Down

0 comments on commit 08d4953

Please sign in to comment.