Skip to content

Commit

Permalink
add task IDs to join errors
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Apr 20, 2022
1 parent 934d61c commit a332b7a
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 49 deletions.
9 changes: 8 additions & 1 deletion tokio/src/runtime/task/abort.rs
Expand Up @@ -49,9 +49,16 @@ impl AbortHandle {
}

/// Returns a [task ID] that uniquely identifies this task relative to other
/// currently running tasks.
/// currently spawned tasks.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
// XXX(eliza): should this return an option instead? probably not...
self.raw.unwrap().id()
Expand Down
32 changes: 25 additions & 7 deletions tokio/src/runtime/task/error.rs
Expand Up @@ -2,12 +2,13 @@ use std::any::Any;
use std::fmt;
use std::io;

use super::Id;
use crate::util::SyncWrapper;

cfg_rt! {
/// Task failed to execute to completion.
pub struct JoinError {
repr: Repr,
id: Id,
}
}

Expand All @@ -17,15 +18,17 @@ enum Repr {
}

impl JoinError {
pub(crate) fn cancelled() -> JoinError {
pub(crate) fn cancelled(id: Id) -> JoinError {
JoinError {
repr: Repr::Cancelled,
id,
}
}

pub(crate) fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
pub(crate) fn panic(id: Id, err: Box<dyn Any + Send + 'static>) -> JoinError {
JoinError {
repr: Repr::Panic(SyncWrapper::new(err)),
id,
}
}

Expand Down Expand Up @@ -111,22 +114,37 @@ impl JoinError {
_ => Err(self),
}
}

/// Returns a [task ID] that identifies the task which errored relative to
/// other currently spawned tasks.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> Id {
self.id.clone()
}
}

impl fmt::Display for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.repr {
Repr::Cancelled => write!(fmt, "cancelled"),
Repr::Panic(_) => write!(fmt, "panic"),
Repr::Cancelled => write!(fmt, "task {} was cancelled", self.id),
Repr::Panic(_) => write!(fmt, "task {} panicked", self.id),
}
}
}

impl fmt::Debug for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.repr {
Repr::Cancelled => write!(fmt, "JoinError::Cancelled"),
Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"),
Repr::Cancelled => write!(fmt, "JoinError::Cancelled({:?})", self.id),
Repr::Panic(_) => write!(fmt, "JoinError::Panic({:?}, ...)", self.id),
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -100,7 +100,8 @@ where
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let res = poll_future(&self.core().stage, cx);
let id = super::Id::from_raw(header_ptr);
let res = poll_future(&self.core().stage, id.clone(), cx);

if res == Poll::Ready(()) {
// The future completed. Move on to complete the task.
Expand All @@ -115,13 +116,13 @@ where
// The transition to idle failed because the task was
// cancelled during the poll.

cancel_task(&self.core().stage);
cancel_task(&self.core().stage, id);
PollFuture::Complete
}
}
}
TransitionToRunning::Cancelled => {
cancel_task(&self.core().stage);
cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr()));
PollFuture::Complete
}
TransitionToRunning::Failed => PollFuture::Done,
Expand All @@ -144,7 +145,7 @@ where

// By transitioning the lifecycle to `Running`, we have permission to
// drop the future.
cancel_task(&self.core().stage);
cancel_task(&self.core().stage, super::Id::from_raw(self.header_ptr()));
self.complete();
}

Expand Down Expand Up @@ -432,25 +433,25 @@ enum PollFuture {
}

/// Cancels the task and store the appropriate error in the stage field.
fn cancel_task<T: Future>(stage: &CoreStage<T>) {
fn cancel_task<T: Future>(stage: &CoreStage<T>, id: super::Id) {
// Drop the future from a panic guard.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
stage.drop_future_or_output();
}));

match res {
Ok(()) => {
stage.store_output(Err(JoinError::cancelled()));
stage.store_output(Err(JoinError::cancelled(id)));
}
Err(panic) => {
stage.store_output(Err(JoinError::panic(panic)));
stage.store_output(Err(JoinError::panic(id, panic)));
}
}
}

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
fn poll_future<T: Future>(core: &CoreStage<T>, id: super::Id, cx: Context<'_>) -> Poll<()> {
// Poll the future.
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
struct Guard<'a, T: Future> {
Expand All @@ -473,7 +474,7 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
let output = match output {
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => Err(JoinError::panic(panic)),
Err(panic) => Err(JoinError::panic(id, panic)),
};

// Catch and ignore panics if the future panics on drop.
Expand Down
9 changes: 8 additions & 1 deletion tokio/src/runtime/task/join.rs
Expand Up @@ -222,9 +222,16 @@ impl<T> JoinHandle<T> {
}

/// Returns a [task ID] that uniquely identifies this task relative to other
/// currently running tasks.
/// currently spawned tasks.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
Expand Down
28 changes: 28 additions & 0 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -471,3 +471,31 @@ unsafe impl<S> linked_list::Link for Task<S> {
NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
}
}

impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Id {
#[inline]
fn from_raw(ptr: NonNull<Header>) -> Self {
use std::num::NonZeroUsize;
let addr = ptr.as_ptr() as usize;

#[cfg(debug_assertions)]
let inner = NonZeroUsize::new(addr)
.expect("a `NonNull` pointer will never be 0 when cast to `usize`");

#[cfg(not(debug_assertions))]
let inner = unsafe {
// Safety: `addr` was cast from a `NonNull` pointer, which must
// never be null (0). Since the pointer is not null, the integer
// will never be zero, so this is safe as long as the `NonNull` was
// constructed safely.
NonZeroUsize::new_unchecked(addr)
};

Self(inner)
}
}
20 changes: 1 addition & 19 deletions tokio/src/runtime/task/raw.rs
Expand Up @@ -130,26 +130,8 @@ impl RawTask {
self.header().state.ref_inc();
}

#[cfg(tokio_unstable)]
#[inline]
pub(super) fn id(&self) -> super::Id {
use std::num::NonZeroUsize;
let addr = self.ptr.as_ptr() as usize;

#[cfg(debug_assertions)]
let inner = NonZeroUsize::new(addr)
.expect("a `NonNull` pointer will never be 0 when cast to `usize`");

#[cfg(not(debug_assertions))]
let inner = unsafe {
// Safety: `addr` was cast from a `NonNull` pointer, which must
// never be null (0). Since the pointer is not null, the integer
// will never be zero, so this is safe as long as the `NonNull` was
// constructed safely.
NonZeroUsize::new_unchecked(addr)
};

super::Id(inner)
super::Id::from_raw(self.ptr)
}
}

Expand Down
24 changes: 12 additions & 12 deletions tokio/src/task/join_set.rs
Expand Up @@ -157,8 +157,7 @@ impl<T: 'static> JoinSet<T> {
pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> {
crate::future::poll_fn(|cx| self.poll_join_one(cx))
.await
.map(|(_, res)| res)
.transpose()
.map(|opt| opt.map(|(_, res)| res))
}

/// Waits until one of the tasks in the set completes and returns its
Expand All @@ -173,7 +172,7 @@ impl<T: 'static> JoinSet<T> {
/// removed from this `JoinSet`.
///
/// [task ID]: crate::task::Id
pub async fn join_with_id(&mut self) -> Option<(Id, Result<T, JoinError>)> {
pub async fn join_with_id(&mut self) -> Result<Option<(Id, T)>, JoinError> {
crate::future::poll_fn(|cx| self.poll_join_one(cx)).await
}

Expand Down Expand Up @@ -224,27 +223,26 @@ impl<T: 'static> JoinSet<T> {
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Some((Id, Ok(value))))` if one of the tasks in this `JoinSet` has completed. The
/// * `Poll::Ready(Ok(Some((id, value)))` if one of the tasks in this `JoinSet` has completed. The
/// `value` is the return value of one of the tasks that completed, while
/// `id` is the [task ID] of that task.
/// * `Poll::Ready(Some((Id, Err(err))))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted
/// task, and `id` is the [task ID] of that task.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
/// * `Poll::Ready(Err(err))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// * `Poll::Ready(Ok(None))` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
/// [task ID]: crate::task::Id
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Option<(Id, Result<T, JoinError>)>> {
fn poll_join_one(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<(Id, T)>, JoinError>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Some(entry) => entry,
None => {
if self.is_empty() {
return Poll::Ready(None);
return Poll::Ready(Ok(None));
} else {
// The waker was set by `pop_notified`.
return Poll::Pending;
Expand All @@ -256,8 +254,10 @@ impl<T: 'static> JoinSet<T> {

if let Poll::Ready(res) = res {
let entry = entry.remove();
let id = entry.id();
Poll::Ready(Some((id, res)))
// If the task succeeded, add the task ID to the output. Otherwise, the
// `JoinError` will already have the task's ID.
let res = res.map(|output| (entry.id(), output));
Poll::Ready(Some(res).transpose())
} else {
// A JoinHandle generally won't emit a wakeup without being ready unless
// the coop limit has been reached. We yield to the executor in this
Expand Down

0 comments on commit a332b7a

Please sign in to comment.