Skip to content

Commit

Permalink
add task IDs to tracing spans
Browse files Browse the repository at this point in the history
This commit adds the runtime generated task IDs to the task `tracing`
spans, when tracing is enabled. Unfortunately, this means the IDs have
to be generated much higher up the callstack so they can be added to the
tracing spans, and then passed down to the actual task machinery. But, I
think this is worth it to be able to record that data.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Apr 22, 2022
1 parent ece2698 commit 47a26c5
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 30 deletions.
4 changes: 2 additions & 2 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -370,12 +370,12 @@ impl Context {

impl Spawner {
/// Spawns a future onto the basic scheduler
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F, id: super::task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (handle, notified) = self.shared.owned.bind(future, self.shared.clone());
let (handle, notified) = self.shared.owned.bind(future, self.shared.clone(), id);

if let Some(notified) = notified {
self.shared.schedule(notified);
Expand Down
10 changes: 6 additions & 4 deletions tokio/src/runtime/handle.rs
Expand Up @@ -175,9 +175,10 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", None);
self.spawner.spawn(future)
let future = crate::util::trace::task(future, "task", None, id.as_usize());
self.spawner.spawn(future, id)
}

/// Runs the provided function on an executor dedicated to blocking.
Expand Down Expand Up @@ -388,7 +389,7 @@ impl HandleInner {
R: Send + 'static,
{
let fut = BlockingTask::new(func);

let id = super::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
use tracing::Instrument;
Expand All @@ -398,6 +399,7 @@ impl HandleInner {
"runtime.spawn",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
task.id = id.as_usize(),
"fn" = %std::any::type_name::<F>(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
);
Expand All @@ -407,7 +409,7 @@ impl HandleInner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::unowned(fut, NoopSchedule);
let (task, handle) = task::unowned(fut, NoopSchedule, id);
let spawned = self
.blocking_spawner
.spawn(blocking::Task::new(task, is_mandatory), rt);
Expand Down
7 changes: 4 additions & 3 deletions tokio/src/runtime/spawner.rs
@@ -1,4 +1,5 @@
use crate::future::Future;
use crate::runtime::task::Id;
use crate::runtime::{basic_scheduler, HandleInner};
use crate::task::JoinHandle;

Expand All @@ -23,15 +24,15 @@ impl Spawner {
}
}

pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Spawner::Basic(spawner) => spawner.spawn(future),
Spawner::Basic(spawner) => spawner.spawn(future, id),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
Spawner::ThreadPool(spawner) => spawner.spawn(future, id),
}
}

Expand Down
6 changes: 4 additions & 2 deletions tokio/src/runtime/task/list.rs
Expand Up @@ -84,13 +84,14 @@ impl<S: 'static> OwnedTasks<S> {
&self,
task: T,
scheduler: S,
id: super::Id,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = super::new_task(task, scheduler);
let (task, notified, join) = super::new_task(task, scheduler, id);

unsafe {
// safety: We just created the task, so we have exclusive access
Expand Down Expand Up @@ -187,13 +188,14 @@ impl<S: 'static> LocalOwnedTasks<S> {
&self,
task: T,
scheduler: S,
id: super::Id,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let (task, notified, join) = super::new_task(task, scheduler);
let (task, notified, join) = super::new_task(task, scheduler, id);

unsafe {
// safety: We just created the task, so we have exclusive access
Expand Down
14 changes: 9 additions & 5 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -271,14 +271,14 @@ cfg_rt! {
/// notification.
fn new_task<T, S>(
task: T,
scheduler: S
scheduler: S,
id: Id,
) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let id = Id::next();
let raw = RawTask::new::<T, S>(task, scheduler, id.clone());
let task = Task {
raw,
Expand All @@ -297,13 +297,13 @@ cfg_rt! {
/// only when the task is not going to be stored in an `OwnedTasks` list.
///
/// Currently only blocking tasks use this method.
pub(crate) fn unowned<T, S>(task: T, scheduler: S) -> (UnownedTask<S>, JoinHandle<T::Output>)
pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Send + Future + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = new_task(task, scheduler);
let (task, notified, join) = new_task(task, scheduler, id);

// This transfers the ref-count of task and notified into an UnownedTask.
// This is valid because an UnownedTask holds two ref-counts.
Expand Down Expand Up @@ -480,9 +480,13 @@ impl fmt::Display for Id {
}

impl Id {
fn next() -> Self {
pub(crate) fn next() -> Self {
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
Self(NEXT_ID.fetch_add(1, Relaxed))
}

pub(crate) fn as_usize(&self) -> usize {
self.0
}
}
6 changes: 3 additions & 3 deletions tokio/src/runtime/thread_pool/mod.rs
Expand Up @@ -14,7 +14,7 @@ pub(crate) use worker::Launch;
pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::JoinHandle;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Callback, Driver, HandleInner};

use std::fmt;
Expand Down Expand Up @@ -98,12 +98,12 @@ impl Drop for ThreadPool {

impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
worker::Shared::bind_new_task(&self.shared, future)
worker::Shared::bind_new_task(&self.shared, future, id)
}

pub(crate) fn shutdown(&mut self) {
Expand Down
8 changes: 6 additions & 2 deletions tokio/src/runtime/thread_pool/worker.rs
Expand Up @@ -723,12 +723,16 @@ impl Shared {
&self.handle_inner
}

pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output>
pub(super) fn bind_new_task<T>(
me: &Arc<Self>,
future: T,
id: crate::runtime::task::Id,
) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.owned.bind(future, me.clone());
let (handle, notified) = me.owned.bind(future, me.clone(), id);

if let Some(notified) = notified {
me.schedule(notified, false);
Expand Down
13 changes: 9 additions & 4 deletions tokio/src/task/local.rs
Expand Up @@ -301,12 +301,13 @@ cfg_rt! {
where F: Future + 'static,
F::Output: 'static
{
let future = crate::util::trace::task(future, "local", name);
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", name, id.as_usize());
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");

let (handle, notified) = cx.owned.bind(future, cx.shared.clone());
let (handle, notified) = cx.owned.bind(future, cx.shared.clone(), id);

if let Some(notified) = notified {
cx.shared.schedule(notified);
Expand Down Expand Up @@ -385,9 +386,13 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local", None);
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", None, id.as_usize());

let (handle, notified) = self.context.owned.bind(future, self.context.shared.clone());
let (handle, notified) = self
.context
.owned
.bind(future, self.context.shared.clone(), id);

if let Some(notified) = notified {
self.context.shared.schedule(notified);
Expand Down
8 changes: 5 additions & 3 deletions tokio/src/task/spawn.rs
Expand Up @@ -142,8 +142,10 @@ cfg_rt! {
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(future, "task", name);
spawn_handle.spawn(task)
use crate::runtime::{task, context};
let id = task::Id::next();
let spawn_handle = context::spawn_handle().expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(future, "task", name, id.as_usize());
spawn_handle.spawn(task, id)
}
}
5 changes: 3 additions & 2 deletions tokio/src/util/trace.rs
Expand Up @@ -10,14 +10,15 @@ cfg_trace! {

#[inline]
#[track_caller]
pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>) -> Instrumented<F> {
pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>, id: usize) -> Instrumented<F> {
use tracing::instrument::Instrument;
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "tokio::task",
"runtime.spawn",
%kind,
task.name = %name.unwrap_or_default(),
task.id = id,
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
Expand Down Expand Up @@ -91,7 +92,7 @@ cfg_time! {
cfg_not_trace! {
cfg_rt! {
#[inline]
pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>) -> F {
pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>, _: usize) -> F {
// nop
task
}
Expand Down

0 comments on commit 47a26c5

Please sign in to comment.