From 82787d53a1a7ab3f513cfa4c1c161a75c4863ddd Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 22 Apr 2022 12:17:45 -0700 Subject: [PATCH] add task IDs to tracing spans This commit adds the runtime generated task IDs to the task `tracing` spans, when tracing is enabled. Unfortunately, this means the IDs have to be generated much higher up the callstack so they can be added to the tracing spans, and then passed down to the actual task machinery. But, I think this is worth it to be able to record that data. Signed-off-by: Eliza Weisman --- tokio/src/runtime/basic_scheduler.rs | 4 ++-- tokio/src/runtime/handle.rs | 10 ++++++---- tokio/src/runtime/spawner.rs | 7 ++++--- tokio/src/runtime/task/list.rs | 6 ++++-- tokio/src/runtime/task/mod.rs | 14 +++++++++----- tokio/src/runtime/thread_pool/mod.rs | 6 +++--- tokio/src/runtime/thread_pool/worker.rs | 8 ++++++-- tokio/src/task/local.rs | 13 +++++++++---- tokio/src/task/spawn.rs | 8 +++++--- tokio/src/util/trace.rs | 5 +++-- 10 files changed, 51 insertions(+), 30 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 401f55b3f2f..72ea68e4925 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -365,12 +365,12 @@ impl Context { impl Spawner { /// Spawns a future onto the basic scheduler - pub(crate) fn spawn(&self, future: F) -> JoinHandle + pub(crate) fn spawn(&self, future: F, id: super::task::Id) -> JoinHandle where F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { - let (handle, notified) = self.shared.owned.bind(future, self.shared.clone()); + let (handle, notified) = self.shared.owned.bind(future, self.shared.clone(), id); if let Some(notified) = notified { self.shared.schedule(notified); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 3365ab73b2a..240bf2a647c 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -166,9 +166,10 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { + let id = crate::runtime::task::Id::next(); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", None); - self.spawner.spawn(future) + let future = crate::util::trace::task(future, "task", None, id.as_usize()); + self.spawner.spawn(future, id) } /// Runs the provided function on an executor dedicated to blocking. @@ -251,7 +252,7 @@ impl Handle { R: Send + 'static, { let fut = BlockingTask::new(func); - + let id = super::task::Id::next(); #[cfg(all(tokio_unstable, feature = "tracing"))] let fut = { use tracing::Instrument; @@ -261,6 +262,7 @@ impl Handle { "runtime.spawn", kind = %"blocking", task.name = %name.unwrap_or_default(), + task.id = id.as_usize(), "fn" = %std::any::type_name::(), spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), ); @@ -270,7 +272,7 @@ impl Handle { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule); + let (task, handle) = task::unowned(fut, NoopSchedule, id); let spawned = self .blocking_spawner .spawn(blocking::Task::new(task, is_mandatory), self); diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index d81a806cb59..a1290607a0d 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,5 +1,6 @@ use crate::future::Future; use crate::runtime::basic_scheduler; +use crate::runtime::task::Id; use crate::task::JoinHandle; cfg_rt_multi_thread! { @@ -23,15 +24,15 @@ impl Spawner { } } - pub(crate) fn spawn(&self, future: F) -> JoinHandle + pub(crate) fn spawn(&self, future: F, id: Id) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { match self { - Spawner::Basic(spawner) => spawner.spawn(future), + Spawner::Basic(spawner) => spawner.spawn(future, id), #[cfg(feature = "rt-multi-thread")] - Spawner::ThreadPool(spawner) => spawner.spawn(future), + Spawner::ThreadPool(spawner) => spawner.spawn(future, id), } } } diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index 7758f8db7aa..7a1dff0bbfc 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -84,13 +84,14 @@ impl OwnedTasks { &self, task: T, scheduler: S, + id: super::Id, ) -> (JoinHandle, Option>) where S: Schedule, T: Future + Send + 'static, T::Output: Send + 'static, { - let (task, notified, join) = super::new_task(task, scheduler); + let (task, notified, join) = super::new_task(task, scheduler, id); unsafe { // safety: We just created the task, so we have exclusive access @@ -187,13 +188,14 @@ impl LocalOwnedTasks { &self, task: T, scheduler: S, + id: super::Id, ) -> (JoinHandle, Option>) where S: Schedule, T: Future + 'static, T::Output: 'static, { - let (task, notified, join) = super::new_task(task, scheduler); + let (task, notified, join) = super::new_task(task, scheduler, id); unsafe { // safety: We just created the task, so we have exclusive access diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 879edc5c10d..a47268399a5 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -271,14 +271,14 @@ cfg_rt! { /// notification. fn new_task( task: T, - scheduler: S + scheduler: S, + id: Id, ) -> (Task, Notified, JoinHandle) where S: Schedule, T: Future + 'static, T::Output: 'static, { - let id = Id::next(); let raw = RawTask::new::(task, scheduler, id.clone()); let task = Task { raw, @@ -297,13 +297,13 @@ cfg_rt! { /// only when the task is not going to be stored in an `OwnedTasks` list. /// /// Currently only blocking tasks use this method. - pub(crate) fn unowned(task: T, scheduler: S) -> (UnownedTask, JoinHandle) + pub(crate) fn unowned(task: T, scheduler: S, id: Id) -> (UnownedTask, JoinHandle) where S: Schedule, T: Send + Future + 'static, T::Output: Send + 'static, { - let (task, notified, join) = new_task(task, scheduler); + let (task, notified, join) = new_task(task, scheduler, id); // This transfers the ref-count of task and notified into an UnownedTask. // This is valid because an UnownedTask holds two ref-counts. @@ -480,9 +480,13 @@ impl fmt::Display for Id { } impl Id { - fn next() -> Self { + pub(crate) fn next() -> Self { use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; static NEXT_ID: AtomicUsize = AtomicUsize::new(1); Self(NEXT_ID.fetch_add(1, Relaxed)) } + + pub(crate) fn as_usize(&self) -> usize { + self.0 + } } diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index d3f46517cb0..18e6f35da85 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -9,7 +9,7 @@ pub(crate) use worker::Launch; pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; -use crate::runtime::task::JoinHandle; +use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Callback, Parker}; use std::fmt; @@ -90,12 +90,12 @@ impl Drop for ThreadPool { impl Spawner { /// Spawns a future onto the thread pool - pub(crate) fn spawn(&self, future: F) -> JoinHandle + pub(crate) fn spawn(&self, future: F, id: task::Id) -> JoinHandle where F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { - worker::Shared::bind_new_task(&self.shared, future) + worker::Shared::bind_new_task(&self.shared, future, id) } pub(crate) fn shutdown(&mut self) { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index a3893177c49..d0706911117 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -715,12 +715,16 @@ impl task::Schedule for Arc { } impl Shared { - pub(super) fn bind_new_task(me: &Arc, future: T) -> JoinHandle + pub(super) fn bind_new_task( + me: &Arc, + future: T, + id: crate::runtime::task::Id, + ) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - let (handle, notified) = me.owned.bind(future, me.clone()); + let (handle, notified) = me.owned.bind(future, me.clone(), id); if let Some(notified) = notified { me.schedule(notified, false); diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 2dbd9706047..adfdca39233 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -301,12 +301,13 @@ cfg_rt! { where F: Future + 'static, F::Output: 'static { - let future = crate::util::trace::task(future, "local", name); + let id = crate::runtime::task::Id::next(); + let future = crate::util::trace::task(future, "local", name, id.as_usize()); CURRENT.with(|maybe_cx| { let cx = maybe_cx .expect("`spawn_local` called from outside of a `task::LocalSet`"); - let (handle, notified) = cx.owned.bind(future, cx.shared.clone()); + let (handle, notified) = cx.owned.bind(future, cx.shared.clone(), id); if let Some(notified) = notified { cx.shared.schedule(notified); @@ -385,9 +386,13 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - let future = crate::util::trace::task(future, "local", None); + let id = crate::runtime::task::Id::next(); + let future = crate::util::trace::task(future, "local", None, id.as_usize()); - let (handle, notified) = self.context.owned.bind(future, self.context.shared.clone()); + let (handle, notified) = self + .context + .owned + .bind(future, self.context.shared.clone(), id); if let Some(notified) = notified { self.context.shared.schedule(notified); diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index a9d736674c0..a86274a93ad 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -142,8 +142,10 @@ cfg_rt! { T: Future + Send + 'static, T::Output: Send + 'static, { - let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR); - let task = crate::util::trace::task(future, "task", name); - spawn_handle.spawn(task) + use crate::runtime::{task, context}; + let id = task::Id::next(); + let spawn_handle = context::spawn_handle().expect(CONTEXT_MISSING_ERROR); + let task = crate::util::trace::task(future, "task", name, id.as_usize()); + spawn_handle.spawn(task, id) } } diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 6080e2358ae..b2b78076004 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -10,7 +10,7 @@ cfg_trace! { #[inline] #[track_caller] - pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>) -> Instrumented { + pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>, id: usize) -> Instrumented { use tracing::instrument::Instrument; let location = std::panic::Location::caller(); let span = tracing::trace_span!( @@ -18,6 +18,7 @@ cfg_trace! { "runtime.spawn", %kind, task.name = %name.unwrap_or_default(), + task.id = id, loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), @@ -91,7 +92,7 @@ cfg_time! { cfg_not_trace! { cfg_rt! { #[inline] - pub(crate) fn task(task: F, _: &'static str, _name: Option<&str>) -> F { + pub(crate) fn task(task: F, _: &'static str, _name: Option<&str>, _: usize) -> F { // nop task }