Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

joinset: rename join_one to join_next #4755

Merged
merged 10 commits into from Jun 9, 2022
60 changes: 35 additions & 25 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -50,9 +50,9 @@ use tokio::task::{AbortHandle, Id, JoinError, JoinSet, LocalSet};
///
/// let mut seen = [false; 10];
///
/// // When a task completes, `join_one` returns the task's key along
/// // When a task completes, `join_next` returns the task's key along
/// // with its output.
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// seen[key] = true;
/// assert!(res.is_ok(), "task {} completed successfully!", key);
/// }
Expand Down Expand Up @@ -82,7 +82,7 @@ use tokio::task::{AbortHandle, Id, JoinError, JoinSet, LocalSet};
/// // provided key.
/// assert!(aborted);
///
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// if key == "goodbye world" {
/// // The aborted task should complete with a cancelled `JoinError`.
/// assert!(res.unwrap_err().is_cancelled());
Expand Down Expand Up @@ -277,14 +277,14 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn<F>(&mut self, key: K, task: F)
where
Expand All @@ -301,10 +301,10 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_on<F>(&mut self, key: K, task: F, handle: &Handle)
where
Expand All @@ -321,15 +321,15 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// # Panics
///
/// This method panics if it is called outside of a `LocalSet`.
///
/// [`LocalSet`]: tokio::task::LocalSet
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_local<F>(&mut self, key: K, task: F)
where
Expand All @@ -345,11 +345,11 @@ where
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_one`] will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// [`LocalSet`]: tokio::task::LocalSet
/// [`join_one`]: Self::join_one
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_local_on<F>(&mut self, key: K, task: F, local_set: &LocalSet)
where
Expand Down Expand Up @@ -399,7 +399,7 @@ where
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_one` is used as the event in a [`tokio::select!`]
/// This method is cancel safe. If `join_next` is used as the event in a [`tokio::select!`]
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinMap`.
///
Expand All @@ -416,8 +416,9 @@ where
/// * `None` if the `JoinMap` is empty.
///
/// [`tokio::select!`]: tokio::select
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
let (res, id) = match self.tasks.join_one_with_id().await {
#[doc(alias = "join_one")]
pub async fn join_next(&mut self) -> Option<(K, Result<V, JoinError>)> {
davidbarsky marked this conversation as resolved.
Show resolved Hide resolved
let (res, id) = match self.tasks.join_next_with_id().await {
Some(Ok((id, output))) => (Ok(output), id),
Some(Err(e)) => {
let id = e.id();
Expand All @@ -429,19 +430,28 @@ where
Some((key, res))
}

#[doc(hidden)]
#[deprecated(
since = "0.7.4",
note = "renamed to `JoinMap::join_next`."
)]
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
self.join_next().await
}

/// Aborts all tasks and waits for them to finish shutting down.
///
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
/// a loop until it returns `None`.
///
/// This method ignores any panics in the tasks shutting down. When this call returns, the
/// `JoinMap` will be empty.
///
/// [`abort_all`]: fn@Self::abort_all
/// [`join_one`]: fn@Self::join_one
/// [`join_next`]: fn@Self::join_next
pub async fn shutdown(&mut self) {
self.abort_all();
while self.join_one().await.is_some() {}
while self.join_next().await.is_some() {}
}

/// Abort the task corresponding to the provided `key`.
Expand All @@ -467,7 +477,7 @@ where
/// // Look up the "goodbye world" task in the map and abort it.
/// map.abort("goodbye world");
///
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// if key == "goodbye world" {
/// // The aborted task should complete with a cancelled `JoinError`.
/// assert!(res.unwrap_err().is_cancelled());
Expand Down Expand Up @@ -548,7 +558,7 @@ where
/// map.abort_matching(|key| key.starts_with("goodbye"));
///
/// let mut seen = 0;
/// while let Some((key, res)) = map.join_one().await {
/// while let Some((key, res)) = map.join_next().await {
/// seen += 1;
/// if key.starts_with("goodbye") {
/// // The aborted task should complete with a cancelled `JoinError`.
Expand All @@ -567,7 +577,7 @@ where
pub fn abort_matching(&mut self, mut predicate: impl FnMut(&K) -> bool) {
// Note: this method iterates over the tasks and keys *without* removing
// any entries, so that the keys from aborted tasks can still be
// returned when calling `join_one` in the future.
// returned when calling `join_next` in the future.
for (Key { ref key, .. }, task) in &self.tasks_by_key {
if predicate(key) {
task.abort();
Expand All @@ -578,9 +588,9 @@ where
/// Returns `true` if this `JoinMap` contains a task for the provided key.
///
/// If the task has completed, but its output hasn't yet been consumed by a
/// call to [`join_one`], this method will still return `true`.
/// call to [`join_next`], this method will still return `true`.
///
/// [`join_one`]: fn@Self::join_one
/// [`join_next`]: fn@Self::join_next
pub fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
where
Q: Hash + Eq,
Expand All @@ -593,9 +603,9 @@ where
/// [task ID].
///
/// If the task has completed, but its output hasn't yet been consumed by a
/// call to [`join_one`], this method will still return `true`.
/// call to [`join_next`], this method will still return `true`.
///
/// [`join_one`]: fn@Self::join_one
/// [`join_next`]: fn@Self::join_next
/// [task ID]: tokio::task::Id
pub fn contains_task(&self, task: &Id) -> bool {
self.get_by_id(task).is_some()
Expand Down Expand Up @@ -739,7 +749,7 @@ where
/// Aborts all tasks on this `JoinMap`.
///
/// This does not remove the tasks from the `JoinMap`. To wait for the tasks to complete
/// cancellation, you should call `join_one` in a loop until the `JoinMap` is empty.
/// cancellation, you should call `join_next` in a loop until the `JoinMap` is empty.
pub fn abort_all(&mut self) {
self.tasks.abort_all()
}
Expand Down
24 changes: 12 additions & 12 deletions tokio-util/tests/task_join_map.rs
Expand Up @@ -24,7 +24,7 @@ async fn test_with_sleep() {
map.detach_all();
assert_eq!(map.len(), 0);

assert!(matches!(map.join_one().await, None));
assert!(matches!(map.join_next().await, None));

for i in 0..10 {
map.spawn(i, async move {
Expand All @@ -35,15 +35,15 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some((k, res)) = map.join_one().await {
while let Some((k, res)) = map.join_next().await {
seen[k] = true;
assert_eq!(res.expect("task should have completed successfully"), k);
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(map.join_one().await, None));
assert!(matches!(map.join_next().await, None));

// Do it again.
for i in 0..10 {
Expand All @@ -54,15 +54,15 @@ async fn test_with_sleep() {
}

let mut seen = [false; 10];
while let Some((k, res)) = map.join_one().await {
while let Some((k, res)) = map.join_next().await {
seen[k] = true;
assert_eq!(res.expect("task should have completed successfully"), k);
}

for was_seen in &seen {
assert!(was_seen);
}
assert!(matches!(map.join_one().await, None));
assert!(matches!(map.join_next().await, None));
}

#[tokio::test]
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn alternating() {
assert_eq!(map.len(), 2);

for i in 0..16 {
let (_, res) = map.join_one().await.unwrap();
let (_, res) = map.join_next().await.unwrap();
assert!(res.is_ok());
assert_eq!(map.len(), 1);
map.spawn(i, async {});
Expand All @@ -127,7 +127,7 @@ async fn abort_by_key() {
}
}

while let Some((key, res)) = map.join_one().await {
while let Some((key, res)) = map.join_next().await {
match res {
Ok(()) => {
num_completed += 1;
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn abort_by_predicate() {
// abort odd-numbered tasks.
map.abort_matching(|key| key % 2 != 0);

while let Some((key, res)) = map.join_one().await {
while let Some((key, res)) = map.join_next().await {
match res {
Ok(()) => {
num_completed += 1;
Expand Down Expand Up @@ -190,12 +190,12 @@ fn runtime_gone() {
drop(rt);
}

let (key, res) = rt().block_on(map.join_one()).unwrap();
let (key, res) = rt().block_on(map.join_next()).unwrap();
assert_eq!(key, "key");
assert!(res.unwrap_err().is_cancelled());
}

// This ensures that `join_one` works correctly when the coop budget is
// This ensures that `join_next` works correctly when the coop budget is
// exhausted.
#[tokio::test(flavor = "current_thread")]
async fn join_map_coop() {
Expand All @@ -222,7 +222,7 @@ async fn join_map_coop() {
let mut count = 0;
let mut coop_count = 0;
loop {
match map.join_one().now_or_never() {
match map.join_next().now_or_never() {
Some(Some((key, Ok(i)))) => assert_eq!(key, i),
Some(Some((key, Err(err)))) => panic!("failed[{}]: {}", key, err),
None => {
Expand Down Expand Up @@ -260,7 +260,7 @@ async fn abort_all() {

let mut count = 0;
let mut seen = [false; 10];
while let Some((k, res)) = map.join_one().await {
while let Some((k, res)) = map.join_next().await {
seen[k] = true;
if let Err(err) = res {
assert!(err.is_cancelled());
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/tests/loom_join_set.rs
Expand Up @@ -16,13 +16,13 @@ fn test_join_set() {
assert_eq!(set.len(), 1);
set.spawn(async { () });
assert_eq!(set.len(), 2);
let () = set.join_one().await.unwrap().unwrap();
let () = set.join_next().await.unwrap().unwrap();
assert_eq!(set.len(), 1);
set.spawn(async { () });
assert_eq!(set.len(), 2);
let () = set.join_one().await.unwrap().unwrap();
let () = set.join_next().await.unwrap().unwrap();
assert_eq!(set.len(), 1);
let () = set.join_one().await.unwrap().unwrap();
let () = set.join_next().await.unwrap().unwrap();
assert_eq!(set.len(), 0);
set.spawn(async { () });
assert_eq!(set.len(), 1);
Expand Down Expand Up @@ -60,7 +60,7 @@ fn abort_all_during_completion() {
set.spawn(async { () });
set.abort_all();

match set.join_one().await {
match set.join_next().await {
Some(Ok(())) => complete_happened.store(true, SeqCst),
Some(Err(err)) if err.is_cancelled() => cancel_happened.store(true, SeqCst),
Some(Err(err)) => panic!("fail: {}", err),
Expand All @@ -69,7 +69,7 @@ fn abort_all_during_completion() {
}
}

assert!(matches!(set.join_one().await, None));
assert!(matches!(set.join_next().await, None));
});

drop(set);
Expand Down