New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tracing: instrument time::Sleep #4072
Changes from 9 commits
b1f2dde
5a51dd2
68de623
3a226df
bb3d8dc
50162e4
a7180b1
b1c8f3e
148a989
ad39bb6
02a4107
e404e9b
4e94841
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
cfg_trace! { | ||
macro_rules! trace_op { | ||
($name:literal, $readiness:literal, $parent:expr) => { | ||
tracing::trace!( | ||
target: "runtime::resource::poll_op", | ||
parent: $parent, | ||
op_name = $name, | ||
is_ready = $readiness | ||
); | ||
} | ||
} | ||
|
||
macro_rules! trace_poll_op { | ||
($name:literal, $poll:expr, $parent:expr $(,)*) => { | ||
match $poll { | ||
std::task::Poll::Ready(t) => { | ||
trace_op!($name, true, $parent); | ||
std::task::Poll::Ready(t) | ||
} | ||
std::task::Poll::Pending => { | ||
trace_op!($name, false, $parent); | ||
return std::task::Poll::Pending; | ||
} | ||
} | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3,9 +3,14 @@ use crate::time::{error::Error, Duration, Instant}; | |||||||||||||
|
||||||||||||||
use pin_project_lite::pin_project; | ||||||||||||||
use std::future::Future; | ||||||||||||||
use std::panic::Location; | ||||||||||||||
use std::pin::Pin; | ||||||||||||||
use std::task::{self, Poll}; | ||||||||||||||
|
||||||||||||||
cfg_trace! { | ||||||||||||||
use crate::time::driver::ClockTime; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// Waits until `deadline` is reached. | ||||||||||||||
/// | ||||||||||||||
/// No work is performed while awaiting on the sleep future to complete. `Sleep` | ||||||||||||||
|
@@ -39,8 +44,15 @@ use std::task::{self, Poll}; | |||||||||||||
/// [`interval`]: crate::time::interval() | ||||||||||||||
// Alias for old name in 0.x | ||||||||||||||
#[cfg_attr(docsrs, doc(alias = "delay_until"))] | ||||||||||||||
#[cfg_attr(tokio_track_caller, track_caller)] | ||||||||||||||
pub fn sleep_until(deadline: Instant) -> Sleep { | ||||||||||||||
Sleep::new_timeout(deadline) | ||||||||||||||
#[cfg(tokio_track_caller)] | ||||||||||||||
let location = std::panic::Location::caller(); | ||||||||||||||
#[cfg(tokio_track_caller)] | ||||||||||||||
return Sleep::new_timeout(deadline, Some(location)); | ||||||||||||||
|
||||||||||||||
#[cfg(not(tokio_track_caller))] | ||||||||||||||
Sleep::new_timeout(deadline, None) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// Waits until `duration` has elapsed. | ||||||||||||||
|
@@ -82,10 +94,16 @@ pub fn sleep_until(deadline: Instant) -> Sleep { | |||||||||||||
// Alias for old name in 0.x | ||||||||||||||
#[cfg_attr(docsrs, doc(alias = "delay_for"))] | ||||||||||||||
#[cfg_attr(docsrs, doc(alias = "wait"))] | ||||||||||||||
#[cfg_attr(tokio_track_caller, track_caller)] | ||||||||||||||
pub fn sleep(duration: Duration) -> Sleep { | ||||||||||||||
#[cfg(tokio_track_caller)] | ||||||||||||||
let location = Some(std::panic::Location::caller()); | ||||||||||||||
#[cfg(not(tokio_track_caller))] | ||||||||||||||
let location = None; | ||||||||||||||
|
||||||||||||||
match Instant::now().checked_add(duration) { | ||||||||||||||
Some(deadline) => sleep_until(deadline), | ||||||||||||||
None => sleep_until(Instant::far_future()), | ||||||||||||||
Some(deadline) => Sleep::new_timeout(deadline, location), | ||||||||||||||
None => Sleep::new_timeout(Instant::far_future(), location), | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
|
@@ -182,29 +200,95 @@ pin_project! { | |||||||||||||
#[derive(Debug)] | ||||||||||||||
#[must_use = "futures do nothing unless you `.await` or poll them"] | ||||||||||||||
pub struct Sleep { | ||||||||||||||
deadline: Instant, | ||||||||||||||
inner: Inner, | ||||||||||||||
|
||||||||||||||
// The link between the `Sleep` instance and the timer that drives it. | ||||||||||||||
#[pin] | ||||||||||||||
entry: TimerEntry, | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
cfg_trace! { | ||||||||||||||
#[derive(Debug)] | ||||||||||||||
struct Inner { | ||||||||||||||
deadline: Instant, | ||||||||||||||
resource_span: tracing::Span, | ||||||||||||||
async_op_span: tracing::Span, | ||||||||||||||
time_source: ClockTime, | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit/tioli: it occurs to me that we don't really need to clone the time source into the tokio/tokio/src/time/driver/entry.rs Lines 401 to 406 in f3ed064
so, rather than cloning the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But that accessor is on |
||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
cfg_not_trace! { | ||||||||||||||
#[derive(Debug)] | ||||||||||||||
struct Inner { | ||||||||||||||
deadline: Instant, | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
Comment on lines
194
to
+218
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thinking about this a bit more: it might be a bit more concise to move the pub struct Sleep {
deadline: Instant,
// The link between the `Sleep` instance and the timer that drives it.
#[pin]
entry: TimerEntry,
#[cfg(all(tokio_unstable, feature = "tracing"))]
trace: Trace,
}
// ...
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[derive(Debug)]
struct Trace {
resource_span: tracing::Span,
async_op_span: tracing::Span,
time_source: ClockTime,
} that way, we don't need to add a separate Of course, this only works if we use a Take it or leave it, of course! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is what I was going for originally but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
ah, got it...this is fine, then. |
||||||||||||||
|
||||||||||||||
impl Sleep { | ||||||||||||||
pub(crate) fn new_timeout(deadline: Instant) -> Sleep { | ||||||||||||||
#[allow(unused_variables)] | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this should maybe be
Suggested change
so that unused variables are only allowed when we're actually not using the |
||||||||||||||
pub(crate) fn new_timeout( | ||||||||||||||
deadline: Instant, | ||||||||||||||
location: Option<&'static Location<'static>>, | ||||||||||||||
) -> Sleep { | ||||||||||||||
let handle = Handle::current(); | ||||||||||||||
let entry = TimerEntry::new(&handle, deadline); | ||||||||||||||
|
||||||||||||||
Sleep { deadline, entry } | ||||||||||||||
#[cfg(all(tokio_unstable, feature = "tracing"))] | ||||||||||||||
let inner = { | ||||||||||||||
let time_source = handle.time_source().clone(); | ||||||||||||||
let deadline_tick = time_source.deadline_to_tick(deadline); | ||||||||||||||
let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); | ||||||||||||||
|
||||||||||||||
#[cfg(tokio_track_caller)] | ||||||||||||||
let location = location.expect("should have location if tracking caller"); | ||||||||||||||
|
||||||||||||||
#[cfg(tokio_track_caller)] | ||||||||||||||
let resource_span = tracing::trace_span!( | ||||||||||||||
hawkw marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
"runtime.resource", | ||||||||||||||
concrete_type = "Sleep", | ||||||||||||||
kind = "timer", | ||||||||||||||
location.file = location.file(), | ||||||||||||||
location.line = location.line(), | ||||||||||||||
location.column = location.column(), | ||||||||||||||
); | ||||||||||||||
|
||||||||||||||
#[cfg(not(tokio_track_caller))] | ||||||||||||||
let resource_span = | ||||||||||||||
tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer"); | ||||||||||||||
|
||||||||||||||
let async_op_span = | ||||||||||||||
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout"); | ||||||||||||||
|
||||||||||||||
tracing::trace!( | ||||||||||||||
target: "runtime::resource::state_update", | ||||||||||||||
parent: resource_span.id(), | ||||||||||||||
duration = duration, | ||||||||||||||
duration.unit = "ms", | ||||||||||||||
duration.op = "override", | ||||||||||||||
); | ||||||||||||||
|
||||||||||||||
Inner { | ||||||||||||||
deadline, | ||||||||||||||
resource_span, | ||||||||||||||
async_op_span, | ||||||||||||||
time_source, | ||||||||||||||
} | ||||||||||||||
}; | ||||||||||||||
|
||||||||||||||
#[cfg(not(all(tokio_unstable, feature = "tracing")))] | ||||||||||||||
let inner = Inner { deadline }; | ||||||||||||||
|
||||||||||||||
Sleep { inner, entry } | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
pub(crate) fn far_future() -> Sleep { | ||||||||||||||
Self::new_timeout(Instant::far_future()) | ||||||||||||||
pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { | ||||||||||||||
Self::new_timeout(Instant::far_future(), location) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// Returns the instant at which the future will complete. | ||||||||||||||
pub fn deadline(&self) -> Instant { | ||||||||||||||
self.deadline | ||||||||||||||
self.inner.deadline | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// Returns `true` if `Sleep` has elapsed. | ||||||||||||||
|
@@ -244,37 +328,83 @@ impl Sleep { | |||||||||||||
/// | ||||||||||||||
/// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut | ||||||||||||||
pub fn reset(self: Pin<&mut Self>, deadline: Instant) { | ||||||||||||||
self.reset_inner(deadline) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { | ||||||||||||||
let me = self.project(); | ||||||||||||||
me.entry.reset(deadline); | ||||||||||||||
*me.deadline = deadline; | ||||||||||||||
(*me.inner).deadline = deadline; | ||||||||||||||
|
||||||||||||||
#[cfg(all(tokio_unstable, feature = "tracing"))] | ||||||||||||||
{ | ||||||||||||||
me.inner.async_op_span = | ||||||||||||||
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout"); | ||||||||||||||
|
||||||||||||||
tracing::trace!( | ||||||||||||||
target: "runtime::resource::state_update", | ||||||||||||||
parent: me.inner.resource_span.id(), | ||||||||||||||
duration = { | ||||||||||||||
let now = me.inner.time_source.now(); | ||||||||||||||
let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); | ||||||||||||||
deadline_tick.checked_sub(now).unwrap_or(0) | ||||||||||||||
}, | ||||||||||||||
duration.unit = "ms", | ||||||||||||||
duration.op = "override", | ||||||||||||||
); | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { | ||||||||||||||
let me = self.project(); | ||||||||||||||
cfg_not_trace! { | ||||||||||||||
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { | ||||||||||||||
let me = self.project(); | ||||||||||||||
|
||||||||||||||
// Keep track of task budget | ||||||||||||||
let coop = ready!(crate::coop::poll_proceed(cx)); | ||||||||||||||
// Keep track of task budget | ||||||||||||||
let coop = ready!(crate::coop::poll_proceed(cx)); | ||||||||||||||
|
||||||||||||||
me.entry.poll_elapsed(cx).map(move |r| { | ||||||||||||||
coop.made_progress(); | ||||||||||||||
r | ||||||||||||||
}) | ||||||||||||||
me.entry.poll_elapsed(cx).map(move |r| { | ||||||||||||||
coop.made_progress(); | ||||||||||||||
r | ||||||||||||||
}) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
cfg_trace! { | ||||||||||||||
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { | ||||||||||||||
let me = self.project(); | ||||||||||||||
// Keep track of task budget | ||||||||||||||
let coop = ready!(trace_poll_op!( | ||||||||||||||
"poll_elapsed", | ||||||||||||||
crate::coop::poll_proceed(cx), | ||||||||||||||
me.inner.resource_span.id(), | ||||||||||||||
)); | ||||||||||||||
|
||||||||||||||
let result = me.entry.poll_elapsed(cx).map(move |r| { | ||||||||||||||
coop.made_progress(); | ||||||||||||||
r | ||||||||||||||
}); | ||||||||||||||
|
||||||||||||||
trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id()) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
impl Future for Sleep { | ||||||||||||||
type Output = (); | ||||||||||||||
|
||||||||||||||
// `poll_elapsed` can return an error in two cases: | ||||||||||||||
// | ||||||||||||||
// - AtCapacity: this is a pathological case where far too many | ||||||||||||||
// sleep instances have been scheduled. | ||||||||||||||
// - Shutdown: No timer has been setup, which is a mis-use error. | ||||||||||||||
// | ||||||||||||||
// Both cases are extremely rare, and pretty accurately fit into | ||||||||||||||
// "logic errors", so we just panic in this case. A user couldn't | ||||||||||||||
// really do much better if we passed the error onwards. | ||||||||||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||||||||||||||
// `poll_elapsed` can return an error in two cases: | ||||||||||||||
// | ||||||||||||||
// - AtCapacity: this is a pathological case where far too many | ||||||||||||||
// sleep instances have been scheduled. | ||||||||||||||
// - Shutdown: No timer has been setup, which is a mis-use error. | ||||||||||||||
// | ||||||||||||||
// Both cases are extremely rare, and pretty accurately fit into | ||||||||||||||
// "logic errors", so we just panic in this case. A user couldn't | ||||||||||||||
// really do much better if we passed the error onwards. | ||||||||||||||
#[cfg(all(tokio_unstable, feature = "tracing"))] | ||||||||||||||
let _span = self.inner.async_op_span.clone().entered(); | ||||||||||||||
|
||||||||||||||
match ready!(self.as_mut().poll_elapsed(cx)) { | ||||||||||||||
Ok(()) => Poll::Ready(()), | ||||||||||||||
Err(e) => panic!("timer error: {}", e), | ||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -11,6 +11,9 @@ use std::future::Future; | |||||||||||
use std::pin::Pin; | ||||||||||||
use std::task::{self, Poll}; | ||||||||||||
|
||||||||||||
#[cfg(all(tokio_track_caller, tokio_unstable, feature = "tracing"))] | ||||||||||||
use std::panic::Location; | ||||||||||||
|
||||||||||||
/// Require a `Future` to complete before the specified duration has elapsed. | ||||||||||||
/// | ||||||||||||
/// If the future completes before the duration has elapsed, then the completed | ||||||||||||
|
@@ -45,14 +48,20 @@ use std::task::{self, Poll}; | |||||||||||
/// } | ||||||||||||
/// # } | ||||||||||||
/// ``` | ||||||||||||
#[cfg_attr(tokio_track_caller, track_caller)] | ||||||||||||
pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T> | ||||||||||||
where | ||||||||||||
T: Future, | ||||||||||||
{ | ||||||||||||
#[cfg(all(tokio_track_caller, tokio_unstable, feature = "tracing"))] | ||||||||||||
let location = Some(Location::caller()); | ||||||||||||
#[cfg(not(all(tokio_track_caller, tokio_unstable, feature = "tracing")))] | ||||||||||||
let location = None; | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we made a wrapper function for getting the callsite location, as i suggested above, this could also just be
Suggested change
|
||||||||||||
|
||||||||||||
let deadline = Instant::now().checked_add(duration); | ||||||||||||
let delay = match deadline { | ||||||||||||
Some(deadline) => Sleep::new_timeout(deadline), | ||||||||||||
None => Sleep::far_future(), | ||||||||||||
Some(deadline) => Sleep::new_timeout(deadline, location), | ||||||||||||
None => Sleep::far_future(location), | ||||||||||||
}; | ||||||||||||
Timeout::new_with_delay(future, delay) | ||||||||||||
} | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, take it or leave it: i wonder if it's worthwhile to just stick this behind a function, so we don't have to have so many
cfg(...)
attributes in function bodies? maybe inutil/trace.rs
, we could add aand use this here and elsewhere. then, this could just be