Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: instrument more resources #4302

Merged
merged 9 commits into from Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/macros/cfg.rs
Expand Up @@ -368,7 +368,7 @@ macro_rules! cfg_trace {
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
$item
)*
}
};
}

macro_rules! cfg_not_trace {
Expand Down
9 changes: 4 additions & 5 deletions tokio/src/macros/trace.rs
@@ -1,24 +1,23 @@
cfg_trace! {
macro_rules! trace_op {
($name:literal, $readiness:literal, $parent:expr) => {
($name:expr, $readiness:literal) => {
tracing::trace!(
target: "runtime::resource::poll_op",
parent: $parent,
op_name = $name,
is_ready = $readiness
);
}
}

macro_rules! trace_poll_op {
($name:literal, $poll:expr, $parent:expr $(,)*) => {
($name:expr, $poll:expr $(,)*) => {
match $poll {
std::task::Poll::Ready(t) => {
trace_op!($name, true, $parent);
trace_op!($name, true);
std::task::Poll::Ready(t)
}
std::task::Poll::Pending => {
trace_op!($name, false, $parent);
trace_op!($name, false);
return std::task::Poll::Pending;
}
}
Expand Down
63 changes: 63 additions & 0 deletions tokio/src/sync/barrier.rs
@@ -1,5 +1,7 @@
use crate::loom::sync::Mutex;
use crate::sync::watch;
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;

/// A barrier enables multiple tasks to synchronize the beginning of some computation.
///
Expand Down Expand Up @@ -41,6 +43,8 @@ pub struct Barrier {
state: Mutex<BarrierState>,
wait: watch::Receiver<usize>,
n: usize,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
}

#[derive(Debug)]
Expand All @@ -55,6 +59,7 @@ impl Barrier {
///
/// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all
/// tasks at once when the `n`th task calls `wait`.
#[track_caller]
pub fn new(mut n: usize) -> Barrier {
let (waker, wait) = crate::sync::watch::channel(0);

Expand All @@ -65,6 +70,32 @@ impl Barrier {
n = 1;
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let location = std::panic::Location::caller();
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Barrier",
kind = "Sync",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);

resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
size = n,
);

tracing::trace!(
target: "runtime::resource::state_update",
arrived = 0,
)
});
resource_span
};

Barrier {
state: Mutex::new(BarrierState {
waker,
Expand All @@ -73,6 +104,8 @@ impl Barrier {
}),
n,
wait,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: resource_span,
}
}

Expand All @@ -85,6 +118,20 @@ impl Barrier {
/// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks
/// will receive a result that will return `false` from `is_leader`.
pub async fn wait(&self) -> BarrierWaitResult {
#[cfg(all(tokio_unstable, feature = "tracing"))]
return trace::async_op(
|| self.wait_internal(),
self.resource_span.clone(),
"Barrier::wait",
"poll",
false,
)
.await;

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
return self.wait_internal().await;
}
async fn wait_internal(&self) -> BarrierWaitResult {
// NOTE: we are taking a _synchronous_ lock here.
// It is okay to do so because the critical section is fast and never yields, so it cannot
// deadlock even if another future is concurrently holding the lock.
Expand All @@ -96,7 +143,23 @@ impl Barrier {
let mut state = self.state.lock();
let generation = state.generation;
state.arrived += 1;
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing::trace!(
target: "runtime::resource::state_update",
arrived = 1,
arrived.op = "add",
);
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing::trace!(
target: "runtime::resource::async_op::state_update",
arrived = true,
);
if state.arrived == self.n {
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing::trace!(
target: "runtime::resource::async_op::state_update",
is_leader = true,
);
// we are the leader for this generation
// wake everyone, increment the generation, and return
state
Expand Down