Skip to content

Commit

Permalink
task: add task::id() and task::try_id() (#5171)
Browse files Browse the repository at this point in the history
  • Loading branch information
agayev committed Nov 13, 2022
1 parent 582d512 commit 71bd49e
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 16 deletions.
5 changes: 1 addition & 4 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -363,10 +363,7 @@ where
fn insert(&mut self, key: K, abort: AbortHandle) {
let hash = self.hash(&key);
let id = abort.id();
let map_key = Key {
id: id.clone(),
key,
};
let map_key = Key { id, key };

// Insert the new key into the map of tasks by keys.
let entry = self
Expand Down
13 changes: 13 additions & 0 deletions tokio/src/runtime/context.rs
Expand Up @@ -8,6 +8,7 @@ use crate::util::rand::{FastRand, RngSeed};

cfg_rt! {
use crate::runtime::scheduler;
use crate::runtime::task::Id;

use std::cell::RefCell;
use std::marker::PhantomData;
Expand All @@ -18,6 +19,8 @@ struct Context {
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,
#[cfg(feature = "rt")]
current_task_id: Cell<Option<Id>>,

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
Expand All @@ -42,6 +45,8 @@ tokio_thread_local! {
/// accessing drivers, etc...
#[cfg(feature = "rt")]
handle: RefCell::new(None),
#[cfg(feature = "rt")]
current_task_id: Cell::new(None),

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
Expand Down Expand Up @@ -107,6 +112,14 @@ cfg_rt! {

pub(crate) struct DisallowBlockInPlaceGuard(bool);

pub(crate) fn set_current_task_id(id: Option<Id>) -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None)
}

pub(crate) fn current_task_id() -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None)
}

pub(crate) fn try_current() -> Result<scheduler::Handle, TryCurrentError> {
match CONTEXT.try_with(|ctx| ctx.handle.borrow().clone()) {
Ok(Some(handle)) => Ok(handle),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/abort.rs
Expand Up @@ -67,7 +67,7 @@ impl AbortHandle {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
self.id.clone()
self.id
}
}

Expand Down
23 changes: 23 additions & 0 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -11,6 +11,7 @@

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Id, Schedule};
Expand Down Expand Up @@ -157,6 +158,26 @@ impl<T: Future> CoreStage<T> {
}
}

/// Set and clear the task id in the context when the future is executed or
/// dropped, or when the output produced by the future is dropped.
pub(crate) struct TaskIdGuard {
parent_task_id: Option<Id>,
}

impl TaskIdGuard {
fn enter(id: Id) -> Self {
TaskIdGuard {
parent_task_id: context::set_current_task_id(Some(id)),
}
}
}

impl Drop for TaskIdGuard {
fn drop(&mut self) {
context::set_current_task_id(self.parent_task_id);
}
}

impl<T: Future, S: Schedule> Core<T, S> {
/// Polls the future.
///
Expand All @@ -183,6 +204,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };

let _guard = TaskIdGuard::enter(self.task_id);
future.poll(&mut cx)
})
};
Expand Down Expand Up @@ -236,6 +258,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
}

unsafe fn set_stage(&self, stage: Stage<T>) {
let _guard = TaskIdGuard::enter(self.task_id);
self.stage.stage.with_mut(|ptr| *ptr = stage)
}
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/error.rs
Expand Up @@ -128,7 +128,7 @@ impl JoinError {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> Id {
self.id.clone()
self.id
}
}

Expand Down
7 changes: 3 additions & 4 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -454,13 +454,12 @@ fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
core.drop_future_or_output();
}));

let id = core.task_id.clone();
match res {
Ok(()) => {
core.store_output(Err(JoinError::cancelled(id)));
core.store_output(Err(JoinError::cancelled(core.task_id)));
}
Err(panic) => {
core.store_output(Err(JoinError::panic(id, panic)));
core.store_output(Err(JoinError::panic(core.task_id, panic)));
}
}
}
Expand Down Expand Up @@ -492,7 +491,7 @@ fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Po
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => {
core.scheduler.unhandled_panic();
Err(JoinError::panic(core.task_id.clone(), panic))
Err(JoinError::panic(core.task_id, panic))
}
};

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/join.rs
Expand Up @@ -267,7 +267,7 @@ impl<T> JoinHandle<T> {
raw.ref_inc();
raw
});
super::AbortHandle::new(raw, self.id.clone())
super::AbortHandle::new(raw, self.id)
}

/// Returns a [task ID] that uniquely identifies this task relative to other
Expand All @@ -282,7 +282,7 @@ impl<T> JoinHandle<T> {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
self.id.clone()
self.id
}
}

Expand Down
51 changes: 48 additions & 3 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -139,6 +139,8 @@
// unstable. This should be removed once `JoinSet` is stabilized.
#![cfg_attr(not(tokio_unstable), allow(dead_code))]

use crate::runtime::context;

mod core;
use self::core::Cell;
use self::core::Header;
Expand Down Expand Up @@ -193,6 +195,10 @@ use std::{fmt, mem};
/// task completes, the same ID may be used for another task.
/// - Task IDs are *not* sequential, and do not indicate the order in which
/// tasks are spawned, what runtime a task is spawned on, or any other data.
/// - The task ID of the currently running task can be obtained from inside the
/// task via the [`task::try_id()`](crate::task::try_id()) and
/// [`task::id()`](crate::task::id()) functions and from outside the task via
/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
Expand All @@ -201,10 +207,49 @@ use std::{fmt, mem};
/// [unstable]: crate#unstable-features
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well...
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);

/// Returns the [`Id`] of the currently running task.
///
/// # Panics
///
/// This function panics if called from outside a task. Please note that calls
/// to `block_on` do not have task IDs, so the method will panic if called from
/// within a call to `block_on`. For a version of this function that doesn't
/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[track_caller]
pub fn id() -> Id {
context::current_task_id().expect("Can't get a task id when not inside a task")
}

/// Returns the [`Id`] of the currently running task, or `None` if called outside
/// of a task.
///
/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
/// that it returns `None` rather than panicking if called outside of a task
/// context.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[track_caller]
pub fn try_id() -> Option<Id> {
context::current_task_id()
}

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
Expand Down Expand Up @@ -284,7 +329,7 @@ cfg_rt! {
T: Future + 'static,
T::Output: 'static,
{
let raw = RawTask::new::<T, S>(task, scheduler, id.clone());
let raw = RawTask::new::<T, S>(task, scheduler, id);
let task = Task {
raw,
_p: PhantomData,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/mod.rs
Expand Up @@ -318,7 +318,7 @@ cfg_rt! {
pub mod join_set;

cfg_unstable! {
pub use crate::runtime::task::Id;
pub use crate::runtime::task::{Id, id, try_id};
}

cfg_trace! {
Expand Down

0 comments on commit 71bd49e

Please sign in to comment.