From a332b7a60c0f71dacc6a2f38ef36483fb4f76b33 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 20 Apr 2022 11:37:05 -0700 Subject: [PATCH] add task IDs to join errors Signed-off-by: Eliza Weisman --- tokio/src/runtime/task/abort.rs | 9 ++++++++- tokio/src/runtime/task/error.rs | 32 ++++++++++++++++++++++++------- tokio/src/runtime/task/harness.rs | 19 +++++++++--------- tokio/src/runtime/task/join.rs | 9 ++++++++- tokio/src/runtime/task/mod.rs | 28 +++++++++++++++++++++++++++ tokio/src/runtime/task/raw.rs | 20 +------------------ tokio/src/task/join_set.rs | 24 +++++++++++------------ 7 files changed, 92 insertions(+), 49 deletions(-) diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index 400362c81dd..ddea6803ab9 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -49,9 +49,16 @@ impl AbortHandle { } /// Returns a [task ID] that uniquely identifies this task relative to other - /// currently running tasks. + /// currently spawned tasks. + /// + /// **Note**: This is an [unstable API][unstable]. The public API of this type + /// may break in 1.x releases. See [the documentation on unstable + /// features][unstable] for details. /// /// [task ID]: crate::task::Id + /// [unstable]: crate#unstable-features + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn id(&self) -> super::Id { // XXX(eliza): should this return an option instead? probably not... self.raw.unwrap().id() diff --git a/tokio/src/runtime/task/error.rs b/tokio/src/runtime/task/error.rs index 1a8129b2b6f..22b688aa221 100644 --- a/tokio/src/runtime/task/error.rs +++ b/tokio/src/runtime/task/error.rs @@ -2,12 +2,13 @@ use std::any::Any; use std::fmt; use std::io; +use super::Id; use crate::util::SyncWrapper; - cfg_rt! { /// Task failed to execute to completion. pub struct JoinError { repr: Repr, + id: Id, } } @@ -17,15 +18,17 @@ enum Repr { } impl JoinError { - pub(crate) fn cancelled() -> JoinError { + pub(crate) fn cancelled(id: Id) -> JoinError { JoinError { repr: Repr::Cancelled, + id, } } - pub(crate) fn panic(err: Box) -> JoinError { + pub(crate) fn panic(id: Id, err: Box) -> JoinError { JoinError { repr: Repr::Panic(SyncWrapper::new(err)), + id, } } @@ -111,13 +114,28 @@ impl JoinError { _ => Err(self), } } + + /// Returns a [task ID] that identifies the task which errored relative to + /// other currently spawned tasks. + /// + /// **Note**: This is an [unstable API][unstable]. The public API of this type + /// may break in 1.x releases. See [the documentation on unstable + /// features][unstable] for details. + /// + /// [task ID]: crate::task::Id + /// [unstable]: crate#unstable-features + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] + pub fn id(&self) -> Id { + self.id.clone() + } } impl fmt::Display for JoinError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.repr { - Repr::Cancelled => write!(fmt, "cancelled"), - Repr::Panic(_) => write!(fmt, "panic"), + Repr::Cancelled => write!(fmt, "task {} was cancelled", self.id), + Repr::Panic(_) => write!(fmt, "task {} panicked", self.id), } } } @@ -125,8 +143,8 @@ impl fmt::Display for JoinError { impl fmt::Debug for JoinError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.repr { - Repr::Cancelled => write!(fmt, "JoinError::Cancelled"), - Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"), + Repr::Cancelled => write!(fmt, "JoinError::Cancelled({:?})", self.id), + Repr::Panic(_) => write!(fmt, "JoinError::Panic({:?}, ...)", self.id), } } } diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 261dccea415..d88e01f0cc9 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -100,7 +100,8 @@ where let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let res = poll_future(&self.core().stage, cx); + let id = super::Id::from_raw(header_ptr); + let res = poll_future(&self.core().stage, id.clone(), cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -115,13 +116,13 @@ where // The transition to idle failed because the task was // cancelled during the poll. - cancel_task(&self.core().stage); + cancel_task(&self.core().stage, id); PollFuture::Complete } } } TransitionToRunning::Cancelled => { - cancel_task(&self.core().stage); + cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr())); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done, @@ -144,7 +145,7 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. - cancel_task(&self.core().stage); + cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr())); self.complete(); } @@ -432,7 +433,7 @@ enum PollFuture { } /// Cancels the task and store the appropriate error in the stage field. -fn cancel_task(stage: &CoreStage) { +fn cancel_task(stage: &CoreStage, id: super::Id) { // Drop the future from a panic guard. let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { stage.drop_future_or_output(); @@ -440,17 +441,17 @@ fn cancel_task(stage: &CoreStage) { match res { Ok(()) => { - stage.store_output(Err(JoinError::cancelled())); + stage.store_output(Err(JoinError::cancelled(id))); } Err(panic) => { - stage.store_output(Err(JoinError::panic(panic))); + stage.store_output(Err(JoinError::panic(id, panic))); } } } /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future(core: &CoreStage, cx: Context<'_>) -> Poll<()> { +fn poll_future(core: &CoreStage, id: super::Id, cx: Context<'_>) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { struct Guard<'a, T: Future> { @@ -473,7 +474,7 @@ fn poll_future(core: &CoreStage, cx: Context<'_>) -> Poll<()> { let output = match output { Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), - Err(panic) => Err(JoinError::panic(panic)), + Err(panic) => Err(JoinError::panic(id, panic)), }; // Catch and ignore panics if the future panics on drop. diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 15a0d7862c4..7f4dd75b7a0 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -222,9 +222,16 @@ impl JoinHandle { } /// Returns a [task ID] that uniquely identifies this task relative to other - /// currently running tasks. + /// currently spawned tasks. + /// + /// **Note**: This is an [unstable API][unstable]. The public API of this type + /// may break in 1.x releases. See [the documentation on unstable + /// features][unstable] for details. /// /// [task ID]: crate::task::Id + /// [unstable]: crate#unstable-features + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] #[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn id(&self) -> super::Id { diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index dc71a7d46c0..bc28f399f63 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -471,3 +471,31 @@ unsafe impl linked_list::Link for Task { NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr)) } } + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} +impl Id { + #[inline] + fn from_raw(ptr: NonNull
) -> Self { + use std::num::NonZeroUsize; + let addr = ptr.as_ptr() as usize; + + #[cfg(debug_assertions)] + let inner = NonZeroUsize::new(addr) + .expect("a `NonNull` pointer will never be 0 when cast to `usize`"); + + #[cfg(not(debug_assertions))] + let inner = unsafe { + // Safety: `addr` was cast from a `NonNull` pointer, which must + // never be null (0). Since the pointer is not null, the integer + // will never be zero, so this is safe as long as the `NonNull` was + // constructed safely. + NonZeroUsize::new_unchecked(addr) + }; + + Self(inner) + } +} diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 8daaa5419f3..5beb60a1f7b 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -130,26 +130,8 @@ impl RawTask { self.header().state.ref_inc(); } - #[cfg(tokio_unstable)] - #[inline] pub(super) fn id(&self) -> super::Id { - use std::num::NonZeroUsize; - let addr = self.ptr.as_ptr() as usize; - - #[cfg(debug_assertions)] - let inner = NonZeroUsize::new(addr) - .expect("a `NonNull` pointer will never be 0 when cast to `usize`"); - - #[cfg(not(debug_assertions))] - let inner = unsafe { - // Safety: `addr` was cast from a `NonNull` pointer, which must - // never be null (0). Since the pointer is not null, the integer - // will never be zero, so this is safe as long as the `NonNull` was - // constructed safely. - NonZeroUsize::new_unchecked(addr) - }; - - super::Id(inner) + super::Id::from_raw(self.ptr) } } diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 04092ab3fde..a83ff7749f8 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -157,8 +157,7 @@ impl JoinSet { pub async fn join_one(&mut self) -> Result, JoinError> { crate::future::poll_fn(|cx| self.poll_join_one(cx)) .await - .map(|(_, res)| res) - .transpose() + .map(|opt| opt.map(|(_, res)| res)) } /// Waits until one of the tasks in the set completes and returns its @@ -173,7 +172,7 @@ impl JoinSet { /// removed from this `JoinSet`. /// /// [task ID]: crate::task::Id - pub async fn join_with_id(&mut self) -> Option<(Id, Result)> { + pub async fn join_with_id(&mut self) -> Result, JoinError> { crate::future::poll_fn(|cx| self.poll_join_one(cx)).await } @@ -224,27 +223,26 @@ impl JoinSet { /// /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is /// available right now. - /// * `Poll::Ready(Some((Id, Ok(value))))` if one of the tasks in this `JoinSet` has completed. The + /// * `Poll::Ready(Ok(Some((id, value)))` if one of the tasks in this `JoinSet` has completed. The /// `value` is the return value of one of the tasks that completed, while /// `id` is the [task ID] of that task. - /// * `Poll::Ready(Some((Id, Err(err))))` if one of the tasks in this `JoinSet` has panicked or been - /// aborted. The `err` is the `JoinError` from the panicked/aborted - /// task, and `id` is the [task ID] of that task. - /// * `Poll::Ready(None)` if the `JoinSet` is empty. + /// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been + /// aborted. The `err` is the `JoinError` from the panicked/aborted task. + /// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty. /// /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. /// This can happen if the [coop budget] is reached. /// /// [coop budget]: crate::task#cooperative-scheduling /// [task ID]: crate::task::Id - fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll)>> { + fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll, JoinError>> { // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to // the `notified` list if the waker is notified in the `poll` call below. let mut entry = match self.inner.pop_notified(cx.waker()) { Some(entry) => entry, None => { if self.is_empty() { - return Poll::Ready(None); + return Poll::Ready(Ok(None)); } else { // The waker was set by `pop_notified`. return Poll::Pending; @@ -256,8 +254,10 @@ impl JoinSet { if let Poll::Ready(res) = res { let entry = entry.remove(); - let id = entry.id(); - Poll::Ready(Some((id, res))) + // If the task succeeded, add the task ID to the output. Otherwise, the + // `JoinError` will already have the task's ID. + let res = res.map(|output| (entry.id(), output)); + Poll::Ready(Some(res).transpose()) } else { // A JoinHandle generally won't emit a wakeup without being ready unless // the coop limit has been reached. We yield to the executor in this