Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: instrument time::Sleep #4072

Merged
merged 13 commits into from Sep 22, 2021
5 changes: 5 additions & 0 deletions tokio/src/macros/mod.rs
Expand Up @@ -15,6 +15,11 @@ mod ready;
#[macro_use]
mod thread_local;

cfg_trace! {
#[macro_use]
mod trace;
}

#[macro_use]
#[cfg(feature = "rt")]
pub(crate) mod scoped_tls;
Expand Down
27 changes: 27 additions & 0 deletions tokio/src/macros/trace.rs
@@ -0,0 +1,27 @@
cfg_trace! {
macro_rules! trace_op {
($name:literal, $readiness:literal, $parent:expr) => {
tracing::trace!(
target: "runtime::resource::poll_op",
parent: $parent,
op_name = $name,
is_ready = $readiness
);
}
}

macro_rules! trace_poll_op {
($name:literal, $poll:expr, $parent:expr $(,)*) => {
match $poll {
std::task::Poll::Ready(t) => {
trace_op!($name, true, $parent);
std::task::Poll::Ready(t)
}
std::task::Poll::Pending => {
trace_op!($name, false, $parent);
return std::task::Poll::Pending;
}
}
};
}
}
176 changes: 149 additions & 27 deletions tokio/src/time/driver/sleep.rs
@@ -1,11 +1,17 @@
use crate::time::driver::{Handle, TimerEntry};
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;

use pin_project_lite::pin_project;
use std::future::Future;
use std::panic::Location;
use std::pin::Pin;
use std::task::{self, Poll};

cfg_trace! {
use crate::time::driver::ClockTime;
}

/// Waits until `deadline` is reached.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
Expand Down Expand Up @@ -39,8 +45,9 @@ use std::task::{self, Poll};
/// [`interval`]: crate::time::interval()
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_until"))]
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep_until(deadline: Instant) -> Sleep {
Sleep::new_timeout(deadline)
return Sleep::new_timeout(deadline, trace::caller_location());
}

/// Waits until `duration` has elapsed.
Expand Down Expand Up @@ -82,10 +89,13 @@ pub fn sleep_until(deadline: Instant) -> Sleep {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_for"))]
#[cfg_attr(docsrs, doc(alias = "wait"))]
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep(duration: Duration) -> Sleep {
let location = trace::caller_location();

match Instant::now().checked_add(duration) {
Some(deadline) => sleep_until(deadline),
None => sleep_until(Instant::far_future()),
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::new_timeout(Instant::far_future(), location),
}
}

Expand Down Expand Up @@ -182,29 +192,95 @@ pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
deadline: Instant,
inner: Inner,

// The link between the `Sleep` instance and the timer that drives it.
#[pin]
entry: TimerEntry,
}
}

cfg_trace! {
#[derive(Debug)]
struct Inner {
deadline: Instant,
resource_span: tracing::Span,
async_op_span: tracing::Span,
time_source: ClockTime,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/tioli: it occurs to me that we don't really need to clone the time source into the Inner, as the TimerEntry struct already owns a Handle and exposes access to it via this accessor:

/// Returns a TimerHandle for this timer.
pub(super) fn handle(&self) -> TimerHandle {
TimerHandle {
inner: NonNull::from(self),
}
}

so, rather than cloning the ClockTime into the struct, we could just call self.entry.handle().time_source() whenever we need to get the time. this makes the Inner struct one Instant smaller (since a ClockTime is a zero-sized Clock struct plus an Instant). OTTOH, this requires fewer pointer dereferences than accessing the time source through the handle every time. So, the current approach may actually be better. Anyway, just a thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that accessor is on TimerShared not TimerEntry, which is what we have in the struct

}
}

cfg_not_trace! {
#[derive(Debug)]
struct Inner {
deadline: Instant,
}
}
Comment on lines 194 to +218
Copy link
Member

@hawkw hawkw Aug 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about this a bit more: it might be a bit more concise to move the deadline field back to the Sleep struct, and just stick the tracing-related stuff in a separate struct that's only present if the trace cfg is enabled. something like this:

    pub struct Sleep {
        deadline: Instant,

        // The link between the `Sleep` instance and the timer that drives it.
        #[pin]
        entry: TimerEntry,

         #[cfg(all(tokio_unstable, feature = "tracing"))]
        trace: Trace,
    }
    
// ...

 #[cfg(all(tokio_unstable, feature = "tracing"))]
#[derive(Debug)]
struct Trace {
    resource_span: tracing::Span,
    async_op_span: tracing::Span,
    time_source: ClockTime,
}

that way, we don't need to add a separate Inner type with only one field in the case where tracing is disabled.

Of course, this only works if we use a #[cfg(...)] attribute on the field, rather than using the cfg_trace! macro, since the macro can only be used on item definitions, not fields...

Take it or leave it, of course!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what I was going for originally but pin_project does not play well with cfg attrs on fields.. so therefore I took this approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what I was going for originally but pin_project does not play well with cfg attrs on fields.. so therefore I took this approach.

ah, got it...this is fine, then.


impl Sleep {
pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
pub(crate) fn new_timeout(
deadline: Instant,
location: Option<&'static Location<'static>>,
) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);

Sleep { deadline, entry }
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let time_source = handle.time_source().clone();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);

#[cfg(tokio_track_caller)]
let location = location.expect("should have location if tracking caller");

#[cfg(tokio_track_caller)]
let resource_span = tracing::trace_span!(
hawkw marked this conversation as resolved.
Show resolved Hide resolved
"runtime.resource",
concrete_type = "Sleep",
kind = "timer",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);

#[cfg(not(tokio_track_caller))]
let resource_span =
tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer");

let async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout");

tracing::trace!(
target: "runtime::resource::state_update",
parent: resource_span.id(),
duration = duration,
duration.unit = "ms",
duration.op = "override",
);

Inner {
deadline,
resource_span,
async_op_span,
time_source,
}
};

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = Inner { deadline };

Sleep { inner, entry }
}

pub(crate) fn far_future() -> Sleep {
Self::new_timeout(Instant::far_future())
pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
Self::new_timeout(Instant::far_future(), location)
}

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.deadline
self.inner.deadline
}

/// Returns `true` if `Sleep` has elapsed.
Expand Down Expand Up @@ -244,37 +320,83 @@ impl Sleep {
///
/// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.reset_inner(deadline)
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
*me.deadline = deadline;
(*me.inner).deadline = deadline;

#[cfg(all(tokio_unstable, feature = "tracing"))]
{
me.inner.async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");

tracing::trace!(
target: "runtime::resource::state_update",
parent: me.inner.resource_span.id(),
duration = {
let now = me.inner.time_source.now();
let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
deadline_tick.checked_sub(now).unwrap_or(0)
},
duration.unit = "ms",
duration.op = "override",
);
}
}

fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
cfg_not_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));

me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
}
}

cfg_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
// Keep track of task budget
let coop = ready!(trace_poll_op!(
"poll_elapsed",
crate::coop::poll_proceed(cx),
me.inner.resource_span.id(),
));

let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
});

trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id())
}
}
}

impl Future for Sleep {
type Output = ();

// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _span = self.inner.async_op_span.clone().entered();

match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
Expand Down
12 changes: 9 additions & 3 deletions tokio/src/time/timeout.rs
Expand Up @@ -4,7 +4,10 @@
//!
//! [`Timeout`]: struct@Timeout

use crate::time::{error::Elapsed, sleep_until, Duration, Instant, Sleep};
use crate::{
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
util::trace,
};

use pin_project_lite::pin_project;
use std::future::Future;
Expand Down Expand Up @@ -45,14 +48,17 @@ use std::task::{self, Poll};
/// }
/// # }
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
where
T: Future,
{
let location = trace::caller_location();

let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
Some(deadline) => Sleep::new_timeout(deadline),
None => Sleep::far_future(),
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
}
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/util/trace.rs
Expand Up @@ -27,6 +27,15 @@ cfg_trace! {
}
}
}
cfg_time! {
#[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> {
#[cfg(all(tokio_track_caller, tokio_unstable, feature = "tracing"))]
return Some(std::panic::Location::caller());
#[cfg(not(all(tokio_track_caller, tokio_unstable, feature = "tracing")))]
None
}
}

cfg_not_trace! {
cfg_rt! {
Expand Down