Skip to content

Commit

Permalink
tracing: instrument time::Sleep (tokio-rs#4072)
Browse files Browse the repository at this point in the history
This branch instruments the `Sleep` resource to allow the tokio-console
to consume data about resources usage. The corresponding console branch
is here: tokio-rs/console#77

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev authored and suikammd committed Oct 7, 2021
1 parent 9aeb541 commit 2f10c55
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 30 deletions.
5 changes: 5 additions & 0 deletions tokio/src/macros/mod.rs
Expand Up @@ -15,6 +15,11 @@ mod ready;
#[macro_use]
mod thread_local;

cfg_trace! {
#[macro_use]
mod trace;
}

#[macro_use]
#[cfg(feature = "rt")]
pub(crate) mod scoped_tls;
Expand Down
27 changes: 27 additions & 0 deletions tokio/src/macros/trace.rs
@@ -0,0 +1,27 @@
cfg_trace! {
macro_rules! trace_op {
($name:literal, $readiness:literal, $parent:expr) => {
tracing::trace!(
target: "runtime::resource::poll_op",
parent: $parent,
op_name = $name,
is_ready = $readiness
);
}
}

macro_rules! trace_poll_op {
($name:literal, $poll:expr, $parent:expr $(,)*) => {
match $poll {
std::task::Poll::Ready(t) => {
trace_op!($name, true, $parent);
std::task::Poll::Ready(t)
}
std::task::Poll::Pending => {
trace_op!($name, false, $parent);
return std::task::Poll::Pending;
}
}
};
}
}
176 changes: 149 additions & 27 deletions tokio/src/time/driver/sleep.rs
@@ -1,11 +1,17 @@
use crate::time::driver::{Handle, TimerEntry};
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;

use pin_project_lite::pin_project;
use std::future::Future;
use std::panic::Location;
use std::pin::Pin;
use std::task::{self, Poll};

cfg_trace! {
use crate::time::driver::ClockTime;
}

/// Waits until `deadline` is reached.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
Expand Down Expand Up @@ -39,8 +45,9 @@ use std::task::{self, Poll};
/// [`interval`]: crate::time::interval()
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_until"))]
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep_until(deadline: Instant) -> Sleep {
Sleep::new_timeout(deadline)
return Sleep::new_timeout(deadline, trace::caller_location());
}

/// Waits until `duration` has elapsed.
Expand Down Expand Up @@ -82,10 +89,13 @@ pub fn sleep_until(deadline: Instant) -> Sleep {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_for"))]
#[cfg_attr(docsrs, doc(alias = "wait"))]
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep(duration: Duration) -> Sleep {
let location = trace::caller_location();

match Instant::now().checked_add(duration) {
Some(deadline) => sleep_until(deadline),
None => sleep_until(Instant::far_future()),
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::new_timeout(Instant::far_future(), location),
}
}

Expand Down Expand Up @@ -182,29 +192,95 @@ pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
deadline: Instant,
inner: Inner,

// The link between the `Sleep` instance and the timer that drives it.
#[pin]
entry: TimerEntry,
}
}

cfg_trace! {
#[derive(Debug)]
struct Inner {
deadline: Instant,
resource_span: tracing::Span,
async_op_span: tracing::Span,
time_source: ClockTime,
}
}

cfg_not_trace! {
#[derive(Debug)]
struct Inner {
deadline: Instant,
}
}

impl Sleep {
pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
pub(crate) fn new_timeout(
deadline: Instant,
location: Option<&'static Location<'static>>,
) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);

Sleep { deadline, entry }
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let time_source = handle.time_source().clone();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);

#[cfg(tokio_track_caller)]
let location = location.expect("should have location if tracking caller");

#[cfg(tokio_track_caller)]
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Sleep",
kind = "timer",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);

#[cfg(not(tokio_track_caller))]
let resource_span =
tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer");

let async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout");

tracing::trace!(
target: "runtime::resource::state_update",
parent: resource_span.id(),
duration = duration,
duration.unit = "ms",
duration.op = "override",
);

Inner {
deadline,
resource_span,
async_op_span,
time_source,
}
};

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = Inner { deadline };

Sleep { inner, entry }
}

pub(crate) fn far_future() -> Sleep {
Self::new_timeout(Instant::far_future())
pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
Self::new_timeout(Instant::far_future(), location)
}

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.deadline
self.inner.deadline
}

/// Returns `true` if `Sleep` has elapsed.
Expand Down Expand Up @@ -244,37 +320,83 @@ impl Sleep {
///
/// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.reset_inner(deadline)
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
*me.deadline = deadline;
(*me.inner).deadline = deadline;

#[cfg(all(tokio_unstable, feature = "tracing"))]
{
me.inner.async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");

tracing::trace!(
target: "runtime::resource::state_update",
parent: me.inner.resource_span.id(),
duration = {
let now = me.inner.time_source.now();
let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
deadline_tick.checked_sub(now).unwrap_or(0)
},
duration.unit = "ms",
duration.op = "override",
);
}
}

fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
cfg_not_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));

me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
}
}

cfg_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
// Keep track of task budget
let coop = ready!(trace_poll_op!(
"poll_elapsed",
crate::coop::poll_proceed(cx),
me.inner.resource_span.id(),
));

let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
});

trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id())
}
}
}

impl Future for Sleep {
type Output = ();

// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _span = self.inner.async_op_span.clone().entered();

match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
Expand Down
12 changes: 9 additions & 3 deletions tokio/src/time/timeout.rs
Expand Up @@ -4,7 +4,10 @@
//!
//! [`Timeout`]: struct@Timeout

use crate::time::{error::Elapsed, sleep_until, Duration, Instant, Sleep};
use crate::{
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
util::trace,
};

use pin_project_lite::pin_project;
use std::future::Future;
Expand Down Expand Up @@ -45,14 +48,17 @@ use std::task::{self, Poll};
/// }
/// # }
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
where
T: Future,
{
let location = trace::caller_location();

let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
Some(deadline) => Sleep::new_timeout(deadline),
None => Sleep::far_future(),
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
}
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/util/trace.rs
Expand Up @@ -27,6 +27,15 @@ cfg_trace! {
}
}
}
cfg_time! {
#[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> {
#[cfg(all(tokio_track_caller, tokio_unstable, feature = "tracing"))]
return Some(std::panic::Location::caller());
#[cfg(not(all(tokio_track_caller, tokio_unstable, feature = "tracing")))]
None
}
}

cfg_not_trace! {
cfg_rt! {
Expand Down

0 comments on commit 2f10c55

Please sign in to comment.