Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: instrument task wakers #3836

Merged
merged 3 commits into from Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions tokio/src/future/mod.rs
Expand Up @@ -22,3 +22,14 @@ cfg_sync! {
mod block_on;
pub(crate) use block_on::block_on;
}

cfg_trace! {
mod trace;
pub(crate) use trace::InstrumentedFuture as Future;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rename was mostly for convenience in all the other modules. But it could be that people find it too confusing, in which case I'll ditch the rename and just make the name change in all the other places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change the public API of spawn methods to use a different trait, or is it only internal? I can't really tell.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it doesn't. And I checked in a playground, since the trait is pub(crate) the compiler will error if the bounds were used on a pub function.

}

cfg_not_trace! {
cfg_rt! {
pub(crate) use std::future::Future;
}
}
11 changes: 11 additions & 0 deletions tokio/src/future/trace.rs
@@ -0,0 +1,11 @@
use std::future::Future;

pub(crate) trait InstrumentedFuture: Future {
fn id(&self) -> Option<tracing::Id>;
}

impl<F: Future> InstrumentedFuture for tracing::instrument::Instrumented<F> {
fn id(&self) -> Option<tracing::Id> {
self.span().id()
}
}
hawkw marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -392,7 +392,7 @@ impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
Expand Down
13 changes: 0 additions & 13 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -4,7 +4,6 @@ use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
Expand Down Expand Up @@ -86,18 +85,6 @@ where
rt.spawn_blocking(func)
}

#[allow(dead_code)]
pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = context::current().expect(CONTEXT_MISSING_ERROR);

let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
}

// ===== impl BlockingPool =====

impl BlockingPool {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/handle.rs
Expand Up @@ -174,8 +174,11 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);

#[cfg(all(tokio_unstable, feature = "tracing"))]
let func = {
let fut = {
use tracing::Instrument;
#[cfg(tokio_track_caller)]
let location = std::panic::Location::caller();
#[cfg(tokio_track_caller)]
Expand All @@ -193,12 +196,9 @@ impl Handle {
kind = %"blocking",
function = %std::any::type_name::<F>(),
);
move || {
let _g = span.enter();
func()
}
fut.instrument(span)
};
let (task, handle) = task::joinable(BlockingTask::new(func));
let (task, handle) = task::joinable(fut);
let _ = self.blocking_spawner.spawn(task, &self);
handle
}
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/runtime/spawner.rs
@@ -1,8 +1,7 @@
cfg_rt! {
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;

use std::future::Future;
}

cfg_rt_multi_thread! {
Expand Down
10 changes: 9 additions & 1 deletion tokio/src/runtime/task/core.rs
Expand Up @@ -9,13 +9,13 @@
//! Make sure to consult the relevant safety section of each function before
//! use.

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::util::linked_list;

use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
Expand Down Expand Up @@ -71,6 +71,10 @@ pub(crate) struct Header {

/// Table of function pointers for executing actions on the task.
pub(super) vtable: &'static Vtable,

/// The tracing ID for this instrumented task.
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(super) id: Option<tracing::Id>,
}

unsafe impl Send for Header {}
Expand All @@ -93,13 +97,17 @@ impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let id = future.id();
Box::new(Cell {
header: Header {
state,
owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
stack_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
id,
},
core: Core {
scheduler: Scheduler {
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/task/harness.rs
@@ -1,9 +1,9 @@
use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
use crate::runtime::task::state::Snapshot;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};

use std::future::Future;
use std::mem;
use std::panic;
use std::ptr::NonNull;
Expand Down Expand Up @@ -146,6 +146,11 @@ where
}
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(super) fn id(&self) -> Option<&tracing::Id> {
self.header().id.as_ref()
}

/// Forcibly shutdown the task
///
/// Attempt to transition to `Running` in order to forcibly shutdown the
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/mod.rs
Expand Up @@ -26,9 +26,9 @@ cfg_rt_multi_thread! {
pub(crate) use self::stack::TransferStack;
}

use crate::future::Future;
use crate::util::linked_list;

use std::future::Future;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/raw.rs
@@ -1,6 +1,6 @@
use crate::future::Future;
use crate::runtime::task::{Cell, Harness, Header, Schedule, State};

use std::future::Future;
use std::ptr::NonNull;
use std::task::{Poll, Waker};

Expand Down
31 changes: 30 additions & 1 deletion tokio/src/runtime/task/waker.rs
@@ -1,7 +1,7 @@
use crate::future::Future;
use crate::runtime::task::harness::Harness;
use crate::runtime::task::{Header, Schedule};

use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops;
Expand Down Expand Up @@ -44,12 +44,38 @@ impl<S> ops::Deref for WakerRef<'_, S> {
}
}

cfg_trace! {
macro_rules! trace {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
($harness:expr, $op:expr) => {
if let Some(id) = $harness.id() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the intention here is to avoid emitting the event if the task is not being tracked? i wonder if we actually want to do this inside of the tracing event macro, to handle cases where the event is disabled by one of tracing's filtering fast paths. e.g., if the user compiles tracing with one of the "static_max_level_xxx" feature flags, the event macro will be completely empty and won't compile to any code, but we'll still dereference the trailer and check if it has a task ID.

this is probably an unnecessary microoptimzation, though, and we could do it later --- moving it inside the macro will be much easier when tokio-rs/tracing#1393 is published, anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I'd really want to be to check tracing if the event would be emitted, and check if the ID is Some, in the same if statement. I don't want to emit an event if we're not tracking the task either.

tracing::trace!(
target: "tokio::task::waker",
op = %$op,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can see why this is a potential use-case for custom event names...we should definitely consider adding the option to override event names upstream, so that we can use custom names here rather than a field value...but, again, this is fine for now.

task.id = id.into_u64(),
);
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

cfg_not_trace! {
macro_rules! trace {
($harness:expr, $op:expr) => {
// noop
let _ = &$harness;
hawkw marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker
where
T: Future,
S: Schedule,
{
let header = ptr as *const Header;
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.clone");
(*header).state.ref_inc();
raw_waker::<T, S>(header)
}
Expand All @@ -61,6 +87,7 @@ where
{
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.drop");
harness.drop_reference();
}

Expand All @@ -71,6 +98,7 @@ where
{
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.wake");
harness.wake_by_val();
}

Expand All @@ -82,6 +110,7 @@ where
{
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.wake_by_ref");
harness.wake_by_ref();
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/thread_pool/mod.rs
Expand Up @@ -90,7 +90,7 @@ impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
Expand Down