From 934d61c66624180072bd421c63e63b1797ce2275 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 20 Apr 2022 11:06:29 -0700 Subject: [PATCH] task: add task IDs ## Motivation PR #4538 adds a prototype implementation of a `JoinMap` API in `tokio::task`. In [this comment][1] on that PR, @carllerche pointed out that a much simpler `JoinMap` type could be implemented outside of `tokio` (either in `tokio-util` or in user code) if we just modified `JoinSet` to return a task ID type when spawning new tasks, and when tasks complete. This seems like a better approach for the following reasons: * A `JoinMap`-like type need not become a permanent part of `tokio`'s stable API * Task IDs seem like something that could be generally useful outside of a `JoinMap` implementation ## Solution This branch adds a `tokio::task::Id` type that uniquely identifies a task relative to all currently spawned tasks. The ID is internally represented as a `NonZeroUsize` that's based on the address of the task's header. I thought that it was better to use addresses to generate IDs than assigning sequential IDs to tasks, because a sequential ID would mean adding an additional usize field to the task data somewhere, making it a word bigger. I've added methods to `JoinHandle` and `AbortHandle` for returning a task's ID. In addition, I modified `JoinSet` to add a `join_with_id` method that behaves identically to `join_one` but also returns an ID. This can be used to implement a `JoinMap` type. Note that because `join_with_id` must return a task ID regardless of whether the task completes successfully or returns a `JoinError` (in order to ensure that dead tasks are always cleaned up from a map), it inverts the ordering of the `Option` and `Result` returned by `join_one` --- which we've bikeshedded about a bit [here][2]. This makes the method's return type inconsistent with the existing `join_one` method, which feels not great. As I see it, there are three possible solutions to this: * change the existing `join_one` method to also swap the `Option` and `Result` nesting for consistency. * change `join_with_id` to return `Result, (Id, JoinError)>>`, which feels gross... * add a task ID to `JoinError` as well. [1]: https://github.com/tokio-rs/tokio/pull/4538#issuecomment-1065614755 [2]: https://github.com/tokio-rs/tokio/pull/4335#discussion_r773377901 --- tokio/src/runtime/task/abort.rs | 9 +++++++ tokio/src/runtime/task/join.rs | 11 ++++++++ tokio/src/runtime/task/mod.rs | 21 +++++++++++++++ tokio/src/runtime/task/raw.rs | 22 +++++++++++++++ tokio/src/task/join_set.rs | 47 ++++++++++++++++++++++++--------- tokio/src/task/mod.rs | 2 +- 6 files changed, 99 insertions(+), 13 deletions(-) diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index 6ed7ff1b7f2..400362c81dd 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -47,6 +47,15 @@ impl AbortHandle { raw.remote_abort(); } } + + /// Returns a [task ID] that uniquely identifies this task relative to other + /// currently running tasks. + /// + /// [task ID]: crate::task::Id + pub fn id(&self) -> super::Id { + // XXX(eliza): should this return an option instead? probably not... + self.raw.unwrap().id() + } } unsafe impl Send for AbortHandle {} diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index b7846348e34..15a0d7862c4 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -220,6 +220,17 @@ impl JoinHandle { }); super::AbortHandle::new(raw) } + + /// Returns a [task ID] that uniquely identifies this task relative to other + /// currently running tasks. + /// + /// [task ID]: crate::task::Id + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] + pub fn id(&self) -> super::Id { + // XXX(eliza): should this return an option instead? probably not... + self.raw.unwrap().id() + } } impl Unpin for JoinHandle {} diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index c7a85c5a173..dc71a7d46c0 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -184,6 +184,27 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; +/// An opaque ID that uniquely identifies a task relative to all other currently +/// running tasks. +/// +/// # Notes +/// +/// - Task IDs are unique relative to other *currently running* tasks. When a +/// task completes, the same ID may be used for another task. +/// - Task IDs are *not* sequential, and do not indicate the order in which +/// tasks are spawned, what runtime a task is spawned on, or any other data. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [unstable]: crate#unstable-features +#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well... +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct Id(std::num::NonZeroUsize); + /// An owned handle to the task, tracked by ref count. #[repr(transparent)] pub(crate) struct Task { diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 95569f947e5..8daaa5419f3 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -129,6 +129,28 @@ impl RawTask { pub(super) fn ref_inc(self) { self.header().state.ref_inc(); } + + #[cfg(tokio_unstable)] + #[inline] + pub(super) fn id(&self) -> super::Id { + use std::num::NonZeroUsize; + let addr = self.ptr.as_ptr() as usize; + + #[cfg(debug_assertions)] + let inner = NonZeroUsize::new(addr) + .expect("a `NonNull` pointer will never be 0 when cast to `usize`"); + + #[cfg(not(debug_assertions))] + let inner = unsafe { + // Safety: `addr` was cast from a `NonNull` pointer, which must + // never be null (0). Since the pointer is not null, the integer + // will never be zero, so this is safe as long as the `NonNull` was + // constructed safely. + NonZeroUsize::new_unchecked(addr) + }; + + super::Id(inner) + } } impl Clone for RawTask { diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 996ae1a9219..04092ab3fde 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::runtime::Handle; -use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet}; +use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet}; use crate::util::IdleNotifiedSet; /// A collection of tasks spawned on a Tokio runtime. @@ -155,6 +155,25 @@ impl JoinSet { /// statement and some other branch completes first, it is guaranteed that no tasks were /// removed from this `JoinSet`. pub async fn join_one(&mut self) -> Result, JoinError> { + crate::future::poll_fn(|cx| self.poll_join_one(cx)) + .await + .map(|(_, res)| res) + .transpose() + } + + /// Waits until one of the tasks in the set completes and returns its + /// output, along with the [task ID] of the completed task. + /// + /// Returns `None` if the set is empty. + /// + /// # Cancel Safety + /// + /// This method is cancel safe. If `join_with_id` is used as the event in a `tokio::select!` + /// statement and some other branch completes first, it is guaranteed that no tasks were + /// removed from this `JoinSet`. + /// + /// [task ID]: crate::task::Id + pub async fn join_with_id(&mut self) -> Option<(Id, Result)> { crate::future::poll_fn(|cx| self.poll_join_one(cx)).await } @@ -191,8 +210,8 @@ impl JoinSet { /// Polls for one of the tasks in the set to complete. /// - /// If this returns `Poll::Ready(Ok(Some(_)))` or `Poll::Ready(Err(_))`, then the task that - /// completed is removed from the set. + /// If this returns `Poll::Ready(Some((_, Ok(_))))` or `Poll::Ready(Some((_, + /// Err(_)))`, then the task that completed is removed from the set. /// /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to @@ -205,24 +224,27 @@ impl JoinSet { /// /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is /// available right now. - /// * `Poll::Ready(Ok(Some(value)))` if one of the tasks in this `JoinSet` has completed. The - /// `value` is the return value of one of the tasks that completed. - /// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been - /// aborted. - /// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty. + /// * `Poll::Ready(Some((Id, Ok(value))))` if one of the tasks in this `JoinSet` has completed. The + /// `value` is the return value of one of the tasks that completed, while + /// `id` is the [task ID] of that task. + /// * `Poll::Ready(Some((Id, Err(err))))` if one of the tasks in this `JoinSet` has panicked or been + /// aborted. The `err` is the `JoinError` from the panicked/aborted + /// task, and `id` is the [task ID] of that task. + /// * `Poll::Ready(None)` if the `JoinSet` is empty. /// /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. /// This can happen if the [coop budget] is reached. /// /// [coop budget]: crate::task#cooperative-scheduling - fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll, JoinError>> { + /// [task ID]: crate::task::Id + fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll)>> { // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to // the `notified` list if the waker is notified in the `poll` call below. let mut entry = match self.inner.pop_notified(cx.waker()) { Some(entry) => entry, None => { if self.is_empty() { - return Poll::Ready(Ok(None)); + return Poll::Ready(None); } else { // The waker was set by `pop_notified`. return Poll::Pending; @@ -233,8 +255,9 @@ impl JoinSet { let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); if let Poll::Ready(res) = res { - entry.remove(); - Poll::Ready(Some(res).transpose()) + let entry = entry.remove(); + let id = entry.id(); + Poll::Ready(Some((id, res))) } else { // A JoinHandle generally won't emit a wakeup without being ready unless // the coop limit has been reached. We yield to the executor in this diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index bf5530b8339..cebc269bb40 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -303,7 +303,7 @@ cfg_rt! { cfg_unstable! { mod join_set; pub use join_set::JoinSet; - pub use crate::runtime::task::AbortHandle; + pub use crate::runtime::task::{Id, AbortHandle}; } cfg_trace! {