Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: add a method to retrieve task id #5171

Merged
merged 16 commits into from Nov 13, 2022
5 changes: 1 addition & 4 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -363,10 +363,7 @@ where
fn insert(&mut self, key: K, abort: AbortHandle) {
let hash = self.hash(&key);
let id = abort.id();
let map_key = Key {
id: id.clone(),
key,
};
let map_key = Key { id, key };

// Insert the new key into the map of tasks by keys.
let entry = self
Expand Down
15 changes: 15 additions & 0 deletions tokio/src/runtime/context.rs
@@ -1,5 +1,8 @@
use crate::runtime::coop;

#[cfg(feature = "rt")]
use crate::runtime::task::Id;
agayev marked this conversation as resolved.
Show resolved Hide resolved

use std::cell::Cell;

#[cfg(any(feature = "rt", feature = "macros"))]
Expand All @@ -17,6 +20,8 @@ struct Context {
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,
#[cfg(feature = "rt")]
current_task_id: Cell<Option<Id>>,

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
Expand All @@ -41,6 +46,8 @@ tokio_thread_local! {
/// accessing drivers, etc...
#[cfg(feature = "rt")]
handle: RefCell::new(None),
#[cfg(feature = "rt")]
current_task_id: Cell::new(None),

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
Expand Down Expand Up @@ -107,6 +114,14 @@ cfg_rt! {

pub(crate) struct DisallowBlockInPlaceGuard(bool);

pub(crate) fn set_current_task_id(id: Option<Id>) -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None)
}

pub(crate) fn current_task_id() -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None)
}

pub(crate) fn try_current() -> Result<scheduler::Handle, TryCurrentError> {
match CONTEXT.try_with(|ctx| ctx.handle.borrow().clone()) {
Ok(Some(handle)) => Ok(handle),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/abort.rs
Expand Up @@ -67,7 +67,7 @@ impl AbortHandle {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
self.id.clone()
self.id
}
}

Expand Down
23 changes: 23 additions & 0 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -11,6 +11,7 @@

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Id, Schedule};
Expand Down Expand Up @@ -157,6 +158,26 @@ impl<T: Future> CoreStage<T> {
}
}

/// Set and clear the task id in the context when the future is executed or
/// dropped, or when the output produced by the future is dropped.
pub(crate) struct TaskIdGuard {
parent_task_id: Option<Id>,
}

impl TaskIdGuard {
fn enter(id: Id) -> Self {
TaskIdGuard {
parent_task_id: context::set_current_task_id(Some(id)),
}
}
}

impl Drop for TaskIdGuard {
fn drop(&mut self) {
context::set_current_task_id(self.parent_task_id);
}
}

impl<T: Future, S: Schedule> Core<T, S> {
/// Polls the future.
///
Expand All @@ -183,6 +204,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };

let _guard = TaskIdGuard::enter(self.task_id);
future.poll(&mut cx)
})
};
Expand Down Expand Up @@ -236,6 +258,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
}

unsafe fn set_stage(&self, stage: Stage<T>) {
let _guard = TaskIdGuard::enter(self.task_id);
self.stage.stage.with_mut(|ptr| *ptr = stage)
}
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/error.rs
Expand Up @@ -128,7 +128,7 @@ impl JoinError {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> Id {
self.id.clone()
self.id
}
}

Expand Down
7 changes: 3 additions & 4 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -454,13 +454,12 @@ fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
core.drop_future_or_output();
}));

let id = core.task_id.clone();
match res {
Ok(()) => {
core.store_output(Err(JoinError::cancelled(id)));
core.store_output(Err(JoinError::cancelled(core.task_id)));
}
Err(panic) => {
core.store_output(Err(JoinError::panic(id, panic)));
core.store_output(Err(JoinError::panic(core.task_id, panic)));
}
}
}
Expand Down Expand Up @@ -492,7 +491,7 @@ fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Po
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => {
core.scheduler.unhandled_panic();
Err(JoinError::panic(core.task_id.clone(), panic))
Err(JoinError::panic(core.task_id, panic))
}
};

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/join.rs
Expand Up @@ -267,7 +267,7 @@ impl<T> JoinHandle<T> {
raw.ref_inc();
raw
});
super::AbortHandle::new(raw, self.id.clone())
super::AbortHandle::new(raw, self.id)
}

/// Returns a [task ID] that uniquely identifies this task relative to other
Expand All @@ -282,7 +282,7 @@ impl<T> JoinHandle<T> {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn id(&self) -> super::Id {
self.id.clone()
self.id
}
}

Expand Down
57 changes: 51 additions & 6 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -139,6 +139,8 @@
// unstable. This should be removed once `JoinSet` is stabilized.
#![cfg_attr(not(tokio_unstable), allow(dead_code))]

use crate::runtime::context;

mod core;
use self::core::Cell;
use self::core::Header;
Expand Down Expand Up @@ -190,9 +192,13 @@ use std::{fmt, mem};
/// # Notes
///
/// - Task IDs are unique relative to other *currently running* tasks. When a
/// task completes, the same ID may be used for another task.
/// - Task IDs are *not* sequential, and do not indicate the order in which
/// tasks are spawned, what runtime a task is spawned on, or any other data.
/// task completes, the same ID may be used for another task. - Task IDs are
/// *not* sequential, and do not indicate the order in which tasks are spawned,
/// what runtime a task is spawned on, or any other data. - The task ID of the
/// currently running task can be obtained from inside the task via the
/// [`task::try_id()`](crate::task::try_id()) and
/// [`task::id()`](crate::task::id()) functions and from outside the task via
/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
agayev marked this conversation as resolved.
Show resolved Hide resolved
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
Expand All @@ -201,10 +207,49 @@ use std::{fmt, mem};
/// [unstable]: crate#unstable-features
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
// TODO(eliza): there's almost certainly no reason not to make this `Copy` as well...
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);
agayev marked this conversation as resolved.
Show resolved Hide resolved

/// Returns the [`Id`] of the currently running task.
///
/// # Panics
///
/// This function panics if called from outside a task. Please note that calls
/// to `block_on` do not have task IDs, so the method will panic if called from
/// within a call to `block_on`. For a version of this function that doesn't
/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
agayev marked this conversation as resolved.
Show resolved Hide resolved
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[track_caller]
pub fn id() -> Id {
agayev marked this conversation as resolved.
Show resolved Hide resolved
context::current_task_id().expect("Can't get a task id when not inside a task")
}

/// Returns the [`Id`] of the currently running task, or `None` if called outside
/// of a task.
///
/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
/// that it returns `None` rather than panicking if called outside of a task
/// context.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [task ID]: crate::task::Id
/// [unstable]: crate#unstable-features
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
agayev marked this conversation as resolved.
Show resolved Hide resolved
#[track_caller]
pub fn try_id() -> Option<Id> {
context::current_task_id()
}

/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
Expand Down Expand Up @@ -284,7 +329,7 @@ cfg_rt! {
T: Future + 'static,
T::Output: 'static,
{
let raw = RawTask::new::<T, S>(task, scheduler, id.clone());
let raw = RawTask::new::<T, S>(task, scheduler, id);
let task = Task {
raw,
_p: PhantomData,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/mod.rs
Expand Up @@ -318,7 +318,7 @@ cfg_rt! {
pub mod join_set;

cfg_unstable! {
pub use crate::runtime::task::Id;
pub use crate::runtime::task::{Id, id, try_id};
}

cfg_trace! {
Expand Down