From 4961567f4b1d2aba0a967820b7ede60cbf5d4081 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 23 Nov 2021 12:45:57 +0000 Subject: [PATCH 1/9] tracing: add more instrumentation Signed-off-by: Zahari Dichev --- tokio/src/macros/cfg.rs | 14 +- tokio/src/macros/trace.rs | 9 +- tokio/src/sync/batch_semaphore.rs | 213 ++++++++++++++++++++++++++---- tokio/src/sync/mutex.rs | 187 ++++++++++++++++++++++++-- tokio/src/sync/semaphore.rs | 106 +++++++++++++-- tokio/src/time/driver/sleep.rs | 74 +++++++---- tokio/src/time/interval.rs | 74 ++++++++++- tokio/src/util/trace.rs | 57 ++++++++ 8 files changed, 654 insertions(+), 80 deletions(-) diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 3afc0402374..9ebe0077846 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -362,16 +362,28 @@ macro_rules! cfg_not_time { } macro_rules! cfg_trace { + ($($stmt:stmt;)*) => { + $( + #[cfg(all(tokio_unstable, feature = "tracing"))] + $stmt; + )* + }; ($($item:item)*) => { $( #[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] $item )* - } + }; } macro_rules! cfg_not_trace { + ($($stmt:stmt;)*) => { + $( + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + $stmt; + )* + }; ($($item:item)*) => { $( #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] diff --git a/tokio/src/macros/trace.rs b/tokio/src/macros/trace.rs index 31dde2f255a..80a257e1899 100644 --- a/tokio/src/macros/trace.rs +++ b/tokio/src/macros/trace.rs @@ -1,9 +1,8 @@ cfg_trace! { macro_rules! trace_op { - ($name:literal, $readiness:literal, $parent:expr) => { + ($name:expr, $readiness:literal) => { tracing::trace!( target: "runtime::resource::poll_op", - parent: $parent, op_name = $name, is_ready = $readiness ); @@ -11,14 +10,14 @@ cfg_trace! { } macro_rules! trace_poll_op { - ($name:literal, $poll:expr, $parent:expr $(,)*) => { + ($name:expr, $poll:expr $(,)*) => { match $poll { std::task::Poll::Ready(t) => { - trace_op!($name, true, $parent); + trace_op!($name, true); std::task::Poll::Ready(t) } std::task::Poll::Pending => { - trace_op!($name, false, $parent); + trace_op!($name, false); return std::task::Poll::Pending; } } diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index b5c39d2a05d..792d952066c 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -19,6 +19,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Mutex, MutexGuard}; use crate::util::linked_list::{self, LinkedList}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use crate::util::WakeList; use std::future::Future; @@ -35,6 +37,8 @@ pub(crate) struct Semaphore { waiters: Mutex, /// The current number of available permits in the semaphore. permits: AtomicUsize, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } struct Waitlist { @@ -101,6 +105,9 @@ struct Waiter { /// use `UnsafeCell` internally. pointers: linked_list::Pointers, + #[cfg(all(tokio_unstable, feature = "tracing"))] + ctx: trace::AsyncOpTracingCtx, + /// Should not be `Unpin`. _p: PhantomPinned, } @@ -129,12 +136,40 @@ impl Semaphore { "a semaphore may not have more than MAX_PERMITS permits ({})", Self::MAX_PERMITS ); - Self { - permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), - waiters: Mutex::new(Waitlist { - queue: LinkedList::new(), - closed: false, - }), + + cfg_trace! { + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Semaphore", + kind = "Sync", + is_internal = true + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = permits, + permits.op = "override", + ) + }); + + return Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + resource_span, + }; + } + cfg_not_trace! { + return Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + }; } } @@ -150,12 +185,25 @@ impl Semaphore { // currently we just clamp the permit count when it exceeds the max permits &= Self::MAX_PERMITS; - Self { - permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), - waiters: Mutex::const_new(Waitlist { - queue: LinkedList::new(), - closed: false, - }), + cfg_trace! { + return Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::const_new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + resource_span: tracing::Span::none(), + }; + } + + cfg_not_trace! { + return Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::const_new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + }; } } @@ -224,7 +272,10 @@ impl Semaphore { let next = curr - num_permits; match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { - Ok(_) => return Ok(()), + Ok(_) => { + // TODO: Instrument once issue has been solved} + return Ok(()); + } Err(actual) => curr = actual, } } @@ -283,6 +334,17 @@ impl Semaphore { rem, Self::MAX_PERMITS ); + + // add remaining permits back + cfg_trace! { + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = rem, + permits.op = "add", + ) + }); + } rem = 0; } @@ -347,6 +409,20 @@ impl Semaphore { acquired += acq; if remaining == 0 { if !queued { + cfg_trace! { + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = acquired, + permits.op = "sub", + ); + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = acquired, + permits.op = "add", + ) + }); + } return Ready(Ok(())); } else if lock.is_none() { break self.waiters.lock(); @@ -362,6 +438,16 @@ impl Semaphore { return Ready(Err(AcquireError::closed())); } + cfg_trace! { + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = acquired, + permits.op = "sub", + ) + }); + } + if node.assign_permits(&mut acquired) { self.add_permits_locked(acquired, waiters); return Ready(Ok(())); @@ -406,12 +492,26 @@ impl fmt::Debug for Semaphore { } impl Waiter { - fn new(num_permits: u32) -> Self { - Waiter { - waker: UnsafeCell::new(None), - state: AtomicUsize::new(num_permits as usize), - pointers: linked_list::Pointers::new(), - _p: PhantomPinned, + cfg_not_trace! { + fn new(num_permits: u32) -> Self { + Waiter { + waker: UnsafeCell::new(None), + state: AtomicUsize::new(num_permits as usize), + pointers: linked_list::Pointers::new(), + _p: PhantomPinned, + } + } + } + + cfg_trace! { + fn new(num_permits: u32, ctx: trace::AsyncOpTracingCtx) -> Self { + Waiter { + waker: UnsafeCell::new(None), + state: AtomicUsize::new(num_permits as usize), + pointers: linked_list::Pointers::new(), + ctx, + _p: PhantomPinned, + } } } @@ -426,6 +526,14 @@ impl Waiter { match self.state.compare_exchange(curr, next, AcqRel, Acquire) { Ok(_) => { *n -= assign; + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.ctx.async_op_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = assign, + permits.op = "add", + ); + }); return next == 0; } Err(actual) => curr = actual, @@ -438,12 +546,26 @@ impl Future for Acquire<'_> { type Output = Result<(), AcquireError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // First, ensure the current task has enough budget to proceed. - let coop = ready!(crate::coop::poll_proceed(cx)); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _resource_span = self.node.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _async_op_span = self.node.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered(); let (node, semaphore, needed, queued) = self.project(); - match semaphore.poll_acquire(cx, needed, node, *queued) { + // First, ensure the current task has enough budget to proceed. + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_acquire", + crate::coop::poll_proceed(cx), + )); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let coop = ready!(crate::coop::poll_proceed(cx)); + + let result = match semaphore.poll_acquire(cx, needed, node, *queued) { Pending => { *queued = true; Pending @@ -454,18 +576,59 @@ impl Future for Acquire<'_> { *queued = false; Ready(Ok(())) } - } + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_acquire", result); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return result; } } impl<'a> Acquire<'a> { fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { - Self { + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Self { node: Waiter::new(num_permits), semaphore, num_permits, queued: false, - } + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return semaphore.resource_span.in_scope(|| { + let async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new"); + let async_op_poll_span = async_op_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_requested = num_permits, + permits.op = "override", + ); + + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = 0 as usize, + permits.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op.poll") + }); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span: semaphore.resource_span.clone(), + }; + + Self { + node: Waiter::new(num_permits, ctx), + semaphore, + num_permits, + queued: false, + } + }); } fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 4d9f9886d76..8c099bd8966 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -1,6 +1,8 @@ #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] use crate::sync::batch_semaphore as semaphore; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::cell::UnsafeCell; use std::error::Error; @@ -124,6 +126,8 @@ use std::{fmt, marker, mem}; /// [`Send`]: trait@std::marker::Send /// [`lock`]: method@Mutex::lock pub struct Mutex { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, s: semaphore::Semaphore, c: UnsafeCell, } @@ -138,6 +142,8 @@ pub struct Mutex { /// The lock is automatically released whenever the guard is dropped, at which /// point `lock` will succeed yet again. pub struct MutexGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, lock: &'a Mutex, } @@ -157,6 +163,8 @@ pub struct MutexGuard<'a, T: ?Sized> { /// /// [`Arc`]: std::sync::Arc pub struct OwnedMutexGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, lock: Arc>, } @@ -242,14 +250,52 @@ impl Mutex { /// /// let lock = Mutex::new(5); /// ``` + #[cfg_attr(tokio_track_caller, track_caller)] pub fn new(t: T) -> Self where T: Sized, { - Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let mutex = { + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + + #[cfg(tokio_track_caller)] + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Mutex", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + #[cfg(not(tokio_track_caller))] + let resource_span = + tracing::trace_span!("runtime.resource", concrete_type = "Mutex", kind = "Sync"); + + let s = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + semaphore::Semaphore::new(1) + }); + + Self { + c: UnsafeCell::new(t), + s, + resource_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let mutex = Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::new(1), - } + }; + + mutex } /// Creates a new lock in an unlocked state ready for use. @@ -267,10 +313,18 @@ impl Mutex { where T: Sized, { - Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::const_new(1), - } + resource_span: tracing::Span::none(), + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Self { + c: UnsafeCell::new(t), + s: semaphore::Semaphore::const_new(1), + }; } /// Locks this mutex, causing the current task to yield until the lock has @@ -297,8 +351,37 @@ impl Mutex { /// } /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { - self.acquire().await; - MutexGuard { lock: self } + #[cfg(all(tokio_unstable, feature = "tracing"))] + let guard = { + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock", + "poll", + false, + ) + .await; + + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + MutexGuard { + lock: self, + resource_span: self.resource_span.clone(), + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let guard = { + self.acquire().await; + MutexGuard { lock: self } + }; + + guard } /// Blocking lock this mutex. When the lock has been acquired, function returns a @@ -368,8 +451,38 @@ impl Mutex { /// /// [`Arc`]: std::sync::Arc pub async fn lock_owned(self: Arc) -> OwnedMutexGuard { - self.acquire().await; - OwnedMutexGuard { lock: self } + #[cfg(all(tokio_unstable, feature = "tracing"))] + let guard = { + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock_owned", + "poll", + false, + ) + .await; + + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + let resource_span = self.resource_span.clone(); + OwnedMutexGuard { + lock: self, + resource_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let guard = { + self.acquire().await; + OwnedMutexGuard { lock: self } + }; + + guard } async fn acquire(&self) { @@ -399,7 +512,26 @@ impl Mutex { /// ``` pub fn try_lock(&self) -> Result, TryLockError> { match self.s.try_acquire(1) { - Ok(_) => Ok(MutexGuard { lock: self }), + Ok(_) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let guard = { + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + Ok(MutexGuard { + lock: self, + resource_span: self.resource_span.clone(), + }) + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let guard = Ok(MutexGuard { lock: self }); + + guard + } Err(_) => Err(TryLockError(())), } } @@ -454,7 +586,26 @@ impl Mutex { /// # } pub fn try_lock_owned(self: Arc) -> Result, TryLockError> { match self.s.try_acquire(1) { - Ok(_) => Ok(OwnedMutexGuard { lock: self }), + Ok(_) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let guard = { + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + let resource_span = self.resource_span.clone(); + Ok(OwnedMutexGuard { + lock: self, + resource_span, + }) + }; + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let guard = Ok(OwnedMutexGuard { lock: self }); + + guard + } Err(_) => Err(TryLockError(())), } } @@ -637,7 +788,14 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { - self.lock.s.release(1) + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + }); + self.lock.s.release(1); } } @@ -699,6 +857,13 @@ impl OwnedMutexGuard { impl Drop for OwnedMutexGuard { fn drop(&mut self) { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + }); self.lock.s.release(1) } } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 839b523c4ce..c33dc15d330 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,5 +1,7 @@ use super::batch_semaphore as ll; // low level implementation use super::{AcquireError, TryAcquireError}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::sync::Arc; /// Counting semaphore performing asynchronous permit acquisition. @@ -77,6 +79,8 @@ use std::sync::Arc; pub struct Semaphore { /// The low level semaphore ll_sem: ll::Semaphore, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } /// A permit from the semaphore. @@ -120,10 +124,45 @@ fn bounds() { impl Semaphore { /// Creates a new semaphore with the initial number of permits. + #[cfg_attr(tokio_track_caller, track_caller)] pub fn new(permits: usize) -> Self { - Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let sem = { + #[cfg(tokio_track_caller)] + let location = std::panic::Location::caller(); + + #[cfg(tokio_track_caller)] + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Semaphore", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + inherits_child_attrs = true, + ); + + #[cfg(not(tokio_track_caller))] + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Semaphore", + kind = "Sync", + inherits_child_attrs = true + ); + + let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); + Self { + ll_sem, + resource_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let sem = Self { ll_sem: ll::Semaphore::new(permits), - } + }; + + sem } /// Creates a new semaphore with the initial number of permits. @@ -139,9 +178,16 @@ impl Semaphore { #[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_new(permits: usize) -> Self { - Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Self { ll_sem: ll::Semaphore::const_new(permits), - } + resource_span: tracing::Span::none(), + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Self { + ll_sem: ll::Semaphore::const_new(permits), + }; } /// Returns the current number of available permits. @@ -191,7 +237,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire(&self) -> Result, AcquireError> { - self.ll_sem.acquire(1).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(1), + self.resource_span.clone(), + "Semaphore::acquire", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(1); + + inner.await?; Ok(SemaphorePermit { sem: self, permits: 1, @@ -227,7 +284,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { - self.ll_sem.acquire(n).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(n), + self.resource_span.clone(), + "Semaphore::acquire_many", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(n); + + inner.await?; Ok(SemaphorePermit { sem: self, permits: n, @@ -350,7 +418,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit pub async fn acquire_owned(self: Arc) -> Result { - self.ll_sem.acquire(1).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(1), + self.resource_span.clone(), + "Semaphore::acquire_owned", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(1); + + inner.await?; Ok(OwnedSemaphorePermit { sem: self, permits: 1, @@ -403,7 +482,18 @@ impl Semaphore { self: Arc, n: u32, ) -> Result { - self.ll_sem.acquire(n).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(n), + self.resource_span.clone(), + "Semaphore::acquire_many_owned", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(n); + + inner.await?; Ok(OwnedSemaphorePermit { sem: self, permits: n, diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index d10639d191c..3a78cd63e2c 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -238,8 +238,7 @@ cfg_trace! { #[derive(Debug)] struct Inner { deadline: Instant, - resource_span: tracing::Span, - async_op_span: tracing::Span, + ctx: trace::AsyncOpTracingCtx, time_source: ClockTime, } } @@ -252,7 +251,10 @@ cfg_not_trace! { } impl Sleep { - #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + #[cfg_attr( + not(all(tokio_unstable, tokio_track_caller, feature = "tracing")), + allow(unused_variables) + )] pub(crate) fn new_timeout( deadline: Instant, location: Option<&'static Location<'static>>, @@ -266,8 +268,10 @@ impl Sleep { let deadline_tick = time_source.deadline_to_tick(deadline); let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); + #[cfg(tokio_track_caller)] let location = location.expect("should have location if tracking caller"); + #[cfg(tokio_track_caller)] let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Sleep", @@ -277,21 +281,33 @@ impl Sleep { loc.col = location.column(), ); - let async_op_span = - tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout"); + #[cfg(not(tokio_track_caller))] + let resource_span = + tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer"); - tracing::trace!( - target: "runtime::resource::state_update", - parent: resource_span.id(), - duration = duration, - duration.unit = "ms", - duration.op = "override", - ); + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, + }; Inner { deadline, - resource_span, - async_op_span, + ctx, time_source, } }; @@ -358,17 +374,23 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] { - me.inner.async_op_span = + let _resource_enter = me.inner.ctx.resource_span.enter(); + me.inner.ctx.async_op_span = tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); + let _async_op_enter = me.inner.ctx.async_op_span.enter(); + + me.inner.ctx.async_op_poll_span = + tracing::trace_span!("runtime.resource.async_op.poll"); + + let duration = { + let now = me.inner.time_source.now(); + let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); + deadline_tick.checked_sub(now).unwrap_or(0) + }; tracing::trace!( target: "runtime::resource::state_update", - parent: me.inner.resource_span.id(), - duration = { - let now = me.inner.time_source.now(); - let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); - deadline_tick.checked_sub(now).unwrap_or(0) - }, + duration = duration, duration.unit = "ms", duration.op = "override", ); @@ -396,7 +418,6 @@ impl Sleep { let coop = ready!(trace_poll_op!( "poll_elapsed", crate::coop::poll_proceed(cx), - me.inner.resource_span.id(), )); let result = me.entry.poll_elapsed(cx).map(move |r| { @@ -404,7 +425,7 @@ impl Sleep { r }); - trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id()) + trace_poll_op!("poll_elapsed", result) } } } @@ -423,8 +444,11 @@ impl Future for Sleep { // really do much better if we passed the error onwards. fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { #[cfg(all(tokio_unstable, feature = "tracing"))] - let _span = self.inner.async_op_span.clone().entered(); - + let _res_span = self.inner.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.inner.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered(); match ready!(self.as_mut().poll_elapsed(cx)) { Ok(()) => Poll::Ready(()), Err(e) => panic!("timer error: {}", e), diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 6a89943741e..59df3ba4668 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -1,6 +1,8 @@ use crate::future::poll_fn; use crate::time::{sleep_until, Duration, Instant, Sleep}; +use crate::util::trace; +use std::panic::Location; use std::pin::Pin; use std::task::{Context, Poll}; use std::{convert::TryInto, future::Future}; @@ -68,10 +70,10 @@ use std::{convert::TryInto, future::Future}; /// /// [`sleep`]: crate::time::sleep() /// [`.tick().await`]: Interval::tick +#[cfg_attr(tokio_track_caller, track_caller)] pub fn interval(period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); - - interval_at(Instant::now(), period) + internal_interval_at(Instant::now(), period, trace::caller_location()) } /// Creates new [`Interval`] that yields with interval of `period` with the @@ -103,14 +105,60 @@ pub fn interval(period: Duration) -> Interval { /// // approximately 70ms have elapsed. /// } /// ``` +#[cfg_attr(tokio_track_caller, track_caller)] pub fn interval_at(start: Instant, period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + internal_interval_at(start, period, trace::caller_location()) +} + +#[cfg_attr( + not(all(tokio_unstable, tokio_track_caller, feature = "tracing")), + allow(unused_variables) +)] +fn internal_interval_at( + start: Instant, + period: Duration, + location: Option<&'static Location<'static>>, +) -> Interval { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let interval = { + #[cfg(tokio_track_caller)] + let location = location.expect("should have location if tracking caller"); + + #[cfg(tokio_track_caller)] + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Interval", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + #[cfg(not(tokio_track_caller))] + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Interval", + kind = "timer" + ); - Interval { + let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); + Interval { + delay, + period, + missed_tick_behavior: Default::default(), + resource_span, + } + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let interval = Interval { delay: Box::pin(sleep_until(start)), period, missed_tick_behavior: Default::default(), - } + }; + + interval } /// Defines the behavior of an [`Interval`] when it misses a tick. @@ -362,6 +410,9 @@ pub struct Interval { /// The strategy `Interval` should use when a tick is missed. missed_tick_behavior: MissedTickBehavior, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } impl Interval { @@ -391,7 +442,20 @@ impl Interval { /// } /// ``` pub async fn tick(&mut self) -> Instant { - poll_fn(|cx| self.poll_tick(cx)).await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let instant = trace::async_op( + || poll_fn(|cx| self.poll_tick(cx)), + resource_span, + "Interval::tick", + "poll_tick", + false, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let instant = poll_fn(|cx| self.poll_tick(cx)); + + instant.await } /// Polls for the next instant in the interval to be reached. diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 74ae739354b..39fe8579070 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,5 +1,11 @@ cfg_trace! { cfg_rt! { + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use pin_project_lite::pin_project; + use std::future::Future; pub(crate) use tracing::instrument::Instrumented; #[inline] @@ -18,6 +24,57 @@ cfg_trace! { ); task.instrument(span) } + + pub(crate) fn async_op(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp + where P: FnOnce() -> F { + resource_span.in_scope(|| { + let async_op_span = tracing::trace_span!("runtime.resource.async_op", source = source, inherits_child_attrs = inherits_child_attrs); + let enter = async_op_span.enter(); + let async_op_poll_span = tracing::trace_span!("runtime.resource.async_op.poll"); + let inner = inner(); + drop(enter); + let tracing_ctx = AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span: resource_span.clone(), + }; + InstrumentedAsyncOp { + inner, + tracing_ctx, + poll_op_name, + } + }) + } + + #[derive(Debug, Clone)] + pub(crate) struct AsyncOpTracingCtx { + pub(crate) async_op_span: tracing::Span, + pub(crate) async_op_poll_span: tracing::Span, + pub(crate) resource_span: tracing::Span, + } + + + pin_project! { + #[derive(Debug, Clone)] + pub(crate) struct InstrumentedAsyncOp { + #[pin] + pub(crate) inner: F, + pub(crate) tracing_ctx: AsyncOpTracingCtx, + pub(crate) poll_op_name: &'static str + } + } + + impl Future for InstrumentedAsyncOp { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let _res_enter = this.tracing_ctx.resource_span.enter(); + let _async_op_enter = this.tracing_ctx.async_op_span.enter(); + let _async_op_poll_enter = this.tracing_ctx.async_op_poll_span.enter(); + trace_poll_op!(this.poll_op_name, this.inner.poll(cx)) + } + } } } cfg_time! { From 3b4366e51366a2bc96dd0d81e0134c2648c96230 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 24 Nov 2021 12:23:08 +0000 Subject: [PATCH 2/9] fix location Signed-off-by: Zahari Dichev --- tokio/src/sync/mutex.rs | 8 +------- tokio/src/sync/semaphore.rs | 12 +----------- tokio/src/time/driver/sleep.rs | 14 ++------------ tokio/src/time/interval.rs | 20 ++++---------------- 4 files changed, 8 insertions(+), 46 deletions(-) diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 8c099bd8966..ec25881cf42 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -250,17 +250,15 @@ impl Mutex { /// /// let lock = Mutex::new(5); /// ``` - #[cfg_attr(tokio_track_caller, track_caller)] + #[track_caller] pub fn new(t: T) -> Self where T: Sized, { #[cfg(all(tokio_unstable, feature = "tracing"))] let mutex = { - #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); - #[cfg(tokio_track_caller)] let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Mutex", @@ -270,10 +268,6 @@ impl Mutex { loc.col = location.column(), ); - #[cfg(not(tokio_track_caller))] - let resource_span = - tracing::trace_span!("runtime.resource", concrete_type = "Mutex", kind = "Sync"); - let s = resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index c33dc15d330..fbf557482a4 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -124,14 +124,12 @@ fn bounds() { impl Semaphore { /// Creates a new semaphore with the initial number of permits. - #[cfg_attr(tokio_track_caller, track_caller)] + #[track_caller] pub fn new(permits: usize) -> Self { #[cfg(all(tokio_unstable, feature = "tracing"))] let sem = { - #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); - #[cfg(tokio_track_caller)] let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Semaphore", @@ -142,14 +140,6 @@ impl Semaphore { inherits_child_attrs = true, ); - #[cfg(not(tokio_track_caller))] - let resource_span = tracing::trace_span!( - "runtime.resource", - concrete_type = "Semaphore", - kind = "Sync", - inherits_child_attrs = true - ); - let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); Self { ll_sem, diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index 3a78cd63e2c..efe0e90ac2c 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -251,10 +251,7 @@ cfg_not_trace! { } impl Sleep { - #[cfg_attr( - not(all(tokio_unstable, tokio_track_caller, feature = "tracing")), - allow(unused_variables) - )] + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] pub(crate) fn new_timeout( deadline: Instant, location: Option<&'static Location<'static>>, @@ -268,10 +265,7 @@ impl Sleep { let deadline_tick = time_source.deadline_to_tick(deadline); let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); - #[cfg(tokio_track_caller)] - let location = location.expect("should have location if tracking caller"); - - #[cfg(tokio_track_caller)] + let location = location.expect("should have location if tracing"); let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Sleep", @@ -281,10 +275,6 @@ impl Sleep { loc.col = location.column(), ); - #[cfg(not(tokio_track_caller))] - let resource_span = - tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer"); - let async_op_span = resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 59df3ba4668..a3dcf35c3b4 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -70,7 +70,7 @@ use std::{convert::TryInto, future::Future}; /// /// [`sleep`]: crate::time::sleep() /// [`.tick().await`]: Interval::tick -#[cfg_attr(tokio_track_caller, track_caller)] +#[track_caller] pub fn interval(period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); internal_interval_at(Instant::now(), period, trace::caller_location()) @@ -105,16 +105,13 @@ pub fn interval(period: Duration) -> Interval { /// // approximately 70ms have elapsed. /// } /// ``` -#[cfg_attr(tokio_track_caller, track_caller)] +#[track_caller] pub fn interval_at(start: Instant, period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); internal_interval_at(start, period, trace::caller_location()) } -#[cfg_attr( - not(all(tokio_unstable, tokio_track_caller, feature = "tracing")), - allow(unused_variables) -)] +#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] fn internal_interval_at( start: Instant, period: Duration, @@ -122,10 +119,8 @@ fn internal_interval_at( ) -> Interval { #[cfg(all(tokio_unstable, feature = "tracing"))] let interval = { - #[cfg(tokio_track_caller)] - let location = location.expect("should have location if tracking caller"); + let location = location.expect("should have location if tracing"); - #[cfg(tokio_track_caller)] let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Interval", @@ -135,13 +130,6 @@ fn internal_interval_at( loc.col = location.column(), ); - #[cfg(not(tokio_track_caller))] - let resource_span = tracing::trace_span!( - "runtime.resource", - concrete_type = "Interval", - kind = "timer" - ); - let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); Interval { delay, From 4039d0e0e32ef2e57f6cfe22eb81f07a8d28e999 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 24 Nov 2021 12:23:21 +0000 Subject: [PATCH 3/9] instrument barrier Signed-off-by: Zahari Dichev --- tokio/src/sync/barrier.rs | 78 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 0e39dac8bb5..aef89f56563 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -1,5 +1,7 @@ use crate::loom::sync::Mutex; use crate::sync::watch; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; /// A barrier enables multiple tasks to synchronize the beginning of some computation. /// @@ -41,6 +43,8 @@ pub struct Barrier { state: Mutex, wait: watch::Receiver, n: usize, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } #[derive(Debug)] @@ -55,6 +59,7 @@ impl Barrier { /// /// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all /// tasks at once when the `n`th task calls `wait`. + #[track_caller] pub fn new(mut n: usize) -> Barrier { let (waker, wait) = crate::sync::watch::channel(0); @@ -65,7 +70,44 @@ impl Barrier { n = 1; } - Barrier { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let barrier = { + let location = std::panic::Location::caller(); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Barrier", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + size = n, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + arrived = 0, + ) + }); + + Barrier { + state: Mutex::new(BarrierState { + waker, + arrived: 0, + generation: 1, + }), + n, + wait, + resource_span: resource_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let barrier = Barrier { state: Mutex::new(BarrierState { waker, arrived: 0, @@ -73,7 +115,9 @@ impl Barrier { }), n, wait, - } + }; + + barrier } /// Does not resolve until all tasks have rendezvoused here. @@ -85,6 +129,20 @@ impl Barrier { /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks /// will receive a result that will return `false` from `is_leader`. pub async fn wait(&self) -> BarrierWaitResult { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace::async_op( + || self.wait_internal(), + self.resource_span.clone(), + "Barrier::wait", + "poll", + false, + ) + .await; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return self.wait_internal().await; + } + async fn wait_internal(&self) -> BarrierWaitResult { // NOTE: we are taking a _synchronous_ lock here. // It is okay to do so because the critical section is fast and never yields, so it cannot // deadlock even if another future is concurrently holding the lock. @@ -96,7 +154,23 @@ impl Barrier { let mut state = self.state.lock(); let generation = state.generation; state.arrived += 1; + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing::trace!( + target: "runtime::resource::state_update", + arrived = 1, + arrived.op = "add", + ); + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing::trace!( + target: "runtime::resource::async_op::state_update", + arrived = true, + ); if state.arrived == self.n { + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing::trace!( + target: "runtime::resource::async_op::state_update", + is_leader = true, + ); // we are the leader for this generation // wake everyone, increment the generation, and return state From 88ebd0361e760e68aca705e40ee987c77b9f7679 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 29 Nov 2021 19:24:59 +0000 Subject: [PATCH 4/9] instrument RwLock Signed-off-by: Zahari Dichev --- tokio/src/sync/rwlock.rs | 378 ++++++++++++++++-- tokio/src/sync/rwlock/owned_read_guard.rs | 43 +- tokio/src/sync/rwlock/owned_write_guard.rs | 82 +++- .../sync/rwlock/owned_write_guard_mapped.rs | 44 +- tokio/src/sync/rwlock/read_guard.rs | 43 +- tokio/src/sync/rwlock/write_guard.rs | 81 +++- tokio/src/sync/rwlock/write_guard_mapped.rs | 45 ++- 7 files changed, 659 insertions(+), 57 deletions(-) diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 120bc72b848..f75e113ff89 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -1,5 +1,7 @@ use crate::sync::batch_semaphore::{Semaphore, TryAcquireError}; use crate::sync::mutex::TryLockError; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::cell::UnsafeCell; use std::marker; use std::marker::PhantomData; @@ -86,6 +88,9 @@ const MAX_READS: u32 = 10; /// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies #[derive(Debug)] pub struct RwLock { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, + // maximum number of concurrent readers mr: u32, @@ -197,15 +202,58 @@ impl RwLock { /// /// let lock = RwLock::new(5); /// ``` + #[track_caller] pub fn new(value: T) -> RwLock where T: Sized, { - RwLock { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let rwlock = { + let location = std::panic::Location::caller(); + + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "RwLock", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + let s = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + max_readers = MAX_READS, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 0, + ); + Semaphore::new(MAX_READS as usize) + }); + + RwLock { + mr: MAX_READS, + c: UnsafeCell::new(value), + s, + resource_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let rwlock = RwLock { mr: MAX_READS, c: UnsafeCell::new(value), s: Semaphore::new(MAX_READS as usize), - } + }; + + rwlock } /// Creates a new instance of an `RwLock` which is unlocked @@ -222,6 +270,7 @@ impl RwLock { /// # Panics /// /// Panics if `max_reads` is more than `u32::MAX >> 3`. + #[track_caller] pub fn with_max_readers(value: T, max_reads: u32) -> RwLock where T: Sized, @@ -231,11 +280,54 @@ impl RwLock { "a RwLock may not be created with more than {} readers", MAX_READS ); - RwLock { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let rwlock = { + let location = std::panic::Location::caller(); + + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "RwLock", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + let s = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + max_readers = max_reads, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 0, + ); + Semaphore::new(max_reads as usize) + }); + + RwLock { + mr: max_reads, + c: UnsafeCell::new(value), + s, + resource_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let rwlock = RwLock { mr: max_reads, c: UnsafeCell::new(value), s: Semaphore::new(max_reads as usize), - } + }; + + rwlock } /// Creates a new instance of an `RwLock` which is unlocked. @@ -253,11 +345,20 @@ impl RwLock { where T: Sized, { - RwLock { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLock { mr: MAX_READS, c: UnsafeCell::new(value), s: Semaphore::const_new(MAX_READS as usize), - } + resource_span: tracing::Span::none(), + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return RwLock { + mr: MAX_READS, + c: UnsafeCell::new(value), + s: Semaphore::const_new(MAX_READS as usize), + }; } /// Creates a new instance of an `RwLock` which is unlocked @@ -277,11 +378,20 @@ impl RwLock { T: Sized, { max_reads &= MAX_READS; - RwLock { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLock { mr: max_reads, c: UnsafeCell::new(value), s: Semaphore::const_new(max_reads as usize), - } + resource_span: tracing::Span::none(), + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return RwLock { + mr: max_reads, + c: UnsafeCell::new(value), + s: Semaphore::const_new(max_reads as usize), + }; } /// Locks this `RwLock` with shared read access, causing the current task @@ -330,16 +440,49 @@ impl RwLock { ///} /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - self.s.acquire(1).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(1), + self.resource_span.clone(), + "RwLock::read", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(1); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); - RwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let guard = RwLockReadGuard { s: &self.s, data: self.c.get(), marker: marker::PhantomData, - } + resource_span: self.resource_span.clone(), + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let guard = RwLockReadGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + }; + + guard } /// Locks this `RwLock` with shared read access, causing the current task @@ -394,16 +537,52 @@ impl RwLock { ///} /// ``` pub async fn read_owned(self: Arc) -> OwnedRwLockReadGuard { - self.s.acquire(1).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(1), + self.resource_span.clone(), + "RwLock::read_owned", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(1); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); - OwnedRwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let guard = OwnedRwLockReadGuard { data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let guard = OwnedRwLockReadGuard { + data: self.c.get(), + lock: ManuallyDrop::new(self), + _p: PhantomData, + }; + + guard } /// Attempts to acquire this `RwLock` with shared read access. @@ -445,6 +624,24 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(RwLockReadGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + resource_span: self.resource_span.clone(), + }); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] Ok(RwLockReadGuard { s: &self.s, data: self.c.get(), @@ -497,11 +694,32 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } - Ok(OwnedRwLockReadGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(OwnedRwLockReadGuard { data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, - }) + resource_span, + }); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Ok(OwnedRwLockReadGuard { + data: self.c.get(), + lock: ManuallyDrop::new(self), + _p: PhantomData, + }); } /// Locks this `RwLock` with exclusive write access, causing the current @@ -533,17 +751,49 @@ impl RwLock { ///} /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - self.s.acquire(self.mr).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(self.mr), + self.resource_span.clone(), + "RwLock::write", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(self.mr); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); - RwLockWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLockWriteGuard { permits_acquired: self.mr, s: &self.s, data: self.c.get(), marker: marker::PhantomData, - } + resource_span: self.resource_span.clone(), + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return RwLockWriteGuard { + permits_acquired: self.mr, + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + }; } /// Locks this `RwLock` with exclusive write access, causing the current @@ -582,17 +832,52 @@ impl RwLock { ///} /// ``` pub async fn write_owned(self: Arc) -> OwnedRwLockWriteGuard { - self.s.acquire(self.mr).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(self.mr), + self.resource_span.clone(), + "RwLock::write_owned", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(self.mr); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); - OwnedRwLockWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return OwnedRwLockWriteGuard { permits_acquired: self.mr, data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return OwnedRwLockWriteGuard { + permits_acquired: self.mr, + data: self.c.get(), + lock: ManuallyDrop::new(self), + _p: PhantomData, + }; } /// Attempts to acquire this `RwLock` with exclusive write access. @@ -625,12 +910,31 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } - Ok(RwLockWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(RwLockWriteGuard { permits_acquired: self.mr, s: &self.s, data: self.c.get(), marker: marker::PhantomData, - }) + resource_span: self.resource_span.clone(), + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(RwLockWriteGuard { + permits_acquired: self.mr, + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + }); } /// Attempts to acquire this `RwLock` with exclusive write access. @@ -670,12 +974,34 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } - Ok(OwnedRwLockWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(OwnedRwLockWriteGuard { permits_acquired: self.mr, data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, - }) + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(OwnedRwLockWriteGuard { + permits_acquired: self.mr, + data: self.c.get(), + lock: ManuallyDrop::new(self), + _p: PhantomData, + }); } /// Returns a mutable reference to the underlying data. diff --git a/tokio/src/sync/rwlock/owned_read_guard.rs b/tokio/src/sync/rwlock/owned_read_guard.rs index 1881295846e..aebb0b62dbe 100644 --- a/tokio/src/sync/rwlock/owned_read_guard.rs +++ b/tokio/src/sync/rwlock/owned_read_guard.rs @@ -15,6 +15,8 @@ use std::sync::Arc; /// [`read_owned`]: method@crate::sync::RwLock::read_owned /// [`RwLock`]: struct@crate::sync::RwLock pub struct OwnedRwLockReadGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, // ManuallyDrop allows us to destructure into this field without running the destructor. pub(super) lock: ManuallyDrop>>, pub(super) data: *const U, @@ -56,13 +58,25 @@ impl OwnedRwLockReadGuard { { let data = f(&*this) as *const V; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - OwnedRwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return OwnedRwLockReadGuard { + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + }; } /// Attempts to make a new [`OwnedRwLockReadGuard`] for a component of the @@ -105,13 +119,25 @@ impl OwnedRwLockReadGuard { None => return Err(this), }; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - Ok(OwnedRwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(OwnedRwLockReadGuard { + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, - }) + }); } } @@ -145,5 +171,14 @@ impl Drop for OwnedRwLockReadGuard { fn drop(&mut self) { self.lock.s.release(1); unsafe { ManuallyDrop::drop(&mut self.lock) }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "sub", + ) + }); } } diff --git a/tokio/src/sync/rwlock/owned_write_guard.rs b/tokio/src/sync/rwlock/owned_write_guard.rs index 0a78d28e903..8e8c7a32c10 100644 --- a/tokio/src/sync/rwlock/owned_write_guard.rs +++ b/tokio/src/sync/rwlock/owned_write_guard.rs @@ -16,6 +16,8 @@ use std::sync::Arc; /// [`write_owned`]: method@crate::sync::RwLock::write_owned /// [`RwLock`]: struct@crate::sync::RwLock pub struct OwnedRwLockWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, // ManuallyDrop allows us to destructure into this field without running the destructor. pub(super) lock: ManuallyDrop>>, @@ -64,14 +66,27 @@ impl OwnedRwLockWriteGuard { let data = f(&mut *this) as *mut U; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - OwnedRwLockMappedWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return OwnedRwLockMappedWriteGuard { + permits_acquired, + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, - } + }; } /// Attempts to make a new [`OwnedRwLockMappedWriteGuard`] for a component @@ -123,14 +138,28 @@ impl OwnedRwLockWriteGuard { }; let permits_acquired = this.permits_acquired; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); + // NB: Forget to avoid drop impl from being called. mem::forget(this); - Ok(OwnedRwLockMappedWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(OwnedRwLockMappedWriteGuard { + permits_acquired, + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, - }) + }); } /// Converts this `OwnedRwLockWriteGuard` into an @@ -181,16 +210,47 @@ impl OwnedRwLockWriteGuard { pub fn downgrade(mut self) -> OwnedRwLockReadGuard { let lock = unsafe { ManuallyDrop::take(&mut self.lock) }; let data = self.data; + let to_release = (self.permits_acquired - 1) as usize; // Release all but one of the permits held by the write guard - lock.s.release((self.permits_acquired - 1) as usize); + lock.s.release(to_release); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(self); - OwnedRwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return OwnedRwLockReadGuard { + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + }; } } @@ -229,6 +289,14 @@ where impl Drop for OwnedRwLockWriteGuard { fn drop(&mut self) { self.lock.s.release(self.permits_acquired as usize); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); unsafe { ManuallyDrop::drop(&mut self.lock) }; } } diff --git a/tokio/src/sync/rwlock/owned_write_guard_mapped.rs b/tokio/src/sync/rwlock/owned_write_guard_mapped.rs index d88ee01e1fd..8a9c04d2ec0 100644 --- a/tokio/src/sync/rwlock/owned_write_guard_mapped.rs +++ b/tokio/src/sync/rwlock/owned_write_guard_mapped.rs @@ -15,6 +15,8 @@ use std::sync::Arc; /// [mapping]: method@crate::sync::OwnedRwLockWriteGuard::map /// [`OwnedRwLockWriteGuard`]: struct@crate::sync::OwnedRwLockWriteGuard pub struct OwnedRwLockMappedWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, // ManuallyDrop allows us to destructure into this field without running the destructor. pub(super) lock: ManuallyDrop>>, @@ -63,14 +65,27 @@ impl OwnedRwLockMappedWriteGuard { let data = f(&mut *this) as *mut V; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - OwnedRwLockMappedWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return OwnedRwLockMappedWriteGuard { + permits_acquired, + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, - } + }; } /// Attempts to make a new `OwnedRwLockMappedWriteGuard` for a component @@ -120,14 +135,27 @@ impl OwnedRwLockMappedWriteGuard { }; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - Ok(OwnedRwLockMappedWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(OwnedRwLockMappedWriteGuard { + permits_acquired, + lock: ManuallyDrop::new(lock), + data, + _p: PhantomData, + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, - }) + }); } } @@ -166,6 +194,14 @@ where impl Drop for OwnedRwLockMappedWriteGuard { fn drop(&mut self) { self.lock.s.release(self.permits_acquired as usize); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); unsafe { ManuallyDrop::drop(&mut self.lock) }; } } diff --git a/tokio/src/sync/rwlock/read_guard.rs b/tokio/src/sync/rwlock/read_guard.rs index 090b297e4af..3f11fb285c5 100644 --- a/tokio/src/sync/rwlock/read_guard.rs +++ b/tokio/src/sync/rwlock/read_guard.rs @@ -13,6 +13,8 @@ use std::ops; /// [`read`]: method@crate::sync::RwLock::read /// [`RwLock`]: struct@crate::sync::RwLock pub struct RwLockReadGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) s: &'a Semaphore, pub(super) data: *const T, pub(super) marker: marker::PhantomData<&'a T>, @@ -59,13 +61,25 @@ impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { { let data = f(&*this) as *const U; let s = this.s; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - RwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLockReadGuard { s, data, marker: marker::PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + }; } /// Attempts to make a new [`RwLockReadGuard`] for a component of the @@ -113,13 +127,25 @@ impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { None => return Err(this), }; let s = this.s; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - Ok(RwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(RwLockReadGuard { s, data, marker: marker::PhantomData, - }) + }); } } @@ -152,5 +178,14 @@ where impl<'a, T: ?Sized> Drop for RwLockReadGuard<'a, T> { fn drop(&mut self) { self.s.release(1); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "sub", + ) + }); } } diff --git a/tokio/src/sync/rwlock/write_guard.rs b/tokio/src/sync/rwlock/write_guard.rs index 8c80ee70db4..cb6dc5786ea 100644 --- a/tokio/src/sync/rwlock/write_guard.rs +++ b/tokio/src/sync/rwlock/write_guard.rs @@ -15,6 +15,8 @@ use std::ops; /// [`write`]: method@crate::sync::RwLock::write /// [`RwLock`]: struct@crate::sync::RwLock pub struct RwLockWriteGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, pub(super) s: &'a Semaphore, pub(super) data: *mut T, @@ -66,14 +68,26 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { let data = f(&mut *this) as *mut U; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - RwLockMappedWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return RwLockMappedWriteGuard { + permits_acquired, + s, + data, + marker: marker::PhantomData, + }; } /// Attempts to make a new [`RwLockMappedWriteGuard`] for a component of @@ -129,14 +143,26 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { }; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - Ok(RwLockMappedWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(RwLockMappedWriteGuard { + permits_acquired, + s, + data, + marker: marker::PhantomData, + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, - }) + }); } /// Converts this `RwLockWriteGuard` into an `RwLockMappedWriteGuard`. This @@ -188,16 +214,46 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { /// [`RwLock`]: struct@crate::sync::RwLock pub fn downgrade(self) -> RwLockReadGuard<'a, T> { let RwLockWriteGuard { s, data, .. } = self; - + let to_release = (self.permits_acquired - 1) as usize; // Release all but one of the permits held by the write guard - s.release((self.permits_acquired - 1) as usize); + s.release(to_release); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(self); - RwLockReadGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLockReadGuard { s, data, marker: marker::PhantomData, - } + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + }; } } @@ -236,5 +292,14 @@ where impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> { fn drop(&mut self) { self.s.release(self.permits_acquired as usize); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); } } diff --git a/tokio/src/sync/rwlock/write_guard_mapped.rs b/tokio/src/sync/rwlock/write_guard_mapped.rs index 3cf69de4bdd..5b056ff09da 100644 --- a/tokio/src/sync/rwlock/write_guard_mapped.rs +++ b/tokio/src/sync/rwlock/write_guard_mapped.rs @@ -14,6 +14,8 @@ use std::ops; /// [mapping]: method@crate::sync::RwLockWriteGuard::map /// [`RwLockWriteGuard`]: struct@crate::sync::RwLockWriteGuard pub struct RwLockMappedWriteGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, pub(super) s: &'a Semaphore, pub(super) data: *mut T, @@ -64,14 +66,27 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { let data = f(&mut *this) as *mut U; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - RwLockMappedWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return RwLockMappedWriteGuard { + permits_acquired, + s, + data, + marker: marker::PhantomData, + resource_span, + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, - } + }; } /// Attempts to make a new [`RwLockMappedWriteGuard`] for a component of @@ -126,14 +141,27 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { }; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - Ok(RwLockMappedWriteGuard { + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Ok(RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, - }) + resource_span, + }); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return Ok(RwLockMappedWriteGuard { + permits_acquired, + s, + data, + marker: marker::PhantomData, + }); } } @@ -172,5 +200,14 @@ where impl<'a, T: ?Sized> Drop for RwLockMappedWriteGuard<'a, T> { fn drop(&mut self) { self.s.release(self.permits_acquired as usize); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); } } From e511340b027091df37dd2783b8ca1984123ac958 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 6 Dec 2021 21:30:46 +0000 Subject: [PATCH 5/9] instrument oneshot Signed-off-by: Zahari Dichev --- tokio/src/sync/oneshot.rs | 164 +++++++++++++++++++++++++++++++++++++- 1 file changed, 161 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 08a2d9e49d7..405580b04b4 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -122,6 +122,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::fmt; use std::future::Future; @@ -305,6 +307,10 @@ pub struct Sender { #[derive(Debug)] pub struct Receiver { inner: Option>>, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_span: tracing::Span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_poll_span: tracing::Span, } pub mod error { @@ -375,6 +381,9 @@ struct Inner { /// The `RX_TASK_SET` bit in the `state` field is set if this field is /// initialized. If that bit is unset, this field may be uninitialized. rx_task: Task, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } struct Task(UnsafeCell>); @@ -442,7 +451,63 @@ struct State(usize); /// } /// } /// ``` +#[track_caller] pub fn channel() -> (Sender, Receiver) { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let location = std::panic::Location::caller(); + + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sender|Receiver", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_dropped = false, + tx_dropped.op = "override", + ) + }); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = false, + rx_dropped.op = "override", + ) + }); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_sent = false, + value_sent.op = "override", + ) + }); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_received = false, + value_received.op = "override", + ) + }); + + Arc::new(Inner { + state: AtomicUsize::new(State::new().as_usize()), + value: UnsafeCell::new(None), + tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), + rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), + resource_span, + }) + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] let inner = Arc::new(Inner { state: AtomicUsize::new(State::new().as_usize()), value: UnsafeCell::new(None), @@ -453,6 +518,24 @@ pub fn channel() -> (Sender, Receiver) { let tx = Sender { inner: Some(inner.clone()), }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let rx = { + let async_op_span = inner.resource_span.in_scope(|| { + tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + Receiver { + inner: Some(inner), + async_op_span, + async_op_poll_span, + } + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] let rx = Receiver { inner: Some(inner) }; (tx, rx) @@ -525,6 +608,15 @@ impl Sender { } } + #[cfg(all(tokio_unstable, feature = "tracing"))] + inner.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_sent = true, + value_sent.op = "override", + ) + }); + Ok(()) } @@ -598,7 +690,20 @@ impl Sender { pub async fn closed(&mut self) { use crate::future::poll_fn; - poll_fn(|cx| self.poll_closed(cx)).await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.inner.as_ref().unwrap().resource_span.clone(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let closed = trace::async_op( + || poll_fn(|cx| self.poll_closed(cx)), + resource_span, + "Sender::closed", + "poll_closed", + false, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let closed = poll_fn(|cx| self.poll_closed(cx)); + + closed.await } /// Returns `true` if the associated [`Receiver`] handle has been dropped. @@ -728,6 +833,14 @@ impl Drop for Sender { fn drop(&mut self) { if let Some(inner) = self.inner.as_ref() { inner.complete(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + inner.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_dropped = true, + tx_dropped.op = "override", + ) + }); } } } @@ -795,6 +908,14 @@ impl Receiver { pub fn close(&mut self) { if let Some(inner) = self.inner.as_ref() { inner.close(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + inner.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = true, + rx_dropped.op = "override", + ) + }); } } @@ -872,7 +993,17 @@ impl Receiver { // `UnsafeCell`. Therefore, it is now safe for us to access the // cell. match unsafe { inner.consume_value() } { - Some(value) => Ok(value), + Some(value) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + inner.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_received = true, + value_received.op = "override", + ) + }); + Ok(value) + } None => Err(TryRecvError::Closed), } } else if state.is_closed() { @@ -894,6 +1025,14 @@ impl Drop for Receiver { fn drop(&mut self) { if let Some(inner) = self.inner.as_ref() { inner.close(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + inner.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = true, + rx_dropped.op = "override", + ) + }); } } } @@ -903,8 +1042,27 @@ impl Future for Receiver { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // If `inner` is `None`, then `poll()` has already completed. + + cfg_trace! { + let mut _res_span; + let mut _ao_span; + let mut _ao_poll_span; + } + let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { - ready!(inner.poll_recv(cx))? + cfg_trace! { + _res_span = inner.resource_span.clone().entered(); + _ao_span = self.async_op_span.clone().entered(); + _ao_poll_span = self.async_op_poll_span.clone().entered(); + } + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let res = ready!(inner.poll_recv(cx))?; + + res } else { panic!("called after complete"); }; From f928fbbc1ea512038650bd35a2cae4a1c56435e5 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 10 Dec 2021 15:16:35 +0000 Subject: [PATCH 6/9] clean up cfg statements Signed-off-by: Zahari Dichev --- tokio/src/macros/cfg.rs | 12 - tokio/src/sync/barrier.rs | 23 +- tokio/src/sync/batch_semaphore.rs | 151 +++++------- tokio/src/sync/mutex.rs | 216 ++++++++---------- tokio/src/sync/oneshot.rs | 77 +++---- tokio/src/sync/rwlock.rs | 187 +++++---------- tokio/src/sync/rwlock/owned_read_guard.rs | 26 +-- tokio/src/sync/rwlock/owned_write_guard.rs | 41 +--- .../sync/rwlock/owned_write_guard_mapped.rs | 28 +-- tokio/src/sync/rwlock/read_guard.rs | 26 +-- tokio/src/sync/rwlock/write_guard.rs | 41 +--- tokio/src/sync/rwlock/write_guard_mapped.rs | 28 +-- tokio/src/sync/semaphore.rs | 34 +-- tokio/src/time/driver/sleep.rs | 51 ++--- tokio/src/time/interval.rs | 29 ++- 15 files changed, 343 insertions(+), 627 deletions(-) diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9ebe0077846..4ab13c2c11c 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -362,12 +362,6 @@ macro_rules! cfg_not_time { } macro_rules! cfg_trace { - ($($stmt:stmt;)*) => { - $( - #[cfg(all(tokio_unstable, feature = "tracing"))] - $stmt; - )* - }; ($($item:item)*) => { $( #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -378,12 +372,6 @@ macro_rules! cfg_trace { } macro_rules! cfg_not_trace { - ($($stmt:stmt;)*) => { - $( - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - $stmt; - )* - }; ($($item:item)*) => { $( #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index aef89f56563..dfc76a40ebf 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -71,7 +71,7 @@ impl Barrier { } #[cfg(all(tokio_unstable, feature = "tracing"))] - let barrier = { + let resource_span = { let location = std::panic::Location::caller(); let resource_span = tracing::trace_span!( "runtime.resource", @@ -93,21 +93,10 @@ impl Barrier { arrived = 0, ) }); - - Barrier { - state: Mutex::new(BarrierState { - waker, - arrived: 0, - generation: 1, - }), - n, - wait, - resource_span: resource_span, - } + resource_span }; - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let barrier = Barrier { + Barrier { state: Mutex::new(BarrierState { waker, arrived: 0, @@ -115,9 +104,9 @@ impl Barrier { }), n, wait, - }; - - barrier + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: resource_span, + } } /// Does not resolve until all tasks have rendezvoused here. diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 792d952066c..4f5effff319 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -137,7 +137,8 @@ impl Semaphore { Self::MAX_PERMITS ); - cfg_trace! { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Semaphore", @@ -152,24 +153,17 @@ impl Semaphore { permits.op = "override", ) }); + resource_span + }; - return Self { - permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), - waiters: Mutex::new(Waitlist { - queue: LinkedList::new(), - closed: false, - }), - resource_span, - }; - } - cfg_not_trace! { - return Self { - permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), - waiters: Mutex::new(Waitlist { - queue: LinkedList::new(), - closed: false, - }), - }; + Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -185,25 +179,14 @@ impl Semaphore { // currently we just clamp the permit count when it exceeds the max permits &= Self::MAX_PERMITS; - cfg_trace! { - return Self { - permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), - waiters: Mutex::const_new(Waitlist { - queue: LinkedList::new(), - closed: false, - }), - resource_span: tracing::Span::none(), - }; - } - - cfg_not_trace! { - return Self { - permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), - waiters: Mutex::const_new(Waitlist { - queue: LinkedList::new(), - closed: false, - }), - }; + Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::const_new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), } } @@ -336,15 +319,15 @@ impl Semaphore { ); // add remaining permits back - cfg_trace! { - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - permits = rem, - permits.op = "add", - ) - }); - } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = rem, + permits.op = "add", + ) + }); + rem = 0; } @@ -409,20 +392,20 @@ impl Semaphore { acquired += acq; if remaining == 0 { if !queued { - cfg_trace! { - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - permits = acquired, - permits.op = "sub", - ); - tracing::trace!( - target: "runtime::resource::async_op::state_update", - permits_obtained = acquired, - permits.op = "add", - ) - }); - } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = acquired, + permits.op = "sub", + ); + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = acquired, + permits.op = "add", + ) + }); + return Ready(Ok(())); } else if lock.is_none() { break self.waiters.lock(); @@ -438,15 +421,14 @@ impl Semaphore { return Ready(Err(AcquireError::closed())); } - cfg_trace! { - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - permits = acquired, - permits.op = "sub", - ) - }); - } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = acquired, + permits.op = "sub", + ) + }); if node.assign_permits(&mut acquired) { self.add_permits_locked(acquired, waiters); @@ -492,26 +474,17 @@ impl fmt::Debug for Semaphore { } impl Waiter { - cfg_not_trace! { - fn new(num_permits: u32) -> Self { - Waiter { - waker: UnsafeCell::new(None), - state: AtomicUsize::new(num_permits as usize), - pointers: linked_list::Pointers::new(), - _p: PhantomPinned, - } - } - } - - cfg_trace! { - fn new(num_permits: u32, ctx: trace::AsyncOpTracingCtx) -> Self { - Waiter { - waker: UnsafeCell::new(None), - state: AtomicUsize::new(num_permits as usize), - pointers: linked_list::Pointers::new(), - ctx, - _p: PhantomPinned, - } + fn new( + num_permits: u32, + #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, + ) -> Self { + Waiter { + waker: UnsafeCell::new(None), + state: AtomicUsize::new(num_permits as usize), + pointers: linked_list::Pointers::new(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + ctx, + _p: PhantomPinned, } } diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index ec25881cf42..2476726bdea 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -256,40 +256,37 @@ impl Mutex { T: Sized, { #[cfg(all(tokio_unstable, feature = "tracing"))] - let mutex = { + let resource_span = { let location = std::panic::Location::caller(); - let resource_span = tracing::trace_span!( + tracing::trace_span!( "runtime.resource", concrete_type = "Mutex", kind = "Sync", loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), - ); - - let s = resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - locked = false, - ); - semaphore::Semaphore::new(1) - }); - - Self { - c: UnsafeCell::new(t), - s, - resource_span, - } + ) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + semaphore::Semaphore::new(1) + }); + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let mutex = Self { - c: UnsafeCell::new(t), - s: semaphore::Semaphore::new(1), - }; + let s = semaphore::Semaphore::new(1); - mutex + Self { + c: UnsafeCell::new(t), + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Creates a new lock in an unlocked state ready for use. @@ -307,18 +304,12 @@ impl Mutex { where T: Sized, { - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Self { + Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::const_new(1), + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span::none(), - }; - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - return Self { - c: UnsafeCell::new(t), - s: semaphore::Semaphore::const_new(1), - }; + } } /// Locks this mutex, causing the current task to yield until the lock has @@ -346,36 +337,31 @@ impl Mutex { /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { #[cfg(all(tokio_unstable, feature = "tracing"))] - let guard = { - trace::async_op( - || self.acquire(), - self.resource_span.clone(), - "Mutex::lock", - "poll", - false, - ) - .await; - - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - locked = true, - ); - }); - - MutexGuard { - lock: self, - resource_span: self.resource_span.clone(), - } - }; + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock", + "poll", + false, + ) + .await; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let guard = { - self.acquire().await; - MutexGuard { lock: self } - }; + self.acquire().await; - guard + MutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + } } /// Blocking lock this mutex. When the lock has been acquired, function returns a @@ -446,37 +432,34 @@ impl Mutex { /// [`Arc`]: std::sync::Arc pub async fn lock_owned(self: Arc) -> OwnedMutexGuard { #[cfg(all(tokio_unstable, feature = "tracing"))] - let guard = { - trace::async_op( - || self.acquire(), - self.resource_span.clone(), - "Mutex::lock_owned", - "poll", - false, - ) - .await; - - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - locked = true, - ); - }); - - let resource_span = self.resource_span.clone(); - OwnedMutexGuard { - lock: self, - resource_span, - } - }; + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock_owned", + "poll", + false, + ) + .await; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let guard = { - self.acquire().await; - OwnedMutexGuard { lock: self } - }; + self.acquire().await; - guard + OwnedMutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } async fn acquire(&self) { @@ -508,23 +491,18 @@ impl Mutex { match self.s.try_acquire(1) { Ok(_) => { #[cfg(all(tokio_unstable, feature = "tracing"))] - let guard = { - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - locked = true, - ); - }); - Ok(MutexGuard { - lock: self, - resource_span: self.resource_span.clone(), - }) - }; - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let guard = Ok(MutexGuard { lock: self }); - - guard + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + Ok(MutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + }) } Err(_) => Err(TryLockError(())), } @@ -582,23 +560,21 @@ impl Mutex { match self.s.try_acquire(1) { Ok(_) => { #[cfg(all(tokio_unstable, feature = "tracing"))] - let guard = { - self.resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - locked = true, - ); - }); - let resource_span = self.resource_span.clone(); - Ok(OwnedMutexGuard { - lock: self, - resource_span, - }) - }; - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let guard = Ok(OwnedMutexGuard { lock: self }); - - guard + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + Ok(OwnedMutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + }) } Err(_) => Err(TryLockError(())), } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 405580b04b4..cfc92259d39 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -217,6 +217,8 @@ use std::task::{Context, Poll, Waker}; #[derive(Debug)] pub struct Sender { inner: Option>>, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } /// Receives a value from the associated [`Sender`]. @@ -308,6 +310,8 @@ pub struct Sender { pub struct Receiver { inner: Option>>, #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, + #[cfg(all(tokio_unstable, feature = "tracing"))] async_op_span: tracing::Span, #[cfg(all(tokio_unstable, feature = "tracing"))] async_op_poll_span: tracing::Span, @@ -381,9 +385,6 @@ struct Inner { /// The `RX_TASK_SET` bit in the `state` field is set if this field is /// initialized. If that bit is unset, this field may be uninitialized. rx_task: Task, - - #[cfg(all(tokio_unstable, feature = "tracing"))] - resource_span: tracing::Span, } struct Task(UnsafeCell>); @@ -454,7 +455,7 @@ struct State(usize); #[track_caller] pub fn channel() -> (Sender, Receiver) { #[cfg(all(tokio_unstable, feature = "tracing"))] - let inner = { + let resource_span = { let location = std::panic::Location::caller(); let resource_span = tracing::trace_span!( @@ -498,16 +499,9 @@ pub fn channel() -> (Sender, Receiver) { ) }); - Arc::new(Inner { - state: AtomicUsize::new(State::new().as_usize()), - value: UnsafeCell::new(None), - tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), - rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())), - resource_span, - }) + resource_span }; - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] let inner = Arc::new(Inner { state: AtomicUsize::new(State::new().as_usize()), value: UnsafeCell::new(None), @@ -517,27 +511,28 @@ pub fn channel() -> (Sender, Receiver) { let tx = Sender { inner: Some(inner.clone()), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: resource_span.clone(), }; #[cfg(all(tokio_unstable, feature = "tracing"))] - let rx = { - let async_op_span = inner.resource_span.in_scope(|| { - tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await") - }); + let async_op_span = resource_span + .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await")); - let async_op_poll_span = - async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); - Receiver { - inner: Some(inner), - async_op_span, - async_op_poll_span, - } + let rx = Receiver { + inner: Some(inner), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: resource_span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_poll_span, }; - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let rx = Receiver { inner: Some(inner) }; - (tx, rx) } @@ -609,7 +604,7 @@ impl Sender { } #[cfg(all(tokio_unstable, feature = "tracing"))] - inner.resource_span.in_scope(|| { + self.resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", value_sent = true, @@ -691,7 +686,7 @@ impl Sender { use crate::future::poll_fn; #[cfg(all(tokio_unstable, feature = "tracing"))] - let resource_span = self.inner.as_ref().unwrap().resource_span.clone(); + let resource_span = self.resource_span.clone(); #[cfg(all(tokio_unstable, feature = "tracing"))] let closed = trace::async_op( || poll_fn(|cx| self.poll_closed(cx)), @@ -834,7 +829,7 @@ impl Drop for Sender { if let Some(inner) = self.inner.as_ref() { inner.complete(); #[cfg(all(tokio_unstable, feature = "tracing"))] - inner.resource_span.in_scope(|| { + self.resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", tx_dropped = true, @@ -909,7 +904,7 @@ impl Receiver { if let Some(inner) = self.inner.as_ref() { inner.close(); #[cfg(all(tokio_unstable, feature = "tracing"))] - inner.resource_span.in_scope(|| { + self.resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", rx_dropped = true, @@ -995,7 +990,7 @@ impl Receiver { match unsafe { inner.consume_value() } { Some(value) => { #[cfg(all(tokio_unstable, feature = "tracing"))] - inner.resource_span.in_scope(|| { + self.resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", value_received = true, @@ -1026,7 +1021,7 @@ impl Drop for Receiver { if let Some(inner) = self.inner.as_ref() { inner.close(); #[cfg(all(tokio_unstable, feature = "tracing"))] - inner.resource_span.in_scope(|| { + self.resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", rx_dropped = true, @@ -1042,20 +1037,14 @@ impl Future for Receiver { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // If `inner` is `None`, then `poll()` has already completed. - - cfg_trace! { - let mut _res_span; - let mut _ao_span; - let mut _ao_poll_span; - } + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.async_op_poll_span.clone().entered(); let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { - cfg_trace! { - _res_span = inner.resource_span.clone().entered(); - _ao_span = self.async_op_span.clone().entered(); - _ao_poll_span = self.async_op_poll_span.clone().entered(); - } - #[cfg(all(tokio_unstable, feature = "tracing"))] let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index f75e113ff89..341730214cb 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -208,9 +208,8 @@ impl RwLock { T: Sized, { #[cfg(all(tokio_unstable, feature = "tracing"))] - let rwlock = { + let resource_span = { let location = std::panic::Location::caller(); - let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "RwLock", @@ -220,7 +219,7 @@ impl RwLock { loc.col = location.column(), ); - let s = resource_span.in_scope(|| { + resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", max_readers = MAX_READS, @@ -235,25 +234,24 @@ impl RwLock { target: "runtime::resource::state_update", current_readers = 0, ); - Semaphore::new(MAX_READS as usize) }); - RwLock { - mr: MAX_READS, - c: UnsafeCell::new(value), - s, - resource_span, - } + resource_span }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize)); + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let rwlock = RwLock { + let s = Semaphore::new(MAX_READS as usize); + + RwLock { mr: MAX_READS, c: UnsafeCell::new(value), - s: Semaphore::new(MAX_READS as usize), - }; - - rwlock + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Creates a new instance of an `RwLock` which is unlocked @@ -282,7 +280,7 @@ impl RwLock { ); #[cfg(all(tokio_unstable, feature = "tracing"))] - let rwlock = { + let resource_span = { let location = std::panic::Location::caller(); let resource_span = tracing::trace_span!( @@ -294,7 +292,7 @@ impl RwLock { loc.col = location.column(), ); - let s = resource_span.in_scope(|| { + resource_span.in_scope(|| { tracing::trace!( target: "runtime::resource::state_update", max_readers = max_reads, @@ -309,25 +307,24 @@ impl RwLock { target: "runtime::resource::state_update", current_readers = 0, ); - Semaphore::new(max_reads as usize) }); - RwLock { - mr: max_reads, - c: UnsafeCell::new(value), - s, - resource_span, - } + resource_span }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize)); + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let rwlock = RwLock { + let s = Semaphore::new(max_reads as usize); + + RwLock { mr: max_reads, c: UnsafeCell::new(value), - s: Semaphore::new(max_reads as usize), - }; - - rwlock + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Creates a new instance of an `RwLock` which is unlocked. @@ -345,20 +342,13 @@ impl RwLock { where T: Sized, { - #[cfg(all(tokio_unstable, feature = "tracing"))] - return RwLock { + RwLock { mr: MAX_READS, c: UnsafeCell::new(value), s: Semaphore::const_new(MAX_READS as usize), + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span::none(), - }; - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - return RwLock { - mr: MAX_READS, - c: UnsafeCell::new(value), - s: Semaphore::const_new(MAX_READS as usize), - }; + } } /// Creates a new instance of an `RwLock` which is unlocked @@ -378,20 +368,13 @@ impl RwLock { T: Sized, { max_reads &= MAX_READS; - #[cfg(all(tokio_unstable, feature = "tracing"))] return RwLock { mr: max_reads, c: UnsafeCell::new(value), s: Semaphore::const_new(max_reads as usize), + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span::none(), }; - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - return RwLock { - mr: max_reads, - c: UnsafeCell::new(value), - s: Semaphore::const_new(max_reads as usize), - }; } /// Locks this `RwLock` with shared read access, causing the current task @@ -467,22 +450,13 @@ impl RwLock { ) }); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let guard = RwLockReadGuard { + RwLockReadGuard { s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: self.resource_span.clone(), - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let guard = RwLockReadGuard { - s: &self.s, - data: self.c.get(), - marker: marker::PhantomData, - }; - - guard + } } /// Locks this `RwLock` with shared read access, causing the current task @@ -567,22 +541,13 @@ impl RwLock { #[cfg(all(tokio_unstable, feature = "tracing"))] let resource_span = self.resource_span.clone(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let guard = OwnedRwLockReadGuard { + OwnedRwLockReadGuard { data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let guard = OwnedRwLockReadGuard { - data: self.c.get(), - lock: ManuallyDrop::new(self), - _p: PhantomData, - }; - - guard + } } /// Attempts to acquire this `RwLock` with shared read access. @@ -633,19 +598,12 @@ impl RwLock { ) }); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(RwLockReadGuard { - s: &self.s, - data: self.c.get(), - marker: marker::PhantomData, - resource_span: self.resource_span.clone(), - }); - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] Ok(RwLockReadGuard { s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), }) } @@ -706,20 +664,13 @@ impl RwLock { #[cfg(all(tokio_unstable, feature = "tracing"))] let resource_span = self.resource_span.clone(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(OwnedRwLockReadGuard { + Ok(OwnedRwLockReadGuard { data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - return Ok(OwnedRwLockReadGuard { - data: self.c.get(), - lock: ManuallyDrop::new(self), - _p: PhantomData, - }); + }) } /// Locks this `RwLock` with exclusive write access, causing the current @@ -778,22 +729,14 @@ impl RwLock { ) }); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return RwLockWriteGuard { + RwLockWriteGuard { permits_acquired: self.mr, s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: self.resource_span.clone(), - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return RwLockWriteGuard { - permits_acquired: self.mr, - s: &self.s, - data: self.c.get(), - marker: marker::PhantomData, - }; + } } /// Locks this `RwLock` with exclusive write access, causing the current @@ -862,22 +805,14 @@ impl RwLock { #[cfg(all(tokio_unstable, feature = "tracing"))] let resource_span = self.resource_span.clone(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return OwnedRwLockWriteGuard { + OwnedRwLockWriteGuard { permits_acquired: self.mr, data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return OwnedRwLockWriteGuard { - permits_acquired: self.mr, - data: self.c.get(), - lock: ManuallyDrop::new(self), - _p: PhantomData, - }; + } } /// Attempts to acquire this `RwLock` with exclusive write access. @@ -919,22 +854,14 @@ impl RwLock { ) }); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(RwLockWriteGuard { + Ok(RwLockWriteGuard { permits_acquired: self.mr, s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: self.resource_span.clone(), - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(RwLockWriteGuard { - permits_acquired: self.mr, - s: &self.s, - data: self.c.get(), - marker: marker::PhantomData, - }); + }) } /// Attempts to acquire this `RwLock` with exclusive write access. @@ -986,22 +913,14 @@ impl RwLock { #[cfg(all(tokio_unstable, feature = "tracing"))] let resource_span = self.resource_span.clone(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(OwnedRwLockWriteGuard { + Ok(OwnedRwLockWriteGuard { permits_acquired: self.mr, data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(OwnedRwLockWriteGuard { - permits_acquired: self.mr, - data: self.c.get(), - lock: ManuallyDrop::new(self), - _p: PhantomData, - }); + }) } /// Returns a mutable reference to the underlying data. diff --git a/tokio/src/sync/rwlock/owned_read_guard.rs b/tokio/src/sync/rwlock/owned_read_guard.rs index aebb0b62dbe..27b71bd988b 100644 --- a/tokio/src/sync/rwlock/owned_read_guard.rs +++ b/tokio/src/sync/rwlock/owned_read_guard.rs @@ -63,20 +63,13 @@ impl OwnedRwLockReadGuard { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return OwnedRwLockReadGuard { + OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return OwnedRwLockReadGuard { - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }; + } } /// Attempts to make a new [`OwnedRwLockReadGuard`] for a component of the @@ -124,20 +117,13 @@ impl OwnedRwLockReadGuard { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(OwnedRwLockReadGuard { + Ok(OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(OwnedRwLockReadGuard { - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }); + }) } } diff --git a/tokio/src/sync/rwlock/owned_write_guard.rs b/tokio/src/sync/rwlock/owned_write_guard.rs index 8e8c7a32c10..dbedab4cbb2 100644 --- a/tokio/src/sync/rwlock/owned_write_guard.rs +++ b/tokio/src/sync/rwlock/owned_write_guard.rs @@ -71,22 +71,14 @@ impl OwnedRwLockWriteGuard { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return OwnedRwLockMappedWriteGuard { + OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return OwnedRwLockMappedWriteGuard { - permits_acquired, - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }; + } } /// Attempts to make a new [`OwnedRwLockMappedWriteGuard`] for a component @@ -144,22 +136,14 @@ impl OwnedRwLockWriteGuard { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(OwnedRwLockMappedWriteGuard { + Ok(OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(OwnedRwLockMappedWriteGuard { - permits_acquired, - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }); + }) } /// Converts this `OwnedRwLockWriteGuard` into an @@ -237,20 +221,13 @@ impl OwnedRwLockWriteGuard { // NB: Forget to avoid drop impl from being called. mem::forget(self); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return OwnedRwLockReadGuard { + OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return OwnedRwLockReadGuard { - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }; + } } } diff --git a/tokio/src/sync/rwlock/owned_write_guard_mapped.rs b/tokio/src/sync/rwlock/owned_write_guard_mapped.rs index 8a9c04d2ec0..55a24d96ac3 100644 --- a/tokio/src/sync/rwlock/owned_write_guard_mapped.rs +++ b/tokio/src/sync/rwlock/owned_write_guard_mapped.rs @@ -70,22 +70,14 @@ impl OwnedRwLockMappedWriteGuard { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return OwnedRwLockMappedWriteGuard { + OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return OwnedRwLockMappedWriteGuard { - permits_acquired, - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }; + } } /// Attempts to make a new `OwnedRwLockMappedWriteGuard` for a component @@ -140,22 +132,14 @@ impl OwnedRwLockMappedWriteGuard { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(OwnedRwLockMappedWriteGuard { + Ok(OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(OwnedRwLockMappedWriteGuard { - permits_acquired, - lock: ManuallyDrop::new(lock), - data, - _p: PhantomData, - }); + }) } } diff --git a/tokio/src/sync/rwlock/read_guard.rs b/tokio/src/sync/rwlock/read_guard.rs index 3f11fb285c5..36921319923 100644 --- a/tokio/src/sync/rwlock/read_guard.rs +++ b/tokio/src/sync/rwlock/read_guard.rs @@ -66,20 +66,13 @@ impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return RwLockReadGuard { + RwLockReadGuard { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return RwLockReadGuard { - s, - data, - marker: marker::PhantomData, - }; + } } /// Attempts to make a new [`RwLockReadGuard`] for a component of the @@ -132,20 +125,13 @@ impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(RwLockReadGuard { + Ok(RwLockReadGuard { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(RwLockReadGuard { - s, - data, - marker: marker::PhantomData, - }); + }) } } diff --git a/tokio/src/sync/rwlock/write_guard.rs b/tokio/src/sync/rwlock/write_guard.rs index cb6dc5786ea..7cadd74c60d 100644 --- a/tokio/src/sync/rwlock/write_guard.rs +++ b/tokio/src/sync/rwlock/write_guard.rs @@ -72,22 +72,14 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return RwLockMappedWriteGuard { + RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return RwLockMappedWriteGuard { - permits_acquired, - s, - data, - marker: marker::PhantomData, - }; + } } /// Attempts to make a new [`RwLockMappedWriteGuard`] for a component of @@ -147,22 +139,14 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(RwLockMappedWriteGuard { + Ok(RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(RwLockMappedWriteGuard { - permits_acquired, - s, - data, - marker: marker::PhantomData, - }); + }) } /// Converts this `RwLockWriteGuard` into an `RwLockMappedWriteGuard`. This @@ -240,20 +224,13 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { // NB: Forget to avoid drop impl from being called. mem::forget(self); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return RwLockReadGuard { + RwLockReadGuard { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return RwLockReadGuard { - s, - data, - marker: marker::PhantomData, - }; + } } } diff --git a/tokio/src/sync/rwlock/write_guard_mapped.rs b/tokio/src/sync/rwlock/write_guard_mapped.rs index 5b056ff09da..b5c644a9e83 100644 --- a/tokio/src/sync/rwlock/write_guard_mapped.rs +++ b/tokio/src/sync/rwlock/write_guard_mapped.rs @@ -71,22 +71,14 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return RwLockMappedWriteGuard { + RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return RwLockMappedWriteGuard { - permits_acquired, - s, - data, - marker: marker::PhantomData, - }; + } } /// Attempts to make a new [`RwLockMappedWriteGuard`] for a component of @@ -146,22 +138,14 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { // NB: Forget to avoid drop impl from being called. mem::forget(this); - #[cfg(all(tokio_unstable, feature = "tracing"))] - return Ok(RwLockMappedWriteGuard { + Ok(RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span, - }); - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - return Ok(RwLockMappedWriteGuard { - permits_acquired, - s, - data, - marker: marker::PhantomData, - }); + }) } } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index fbf557482a4..860f46f3998 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -127,10 +127,10 @@ impl Semaphore { #[track_caller] pub fn new(permits: usize) -> Self { #[cfg(all(tokio_unstable, feature = "tracing"))] - let sem = { + let resource_span = { let location = std::panic::Location::caller(); - let resource_span = tracing::trace_span!( + tracing::trace_span!( "runtime.resource", concrete_type = "Semaphore", kind = "Sync", @@ -138,21 +138,20 @@ impl Semaphore { loc.line = location.line(), loc.col = location.column(), inherits_child_attrs = true, - ); - - let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); - Self { - ll_sem, - resource_span, - } + ) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let sem = Self { - ll_sem: ll::Semaphore::new(permits), - }; + let ll_sem = ll::Semaphore::new(permits); - sem + Self { + ll_sem, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Creates a new semaphore with the initial number of permits. @@ -275,17 +274,18 @@ impl Semaphore { /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { #[cfg(all(tokio_unstable, feature = "tracing"))] - let inner = trace::async_op( + trace::async_op( || self.ll_sem.acquire(n), self.resource_span.clone(), "Semaphore::acquire_many", "poll", true, - ); + ) + .await?; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let inner = self.ll_sem.acquire(n); + self.ll_sem.acquire(n).await?; - inner.await?; Ok(SemaphorePermit { sem: self, permits: n, diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index efe0e90ac2c..7f27ef201f7 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -1,3 +1,5 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::time::driver::ClockTime; use crate::time::driver::{Handle, TimerEntry}; use crate::time::{error::Error, Duration, Instant}; use crate::util::trace; @@ -8,10 +10,6 @@ use std::panic::Location; use std::pin::Pin; use std::task::{self, Poll}; -cfg_trace! { - use crate::time::driver::ClockTime; -} - /// Waits until `deadline` is reached. /// /// No work is performed while awaiting on the sleep future to complete. `Sleep` @@ -387,36 +385,29 @@ impl Sleep { } } - cfg_not_trace! { - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let me = self.project(); - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + // Keep track of task budget + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_elapsed", + crate::coop::poll_proceed(cx), + )); - me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } - } + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let coop = ready!(crate::coop::poll_proceed(cx)); - cfg_trace! { - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); - // Keep track of task budget - let coop = ready!(trace_poll_op!( - "poll_elapsed", - crate::coop::poll_proceed(cx), - )); - - let result = me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }); + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); - trace_poll_op!("poll_elapsed", result) - } + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_elapsed", result); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return result; } } diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index a3dcf35c3b4..439a1a19d54 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -118,35 +118,32 @@ fn internal_interval_at( location: Option<&'static Location<'static>>, ) -> Interval { #[cfg(all(tokio_unstable, feature = "tracing"))] - let interval = { + let resource_span = { let location = location.expect("should have location if tracing"); - let resource_span = tracing::trace_span!( + tracing::trace_span!( "runtime.resource", concrete_type = "Interval", kind = "timer", loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), - ); - - let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); - Interval { - delay, - period, - missed_tick_behavior: Default::default(), - resource_span, - } + ) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let interval = Interval { - delay: Box::pin(sleep_until(start)), + let delay = Box::pin(sleep_until(start)); + + Interval { + delay, period, missed_tick_behavior: Default::default(), - }; - - interval + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Defines the behavior of an [`Interval`] when it misses a tick. From 7de73d6db9c18960b8a302df5fd7eabebe08b14e Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 10 Dec 2021 15:20:40 +0000 Subject: [PATCH 7/9] lint Signed-off-by: Zahari Dichev --- tokio/src/sync/rwlock.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 341730214cb..6991f0e8b6a 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -368,13 +368,13 @@ impl RwLock { T: Sized, { max_reads &= MAX_READS; - return RwLock { + RwLock { mr: max_reads, c: UnsafeCell::new(value), s: Semaphore::const_new(max_reads as usize), #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span::none(), - }; + } } /// Locks this `RwLock` with shared read access, causing the current task From c505bdbdb368310c2d21634f0ce598b422a53712 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 10 Dec 2021 15:34:52 +0000 Subject: [PATCH 8/9] min ver fix Signed-off-by: Zahari Dichev --- tokio/src/util/trace.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 39fe8579070..80ed51026ee 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -69,10 +69,11 @@ cfg_trace! { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); + let poll_op_name = &*this.poll_op_name let _res_enter = this.tracing_ctx.resource_span.enter(); let _async_op_enter = this.tracing_ctx.async_op_span.enter(); let _async_op_poll_enter = this.tracing_ctx.async_op_poll_span.enter(); - trace_poll_op!(this.poll_op_name, this.inner.poll(cx)) + trace_poll_op!(poll_op_name, this.inner.poll(cx)) } } } From 838119ac30ebf8b4deca872a97d669dc543e8089 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 10 Dec 2021 15:35:08 +0000 Subject: [PATCH 9/9] min ver fix Signed-off-by: Zahari Dichev --- tokio/src/util/trace.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 80ed51026ee..6080e2358ae 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -69,7 +69,7 @@ cfg_trace! { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let poll_op_name = &*this.poll_op_name + let poll_op_name = &*this.poll_op_name; let _res_enter = this.tracing_ctx.resource_span.enter(); let _async_op_enter = this.tracing_ctx.async_op_span.enter(); let _async_op_poll_enter = this.tracing_ctx.async_op_poll_span.enter();