diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index f7d93c9868c..96483acd7c4 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -22,3 +22,14 @@ cfg_sync! { mod block_on; pub(crate) use block_on::block_on; } + +cfg_trace! { + mod trace; + pub(crate) use trace::InstrumentedFuture as Future; +} + +cfg_not_trace! { + cfg_rt! { + pub(crate) use std::future::Future; + } +} diff --git a/tokio/src/future/trace.rs b/tokio/src/future/trace.rs new file mode 100644 index 00000000000..28789a604d2 --- /dev/null +++ b/tokio/src/future/trace.rs @@ -0,0 +1,11 @@ +use std::future::Future; + +pub(crate) trait InstrumentedFuture: Future { + fn id(&self) -> Option; +} + +impl InstrumentedFuture for tracing::instrument::Instrumented { + fn id(&self) -> Option { + self.span().id() + } +} diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index cbd8e58bb33..13dfb69739f 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -392,7 +392,7 @@ impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn(&self, future: F) -> JoinHandle where - F: Future + Send + 'static, + F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { let (task, handle) = task::joinable(future); diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 5c9b8ede1a3..b7d725128d7 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -4,7 +4,6 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; -use crate::runtime::blocking::task::BlockingTask; use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; @@ -86,18 +85,6 @@ where rt.spawn_blocking(func) } -#[allow(dead_code)] -pub(crate) fn try_spawn_blocking(func: F) -> Result<(), ()> -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let rt = context::current().expect(CONTEXT_MISSING_ERROR); - - let (task, _handle) = task::joinable(BlockingTask::new(func)); - rt.blocking_spawner.spawn(task, &rt) -} - // ===== impl BlockingPool ===== impl BlockingPool { diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 4f1b4c5795a..173f0ca61f1 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -174,8 +174,11 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { + let fut = BlockingTask::new(func); + #[cfg(all(tokio_unstable, feature = "tracing"))] - let func = { + let fut = { + use tracing::Instrument; #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); #[cfg(tokio_track_caller)] @@ -193,12 +196,9 @@ impl Handle { kind = %"blocking", function = %std::any::type_name::(), ); - move || { - let _g = span.enter(); - func() - } + fut.instrument(span) }; - let (task, handle) = task::joinable(BlockingTask::new(func)); + let (task, handle) = task::joinable(fut); let _ = self.blocking_spawner.spawn(task, &self); handle } diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index a37c66796b4..fbcde2cfaf5 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,8 +1,7 @@ cfg_rt! { + use crate::future::Future; use crate::runtime::basic_scheduler; use crate::task::JoinHandle; - - use std::future::Future; } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index fb6dafda391..026a6dcb2f1 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -9,13 +9,13 @@ //! Make sure to consult the relevant safety section of each function before //! use. +use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; use crate::runtime::task::{Notified, Schedule, Task}; use crate::util::linked_list; -use std::future::Future; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; @@ -71,6 +71,10 @@ pub(crate) struct Header { /// Table of function pointers for executing actions on the task. pub(super) vtable: &'static Vtable, + + /// The tracing ID for this instrumented task. + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) id: Option, } unsafe impl Send for Header {} @@ -93,6 +97,8 @@ impl Cell { /// Allocates a new task cell, containing the header, trailer, and core /// structures. pub(super) fn new(future: T, state: State) -> Box> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let id = future.id(); Box::new(Cell { header: Header { state, @@ -100,6 +106,8 @@ impl Cell { queue_next: UnsafeCell::new(None), stack_next: UnsafeCell::new(None), vtable: raw::vtable::(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + id, }, core: Core { scheduler: Scheduler { diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 7d596e36e1a..47bbcc15ffc 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -1,9 +1,9 @@ +use crate::future::Future; use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer}; use crate::runtime::task::state::Snapshot; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{JoinError, Notified, Schedule, Task}; -use std::future::Future; use std::mem; use std::panic; use std::ptr::NonNull; @@ -146,6 +146,11 @@ where } } + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) fn id(&self) -> Option<&tracing::Id> { + self.header().id.as_ref() + } + /// Forcibly shutdown the task /// /// Attempt to transition to `Running` in order to forcibly shutdown the diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 7b49e95abed..58b8c2a15e8 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -26,9 +26,9 @@ cfg_rt_multi_thread! { pub(crate) use self::stack::TransferStack; } +use crate::future::Future; use crate::util::linked_list; -use std::future::Future; use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index cae56d037da..a9cd4e6f4c7 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -1,6 +1,6 @@ +use crate::future::Future; use crate::runtime::task::{Cell, Harness, Header, Schedule, State}; -use std::future::Future; use std::ptr::NonNull; use std::task::{Poll, Waker}; diff --git a/tokio/src/runtime/task/waker.rs b/tokio/src/runtime/task/waker.rs index 5c2d478fbbc..ef5fd7438bb 100644 --- a/tokio/src/runtime/task/waker.rs +++ b/tokio/src/runtime/task/waker.rs @@ -1,7 +1,7 @@ +use crate::future::Future; use crate::runtime::task::harness::Harness; use crate::runtime::task::{Header, Schedule}; -use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::ops; @@ -44,12 +44,38 @@ impl ops::Deref for WakerRef<'_, S> { } } +cfg_trace! { + macro_rules! trace { + ($harness:expr, $op:expr) => { + if let Some(id) = $harness.id() { + tracing::trace!( + target: "tokio::task::waker", + op = %$op, + task.id = id.into_u64(), + ); + } + } + } +} + +cfg_not_trace! { + macro_rules! trace { + ($harness:expr, $op:expr) => { + // noop + let _ = &$harness; + } + } +} + unsafe fn clone_waker(ptr: *const ()) -> RawWaker where T: Future, S: Schedule, { let header = ptr as *const Header; + let ptr = NonNull::new_unchecked(ptr as *mut Header); + let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.clone"); (*header).state.ref_inc(); raw_waker::(header) } @@ -61,6 +87,7 @@ where { let ptr = NonNull::new_unchecked(ptr as *mut Header); let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.drop"); harness.drop_reference(); } @@ -71,6 +98,7 @@ where { let ptr = NonNull::new_unchecked(ptr as *mut Header); let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.wake"); harness.wake_by_val(); } @@ -82,6 +110,7 @@ where { let ptr = NonNull::new_unchecked(ptr as *mut Header); let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.wake_by_ref"); harness.wake_by_ref(); } diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 47f8ee3454f..421b0786796 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -90,7 +90,7 @@ impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn(&self, future: F) -> JoinHandle where - F: Future + Send + 'static, + F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { let (task, handle) = task::joinable(future);