Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: instrument time::Sleep #4072

Merged
merged 13 commits into from Sep 22, 2021
195 changes: 164 additions & 31 deletions tokio/src/time/driver/sleep.rs
Expand Up @@ -6,6 +6,10 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};

cfg_trace! {
use crate::time::driver::ClockTime;
}

/// Waits until `deadline` is reached.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
Expand Down Expand Up @@ -89,6 +93,23 @@ pub fn sleep(duration: Duration) -> Sleep {
}
}

cfg_trace! {
#[derive(Debug)]
struct Inner {
deadline: Instant,
resource_span: tracing::Span,
async_op_span: tracing::Span,
time_source: ClockTime,
}
}

cfg_not_trace! {
#[derive(Debug)]
struct Inner {
deadline: Instant,
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very minor style nit: in general, we prefer to put private inner types after the public API types that contain them...so these should go after the definition of Sleep.

not a big deal though.


pin_project! {
/// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
///
Expand Down Expand Up @@ -182,7 +203,7 @@ pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
deadline: Instant,
inner: Inner,

// The link between the `Sleep` instance and the timer that drives it.
#[pin]
Expand All @@ -191,11 +212,43 @@ pin_project! {
}

impl Sleep {
pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);
cfg_not_trace! {
pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);
Sleep { inner: Inner {deadline}, entry }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it looks like rustfmt is not formatting these because they're inside the cfg_not_trace! macro? i would expect this to be formatted like

Suggested change
Sleep { inner: Inner {deadline}, entry }
Sleep {
inner: Inner { deadline },
entry,
}

}
}

cfg_trace! {
pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);

let time_source = handle.time_source().clone();
let duration = time_source.deadline_to_tick(deadline) - time_source.now();
let resource_span = tracing::trace_span!(
hawkw marked this conversation as resolved.
Show resolved Hide resolved
"runtime.resource",
concrete_type = "Sleep",
kind = "timer");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks like rustfmt didn't format this:

Suggested change
kind = "timer");
kind = "timer",
);


let async_op_span = tracing::trace_span!(
"runtime.resource.async_op",
source = "Sleep::new_timeout"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, i'm not totally sure how useful this is going to be. the Sleep resource can be created by the user-facing sleep and sleep_until functions, as well as internally by the Interval type created by interval and interval_at. I would kind of expect the async-op span to be distinguish between those different places where a Sleep might be created, rather than always having the source of "Sleep::new_timeout"...which isn't even a public API function. Since users aren't calling Sleep::new_timeout directly, it's not going to be particularly useful for them to see it in their diagnostics...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, creating the async_op span here means that the async op will "start" as soon as the Sleep is created...but I'm not really sure if that's what we want. I think the async op's duration should probably start the first time the sleep's poll_elapsed is called.

Actually, what I think we really want is to have one async-op span that corresponds to the Future impl for Sleep, and is created the first time the future is polled, and another one that corresponds to Interval::tick/``Interval::poll_tick. We could do something like this by using [Span::none`](https://docs.rs/tracing/0.1.26/tracing/span/struct.Span.html#method.none):

When constructing the Sleep, we would initialize the async_op_span field like this

let inner = Inner {
    deadline,
    resource_span,
    async_op_span: tracing::Span::none(),
    time_source,
};

and then in the Future impl for sleep, we would create the span if it is None:

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // eliding some `Pin`-related noise that i'm sure would be necessary here...
        if self.inner.async_op_span.is_none() {
            self.inner.async_op_span = tracing::trace_span!(
                "runtime.resource.async_op",
                source = "Sleep",
            );
        }
        let _span = self.inner.async_op_span.clone().entered();
        // ...
    }
}

Interval could manage its own async op span (which I think we'd probably want to re-create every time a new tick starts?).

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw I think that makes sense. My initial thought was that the async op span represents the life of the future that represents the async operation. So really from the moment of instantiating the future to the time it is dropped. This is also why there is this little comment in the proto code: https://github.com/tokio-rs/console/blob/main/proto/async_ops.proto#L52. Namely, until it is polled, we do not really know which task or which resource this op is associated with. I think what you are describing makes sense, especially the suggestion around the "source" field. One reason I went with creating the span as part of the future instantiation is that I would assume that ideally we would like to be able to observe a situation where a bunch of futures are created by are never polled? WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I think the issue is that a Sleep is not necessarily a single Future. It might also be constructed internally as part of an Interval, and I think it's important that every time the interval is ticked is considered a separate async op.

I could go either way on when the async op span should begin, but I think distinguishing between awaiting a Sleep as a future, and ticking an Interval is important.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could have it reset the span every time Sleep::reset is called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could have it reset the span every time Sleep::reset is called?

Yeah, that plus maybe passing in a string constant for the "source" field to distinguish between spans created via time::sleep and time::interval would probably be sufficient!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw All of this makes me wonder whether the source field is useful at all. Passing string constants around seems a bit brittle. I wonder if there is a more elegant way to represent the fact that the sleep resource is being used by the interval? This is not the only instance of the pattern. For example RwLock internally uses a Semaphore. Should we just get rid of the "source" field an introduce a parent-child relationship between resources where a resource span can be a child of another resource span. Or event rely on the stack of entered spans to determine that and instead of relying on a field rely on the backtrace of entered spans? This way we will be able to associate the fact that this async op (and all others) that are happening on the Sleep future are being triggered by the parent Interval resource. Does that make sense? Does it sound better? The fact that we need to reset the async op span when resetting the timer is still true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that in the eventual future, establishing the relationships between higher and lower-level async operations using a span hierarchy definitely does make sense. However, it's also additional work we'd have to do in the subscriber. I think that having the source field is probably fine though, especially since adding code to the console subscriber to infer these relationships based on inspecting the span context would require additional changes.

In general, I think it's fine to add as many fields to things as we like, and not worry too much about changing them around later...as long as they're just displayed in the "fields" section, and don't have other special meanings to the console subscriber. If we do add special meanings to fields beyond just displaying them as arbitrary k/v pairs, then we need to be a little more cautious about removing/changing them, but I think it's fine to keep the source field and continue iterating on it in further branches, especially if it lets us move forward with this and come back to it later...

);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
source = "Sleep::new_timeout"
);
source = "Sleep::new_timeout",
);



let span_guard = resource_span.enter();
tracing::trace!(
target: "tokio::resource::state_update",
hawkw marked this conversation as resolved.
Show resolved Hide resolved
duration = duration,
duration.unit = "ms",
duration.op = "override",
);
drop(span_guard);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rather than entering and then immediately exiting the span, we can also write

Suggested change
let span_guard = resource_span.enter();
tracing::trace!(
target: "tokio::resource::state_update",
duration = duration,
duration.unit = "ms",
duration.op = "override",
);
drop(span_guard);
tracing::trace!(
parent: resource_span.id(),
target: "tokio::resource::state_update",
duration = duration,
duration.unit = "ms",
duration.op = "override",
);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the console we are using ctx.current_span().id() to get the resource span, expecting that this span is always the current when such an event is emitted. Does setting the parent explicitly just set the parent or it also enters the span. And more importantly, which approach do we want to use, looking at the current span or looking at the explicit parent of the event?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does setting the parent explicitly just set the parent or it also enters the span.

Setting the parent only explicitly sets the parent, so we will need to change the console code to use Event::parent or (preferably) Context::event_span instead.

And more importantly, which approach do we want to use, looking at the current span or looking at the explicit parent of the event?

It's more correct to look at the explicit parent of the event, and fall back to the current span if there is no explicit parent.


Sleep { deadline, entry }
Sleep { inner: Inner {deadline, resource_span, async_op_span, time_source }, entry }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Sleep { inner: Inner {deadline, resource_span, async_op_span, time_source }, entry }
Sleep {
inner: Inner {
deadline,
resource_span,
async_op_span,
time_source,
},
entry,
}

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take it or leave it: it seems like potentially, we could combine these two functions into one function, where only the construction of inner is guarded by the tracing cfgs. That way, the shared code is not duplicated:

    pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
        let handle = Handle::current();
        let entry = TimerEntry::new(&handle, deadline);

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let inner = {
            let time_source =  handle.time_source().clone();
            let duration = time_source.deadline_to_tick(deadline) - time_source.now();
            let resource_span = tracing::trace_span!(
                "runtime.resource",
                concrete_type = "Sleep",
                kind = "timer",
            );

            let async_op_span = tracing::trace_span!(
                "runtime.resource.async_op",
                source = "Sleep::new_timeout",
            );

            let inner = Inner {
                deadline,
                resource_span,
                async_op_span,
                time_source,
            };

            let _span_guard = inner.resource_span.enter();
            tracing::trace!(
                target: "tokio::resource::state_update",
                duration = duration,
                duration.unit = "ms",
                duration.op = "override",
            );
            inner
        };

        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let inner = Inner { deadline };


        Sleep { inner, entry }
    }

}

pub(crate) fn far_future() -> Sleep {
Expand All @@ -204,7 +257,7 @@ impl Sleep {

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.deadline
self.inner.deadline
}

/// Returns `true` if `Sleep` has elapsed.
Expand All @@ -220,7 +273,7 @@ impl Sleep {
/// future completes without having to create new associated state.
///
/// This function can be called both before and after the future has
/// completed.
/// completed.Event
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// completed.Event
/// completed.

///
/// To call this method, you will usually combine the call with
/// [`Pin::as_mut`], which lets you call the method without consuming the
Expand All @@ -244,40 +297,120 @@ impl Sleep {
///
/// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
*me.deadline = deadline;
self.reset_inner(deadline)
}

cfg_trace! {
fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();

let now = me.inner.time_source.now();
let duration = me.inner.time_source.deadline_to_tick(deadline) - now;
let _span_guard = me.inner.resource_span.enter();
tracing::trace!(
target: "tokio::resource::state_update",
hawkw marked this conversation as resolved.
Show resolved Hide resolved
duration = duration,
duration.unit = "ms",
duration.op = "override",
);
me.entry.reset(deadline);
(*me.inner).deadline = deadline;
}
}

fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
cfg_not_trace! {
fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
(*me.inner).deadline = deadline;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly to my comments above, i think it might be better to have only the tracing-related code be cfg flagged, so that we're not duplicating the code for actually resetting the deadline. something like this:

    fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
        let me = self.project();
        me.entry.reset(deadline);
        (*me.inner).deadline = deadline;

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        {
            let now = me.inner.time_source.now();
            let duration = me.inner.time_source.deadline_to_tick(deadline) - now;
            tracing::trace!(
                parent: me.inner.resource_span.id(),
                target: "tokio::resource::state_update",
                duration = duration,
                duration.unit = "ms",
                duration.op = "override",
            );
        }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems there is a problem with the trace! macro when setting both the parent and the target at the same time:

It results in:

error: expected type, found `"runtime::resource::state_update"`
   --> tokio/src/time/driver/sleep.rs:319:25
    |
319 |                 target: "runtime::resource::state_update",
    |                       - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected type
    |                       |
    |                       tried to parse a type due to this type ascription
    |
    = note: `#![feature(type_ascription)]` lets you annotate an expression with a type: `<expr>: <type>`
    = note: see issue #23416 <https://github.com/rust-lang/rust/issues/23416> for more information

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i think i got the order backwards, it should be target: "...", parent: ..., rather than parent: ..., target: "...". because the order is wrong, it doesn't match the expected macro match arm, and the compiler tries to parse target as a variable name with a type ascription.

swapping the order should fix this, my bad!


cfg_not_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));

me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
}
}

cfg_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
macro_rules! trace_op {
($readiness:literal) => {
tracing::trace!(
target: "tokio::resource::poll_op",
hawkw marked this conversation as resolved.
Show resolved Hide resolved
op_name = "poll_elapsed",
readiness = $readiness
);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like something that might be generally useful for implementing instrumented resources --- perhaps it could also handle matching the readiness. Something like this:

            macro_rules! trace_poll_op {
                ($name:literal, $poll:expr) => {
                    let poll = $poll;
                    let readiness = match &poll {
                        Poll::Ready(_) => "ready",
                        Poll::Pending => "pending",
                    };
                    tracing::trace!(
                        target: "tokio::resource::poll_op",
                        op_name = $name,
                        readiness,
                    );
                    poll
                }
            }

and then we could write code like this:

let coop = ready!(trace_poll_op!("poll_elapsed", crate::coop::poll_proceed(cx)));

let result =  me.entry.poll_elapsed(cx).map(move |r| {
    coop.made_progress();
    r
});

trace_poll_op!("poll_elapsed", result)

the trace_poll_op macro could be defined someplace where it can also eventually be used to instrument other resources.

but, of course, this can be done in a follow-up PR, when we add instrumentation to other resources.


let me = self.project();
let _span_guard = me.inner.resource_span.enter();

// Keep track of task budget
let coop = match crate::coop::poll_proceed(cx) {
Poll::Pending => {
trace_op!("pending");
return Poll::Pending;
},
Poll::Ready(coop) => coop
};


let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
});
match result {
Poll::Ready(result) => {
trace_op!("ready");
Poll::Ready(result)
},
Poll::Pending => {
trace_op!("pending");
Poll::Pending
}
}
}
}
}

impl Future for Sleep {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
cfg_not_trace! {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}
}

cfg_trace! {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let _span = self.inner.async_op_span.clone().entered();
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be simplified to just:

Suggested change
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
cfg_not_trace! {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}
}
cfg_trace! {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let _span = self.inner.async_op_span.clone().entered();
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathological case where far too many
// sleep instances have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _span = self.inner.async_op_span.clone().entered();
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}

}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it could be done by putting the cfg inside the function. If the macro doesn't support this, try changing it to this:

macro_rules! cfg_trace {
    ($($stmt:stmt;)*) => {
        $(
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        $stmt;
        )*
    };
    ($($item:item)*) => {
        $(
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            #[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
            $item
        )*
    };
}

}