diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 3afc0402374..4ab13c2c11c 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -368,7 +368,7 @@ macro_rules! cfg_trace { #[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] $item )* - } + }; } macro_rules! cfg_not_trace { diff --git a/tokio/src/macros/trace.rs b/tokio/src/macros/trace.rs index 31dde2f255a..80a257e1899 100644 --- a/tokio/src/macros/trace.rs +++ b/tokio/src/macros/trace.rs @@ -1,9 +1,8 @@ cfg_trace! { macro_rules! trace_op { - ($name:literal, $readiness:literal, $parent:expr) => { + ($name:expr, $readiness:literal) => { tracing::trace!( target: "runtime::resource::poll_op", - parent: $parent, op_name = $name, is_ready = $readiness ); @@ -11,14 +10,14 @@ cfg_trace! { } macro_rules! trace_poll_op { - ($name:literal, $poll:expr, $parent:expr $(,)*) => { + ($name:expr, $poll:expr $(,)*) => { match $poll { std::task::Poll::Ready(t) => { - trace_op!($name, true, $parent); + trace_op!($name, true); std::task::Poll::Ready(t) } std::task::Poll::Pending => { - trace_op!($name, false, $parent); + trace_op!($name, false); return std::task::Poll::Pending; } } diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 0e39dac8bb5..dfc76a40ebf 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -1,5 +1,7 @@ use crate::loom::sync::Mutex; use crate::sync::watch; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; /// A barrier enables multiple tasks to synchronize the beginning of some computation. /// @@ -41,6 +43,8 @@ pub struct Barrier { state: Mutex, wait: watch::Receiver, n: usize, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } #[derive(Debug)] @@ -55,6 +59,7 @@ impl Barrier { /// /// A barrier will block `n`-1 tasks which call [`Barrier::wait`] and then wake up all /// tasks at once when the `n`th task calls `wait`. + #[track_caller] pub fn new(mut n: usize) -> Barrier { let (waker, wait) = crate::sync::watch::channel(0); @@ -65,6 +70,32 @@ impl Barrier { n = 1; } + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Barrier", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + size = n, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + arrived = 0, + ) + }); + resource_span + }; + Barrier { state: Mutex::new(BarrierState { waker, @@ -73,6 +104,8 @@ impl Barrier { }), n, wait, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: resource_span, } } @@ -85,6 +118,20 @@ impl Barrier { /// [`BarrierWaitResult::is_leader`] when returning from this function, and all other tasks /// will receive a result that will return `false` from `is_leader`. pub async fn wait(&self) -> BarrierWaitResult { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace::async_op( + || self.wait_internal(), + self.resource_span.clone(), + "Barrier::wait", + "poll", + false, + ) + .await; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return self.wait_internal().await; + } + async fn wait_internal(&self) -> BarrierWaitResult { // NOTE: we are taking a _synchronous_ lock here. // It is okay to do so because the critical section is fast and never yields, so it cannot // deadlock even if another future is concurrently holding the lock. @@ -96,7 +143,23 @@ impl Barrier { let mut state = self.state.lock(); let generation = state.generation; state.arrived += 1; + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing::trace!( + target: "runtime::resource::state_update", + arrived = 1, + arrived.op = "add", + ); + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing::trace!( + target: "runtime::resource::async_op::state_update", + arrived = true, + ); if state.arrived == self.n { + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing::trace!( + target: "runtime::resource::async_op::state_update", + is_leader = true, + ); // we are the leader for this generation // wake everyone, increment the generation, and return state diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index b5c39d2a05d..4f5effff319 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -19,6 +19,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Mutex, MutexGuard}; use crate::util::linked_list::{self, LinkedList}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use crate::util::WakeList; use std::future::Future; @@ -35,6 +37,8 @@ pub(crate) struct Semaphore { waiters: Mutex, /// The current number of available permits in the semaphore. permits: AtomicUsize, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } struct Waitlist { @@ -101,6 +105,9 @@ struct Waiter { /// use `UnsafeCell` internally. pointers: linked_list::Pointers, + #[cfg(all(tokio_unstable, feature = "tracing"))] + ctx: trace::AsyncOpTracingCtx, + /// Should not be `Unpin`. _p: PhantomPinned, } @@ -129,12 +136,34 @@ impl Semaphore { "a semaphore may not have more than MAX_PERMITS permits ({})", Self::MAX_PERMITS ); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Semaphore", + kind = "Sync", + is_internal = true + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = permits, + permits.op = "override", + ) + }); + resource_span + }; + Self { permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), waiters: Mutex::new(Waitlist { queue: LinkedList::new(), closed: false, }), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -156,6 +185,8 @@ impl Semaphore { queue: LinkedList::new(), closed: false, }), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), } } @@ -224,7 +255,10 @@ impl Semaphore { let next = curr - num_permits; match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { - Ok(_) => return Ok(()), + Ok(_) => { + // TODO: Instrument once issue has been solved} + return Ok(()); + } Err(actual) => curr = actual, } } @@ -283,6 +317,17 @@ impl Semaphore { rem, Self::MAX_PERMITS ); + + // add remaining permits back + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = rem, + permits.op = "add", + ) + }); + rem = 0; } @@ -347,6 +392,20 @@ impl Semaphore { acquired += acq; if remaining == 0 { if !queued { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = acquired, + permits.op = "sub", + ); + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = acquired, + permits.op = "add", + ) + }); + return Ready(Ok(())); } else if lock.is_none() { break self.waiters.lock(); @@ -362,6 +421,15 @@ impl Semaphore { return Ready(Err(AcquireError::closed())); } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + permits = acquired, + permits.op = "sub", + ) + }); + if node.assign_permits(&mut acquired) { self.add_permits_locked(acquired, waiters); return Ready(Ok(())); @@ -406,11 +474,16 @@ impl fmt::Debug for Semaphore { } impl Waiter { - fn new(num_permits: u32) -> Self { + fn new( + num_permits: u32, + #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, + ) -> Self { Waiter { waker: UnsafeCell::new(None), state: AtomicUsize::new(num_permits as usize), pointers: linked_list::Pointers::new(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + ctx, _p: PhantomPinned, } } @@ -426,6 +499,14 @@ impl Waiter { match self.state.compare_exchange(curr, next, AcqRel, Acquire) { Ok(_) => { *n -= assign; + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.ctx.async_op_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = assign, + permits.op = "add", + ); + }); return next == 0; } Err(actual) => curr = actual, @@ -438,12 +519,26 @@ impl Future for Acquire<'_> { type Output = Result<(), AcquireError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // First, ensure the current task has enough budget to proceed. - let coop = ready!(crate::coop::poll_proceed(cx)); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _resource_span = self.node.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _async_op_span = self.node.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered(); let (node, semaphore, needed, queued) = self.project(); - match semaphore.poll_acquire(cx, needed, node, *queued) { + // First, ensure the current task has enough budget to proceed. + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_acquire", + crate::coop::poll_proceed(cx), + )); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let coop = ready!(crate::coop::poll_proceed(cx)); + + let result = match semaphore.poll_acquire(cx, needed, node, *queued) { Pending => { *queued = true; Pending @@ -454,18 +549,59 @@ impl Future for Acquire<'_> { *queued = false; Ready(Ok(())) } - } + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_acquire", result); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + return result; } } impl<'a> Acquire<'a> { fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { - Self { + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Self { node: Waiter::new(num_permits), semaphore, num_permits, queued: false, - } + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + return semaphore.resource_span.in_scope(|| { + let async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new"); + let async_op_poll_span = async_op_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_requested = num_permits, + permits.op = "override", + ); + + tracing::trace!( + target: "runtime::resource::async_op::state_update", + permits_obtained = 0 as usize, + permits.op = "override", + ); + + tracing::trace_span!("runtime.resource.async_op.poll") + }); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span: semaphore.resource_span.clone(), + }; + + Self { + node: Waiter::new(num_permits, ctx), + semaphore, + num_permits, + queued: false, + } + }); } fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 4d9f9886d76..2476726bdea 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -1,6 +1,8 @@ #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] use crate::sync::batch_semaphore as semaphore; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::cell::UnsafeCell; use std::error::Error; @@ -124,6 +126,8 @@ use std::{fmt, marker, mem}; /// [`Send`]: trait@std::marker::Send /// [`lock`]: method@Mutex::lock pub struct Mutex { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, s: semaphore::Semaphore, c: UnsafeCell, } @@ -138,6 +142,8 @@ pub struct Mutex { /// The lock is automatically released whenever the guard is dropped, at which /// point `lock` will succeed yet again. pub struct MutexGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, lock: &'a Mutex, } @@ -157,6 +163,8 @@ pub struct MutexGuard<'a, T: ?Sized> { /// /// [`Arc`]: std::sync::Arc pub struct OwnedMutexGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, lock: Arc>, } @@ -242,13 +250,42 @@ impl Mutex { /// /// let lock = Mutex::new(5); /// ``` + #[track_caller] pub fn new(t: T) -> Self where T: Sized, { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Mutex", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + semaphore::Semaphore::new(1) + }); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let s = semaphore::Semaphore::new(1); + Self { c: UnsafeCell::new(t), - s: semaphore::Semaphore::new(1), + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -270,6 +307,8 @@ impl Mutex { Self { c: UnsafeCell::new(t), s: semaphore::Semaphore::const_new(1), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), } } @@ -297,8 +336,32 @@ impl Mutex { /// } /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock", + "poll", + false, + ) + .await; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] self.acquire().await; - MutexGuard { lock: self } + + MutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + } } /// Blocking lock this mutex. When the lock has been acquired, function returns a @@ -368,8 +431,35 @@ impl Mutex { /// /// [`Arc`]: std::sync::Arc pub async fn lock_owned(self: Arc) -> OwnedMutexGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + trace::async_op( + || self.acquire(), + self.resource_span.clone(), + "Mutex::lock_owned", + "poll", + false, + ) + .await; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] self.acquire().await; - OwnedMutexGuard { lock: self } + + OwnedMutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } async fn acquire(&self) { @@ -399,7 +489,21 @@ impl Mutex { /// ``` pub fn try_lock(&self) -> Result, TryLockError> { match self.s.try_acquire(1) { - Ok(_) => Ok(MutexGuard { lock: self }), + Ok(_) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + Ok(MutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + }) + } Err(_) => Err(TryLockError(())), } } @@ -454,7 +558,24 @@ impl Mutex { /// # } pub fn try_lock_owned(self: Arc) -> Result, TryLockError> { match self.s.try_acquire(1) { - Ok(_) => Ok(OwnedMutexGuard { lock: self }), + Ok(_) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = true, + ); + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + + Ok(OwnedMutexGuard { + lock: self, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + }) + } Err(_) => Err(TryLockError(())), } } @@ -637,7 +758,14 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { - self.lock.s.release(1) + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + }); + self.lock.s.release(1); } } @@ -699,6 +827,13 @@ impl OwnedMutexGuard { impl Drop for OwnedMutexGuard { fn drop(&mut self) { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + locked = false, + ); + }); self.lock.s.release(1) } } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 08a2d9e49d7..cfc92259d39 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -122,6 +122,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::fmt; use std::future::Future; @@ -215,6 +217,8 @@ use std::task::{Context, Poll, Waker}; #[derive(Debug)] pub struct Sender { inner: Option>>, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } /// Receives a value from the associated [`Sender`]. @@ -305,6 +309,12 @@ pub struct Sender { #[derive(Debug)] pub struct Receiver { inner: Option>>, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_span: tracing::Span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_poll_span: tracing::Span, } pub mod error { @@ -442,7 +452,56 @@ struct State(usize); /// } /// } /// ``` +#[track_caller] pub fn channel() -> (Sender, Receiver) { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sender|Receiver", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_dropped = false, + tx_dropped.op = "override", + ) + }); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = false, + rx_dropped.op = "override", + ) + }); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_sent = false, + value_sent.op = "override", + ) + }); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_received = false, + value_received.op = "override", + ) + }); + + resource_span + }; + let inner = Arc::new(Inner { state: AtomicUsize::new(State::new().as_usize()), value: UnsafeCell::new(None), @@ -452,8 +511,27 @@ pub fn channel() -> (Sender, Receiver) { let tx = Sender { inner: Some(inner.clone()), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: resource_span.clone(), + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let async_op_span = resource_span + .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await")); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let rx = Receiver { + inner: Some(inner), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: resource_span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_span, + #[cfg(all(tokio_unstable, feature = "tracing"))] + async_op_poll_span, }; - let rx = Receiver { inner: Some(inner) }; (tx, rx) } @@ -525,6 +603,15 @@ impl Sender { } } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_sent = true, + value_sent.op = "override", + ) + }); + Ok(()) } @@ -598,7 +685,20 @@ impl Sender { pub async fn closed(&mut self) { use crate::future::poll_fn; - poll_fn(|cx| self.poll_closed(cx)).await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let closed = trace::async_op( + || poll_fn(|cx| self.poll_closed(cx)), + resource_span, + "Sender::closed", + "poll_closed", + false, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let closed = poll_fn(|cx| self.poll_closed(cx)); + + closed.await } /// Returns `true` if the associated [`Receiver`] handle has been dropped. @@ -728,6 +828,14 @@ impl Drop for Sender { fn drop(&mut self) { if let Some(inner) = self.inner.as_ref() { inner.complete(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_dropped = true, + tx_dropped.op = "override", + ) + }); } } } @@ -795,6 +903,14 @@ impl Receiver { pub fn close(&mut self) { if let Some(inner) = self.inner.as_ref() { inner.close(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = true, + rx_dropped.op = "override", + ) + }); } } @@ -872,7 +988,17 @@ impl Receiver { // `UnsafeCell`. Therefore, it is now safe for us to access the // cell. match unsafe { inner.consume_value() } { - Some(value) => Ok(value), + Some(value) => { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + value_received = true, + value_received.op = "override", + ) + }); + Ok(value) + } None => Err(TryRecvError::Closed), } } else if state.is_closed() { @@ -894,6 +1020,14 @@ impl Drop for Receiver { fn drop(&mut self) { if let Some(inner) = self.inner.as_ref() { inner.close(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = true, + rx_dropped.op = "override", + ) + }); } } } @@ -903,8 +1037,21 @@ impl Future for Receiver { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // If `inner` is `None`, then `poll()` has already completed. + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _res_span = self.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.async_op_poll_span.clone().entered(); + let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { - ready!(inner.poll_recv(cx))? + #[cfg(all(tokio_unstable, feature = "tracing"))] + let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let res = ready!(inner.poll_recv(cx))?; + + res } else { panic!("called after complete"); }; diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 120bc72b848..6991f0e8b6a 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -1,5 +1,7 @@ use crate::sync::batch_semaphore::{Semaphore, TryAcquireError}; use crate::sync::mutex::TryLockError; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::cell::UnsafeCell; use std::marker; use std::marker::PhantomData; @@ -86,6 +88,9 @@ const MAX_READS: u32 = 10; /// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies #[derive(Debug)] pub struct RwLock { + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, + // maximum number of concurrent readers mr: u32, @@ -197,14 +202,55 @@ impl RwLock { /// /// let lock = RwLock::new(5); /// ``` + #[track_caller] pub fn new(value: T) -> RwLock where T: Sized, { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "RwLock", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + max_readers = MAX_READS, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 0, + ); + }); + + resource_span + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| Semaphore::new(MAX_READS as usize)); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let s = Semaphore::new(MAX_READS as usize); + RwLock { mr: MAX_READS, c: UnsafeCell::new(value), - s: Semaphore::new(MAX_READS as usize), + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -222,6 +268,7 @@ impl RwLock { /// # Panics /// /// Panics if `max_reads` is more than `u32::MAX >> 3`. + #[track_caller] pub fn with_max_readers(value: T, max_reads: u32) -> RwLock where T: Sized, @@ -231,10 +278,52 @@ impl RwLock { "a RwLock may not be created with more than {} readers", MAX_READS ); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "RwLock", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + max_readers = max_reads, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + ); + + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 0, + ); + }); + + resource_span + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let s = resource_span.in_scope(|| Semaphore::new(max_reads as usize)); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let s = Semaphore::new(max_reads as usize); + RwLock { mr: max_reads, c: UnsafeCell::new(value), - s: Semaphore::new(max_reads as usize), + s, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -257,6 +346,8 @@ impl RwLock { mr: MAX_READS, c: UnsafeCell::new(value), s: Semaphore::const_new(MAX_READS as usize), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), } } @@ -281,6 +372,8 @@ impl RwLock { mr: max_reads, c: UnsafeCell::new(value), s: Semaphore::const_new(max_reads as usize), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span::none(), } } @@ -330,15 +423,39 @@ impl RwLock { ///} /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - self.s.acquire(1).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(1), + self.resource_span.clone(), + "RwLock::read", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(1); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + RwLockReadGuard { s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), } } @@ -394,15 +511,42 @@ impl RwLock { ///} /// ``` pub async fn read_owned(self: Arc) -> OwnedRwLockReadGuard { - self.s.acquire(1).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(1), + self.resource_span.clone(), + "RwLock::read_owned", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(1); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + OwnedRwLockReadGuard { data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -445,10 +589,21 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + Ok(RwLockReadGuard { s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), }) } @@ -497,10 +652,24 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + Ok(OwnedRwLockReadGuard { data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } @@ -533,16 +702,40 @@ impl RwLock { ///} /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - self.s.acquire(self.mr).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(self.mr), + self.resource_span.clone(), + "RwLock::write", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(self.mr); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + RwLockWriteGuard { permits_acquired: self.mr, s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), } } @@ -582,16 +775,43 @@ impl RwLock { ///} /// ``` pub async fn write_owned(self: Arc) -> OwnedRwLockWriteGuard { - self.s.acquire(self.mr).await.unwrap_or_else(|_| { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.s.acquire(self.mr), + self.resource_span.clone(), + "RwLock::write_owned", + "poll", + false, + ); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.s.acquire(self.mr); + + inner.await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + OwnedRwLockWriteGuard { permits_acquired: self.mr, data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -625,11 +845,22 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + Ok(RwLockWriteGuard { permits_acquired: self.mr, s: &self.s, data: self.c.get(), marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), }) } @@ -670,11 +901,25 @@ impl RwLock { Err(TryAcquireError::Closed) => unreachable!(), } + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = true, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + Ok(OwnedRwLockWriteGuard { permits_acquired: self.mr, data: self.c.get(), lock: ManuallyDrop::new(self), _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } diff --git a/tokio/src/sync/rwlock/owned_read_guard.rs b/tokio/src/sync/rwlock/owned_read_guard.rs index 1881295846e..27b71bd988b 100644 --- a/tokio/src/sync/rwlock/owned_read_guard.rs +++ b/tokio/src/sync/rwlock/owned_read_guard.rs @@ -15,6 +15,8 @@ use std::sync::Arc; /// [`read_owned`]: method@crate::sync::RwLock::read_owned /// [`RwLock`]: struct@crate::sync::RwLock pub struct OwnedRwLockReadGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, // ManuallyDrop allows us to destructure into this field without running the destructor. pub(super) lock: ManuallyDrop>>, pub(super) data: *const U, @@ -56,12 +58,17 @@ impl OwnedRwLockReadGuard { { let data = f(&*this) as *const V; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -105,12 +112,17 @@ impl OwnedRwLockReadGuard { None => return Err(this), }; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + Ok(OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } } @@ -145,5 +157,14 @@ impl Drop for OwnedRwLockReadGuard { fn drop(&mut self) { self.lock.s.release(1); unsafe { ManuallyDrop::drop(&mut self.lock) }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "sub", + ) + }); } } diff --git a/tokio/src/sync/rwlock/owned_write_guard.rs b/tokio/src/sync/rwlock/owned_write_guard.rs index 0a78d28e903..dbedab4cbb2 100644 --- a/tokio/src/sync/rwlock/owned_write_guard.rs +++ b/tokio/src/sync/rwlock/owned_write_guard.rs @@ -16,6 +16,8 @@ use std::sync::Arc; /// [`write_owned`]: method@crate::sync::RwLock::write_owned /// [`RwLock`]: struct@crate::sync::RwLock pub struct OwnedRwLockWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, // ManuallyDrop allows us to destructure into this field without running the destructor. pub(super) lock: ManuallyDrop>>, @@ -64,13 +66,18 @@ impl OwnedRwLockWriteGuard { let data = f(&mut *this) as *mut U; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -123,13 +130,19 @@ impl OwnedRwLockWriteGuard { }; let permits_acquired = this.permits_acquired; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); + // NB: Forget to avoid drop impl from being called. mem::forget(this); + Ok(OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } @@ -181,15 +194,39 @@ impl OwnedRwLockWriteGuard { pub fn downgrade(mut self) -> OwnedRwLockReadGuard { let lock = unsafe { ManuallyDrop::take(&mut self.lock) }; let data = self.data; + let to_release = (self.permits_acquired - 1) as usize; // Release all but one of the permits held by the write guard - lock.s.release((self.permits_acquired - 1) as usize); + lock.s.release(to_release); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(self); + OwnedRwLockReadGuard { lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } } @@ -229,6 +266,14 @@ where impl Drop for OwnedRwLockWriteGuard { fn drop(&mut self) { self.lock.s.release(self.permits_acquired as usize); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); unsafe { ManuallyDrop::drop(&mut self.lock) }; } } diff --git a/tokio/src/sync/rwlock/owned_write_guard_mapped.rs b/tokio/src/sync/rwlock/owned_write_guard_mapped.rs index d88ee01e1fd..55a24d96ac3 100644 --- a/tokio/src/sync/rwlock/owned_write_guard_mapped.rs +++ b/tokio/src/sync/rwlock/owned_write_guard_mapped.rs @@ -15,6 +15,8 @@ use std::sync::Arc; /// [mapping]: method@crate::sync::OwnedRwLockWriteGuard::map /// [`OwnedRwLockWriteGuard`]: struct@crate::sync::OwnedRwLockWriteGuard pub struct OwnedRwLockMappedWriteGuard { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, // ManuallyDrop allows us to destructure into this field without running the destructor. pub(super) lock: ManuallyDrop>>, @@ -63,13 +65,18 @@ impl OwnedRwLockMappedWriteGuard { let data = f(&mut *this) as *mut V; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -120,13 +127,18 @@ impl OwnedRwLockMappedWriteGuard { }; let lock = unsafe { ManuallyDrop::take(&mut this.lock) }; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + Ok(OwnedRwLockMappedWriteGuard { permits_acquired, lock: ManuallyDrop::new(lock), data, _p: PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } } @@ -166,6 +178,14 @@ where impl Drop for OwnedRwLockMappedWriteGuard { fn drop(&mut self) { self.lock.s.release(self.permits_acquired as usize); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); unsafe { ManuallyDrop::drop(&mut self.lock) }; } } diff --git a/tokio/src/sync/rwlock/read_guard.rs b/tokio/src/sync/rwlock/read_guard.rs index 090b297e4af..36921319923 100644 --- a/tokio/src/sync/rwlock/read_guard.rs +++ b/tokio/src/sync/rwlock/read_guard.rs @@ -13,6 +13,8 @@ use std::ops; /// [`read`]: method@crate::sync::RwLock::read /// [`RwLock`]: struct@crate::sync::RwLock pub struct RwLockReadGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) s: &'a Semaphore, pub(super) data: *const T, pub(super) marker: marker::PhantomData<&'a T>, @@ -59,12 +61,17 @@ impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { { let data = f(&*this) as *const U; let s = this.s; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + RwLockReadGuard { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -113,12 +120,17 @@ impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { None => return Err(this), }; let s = this.s; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + Ok(RwLockReadGuard { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } } @@ -152,5 +164,14 @@ where impl<'a, T: ?Sized> Drop for RwLockReadGuard<'a, T> { fn drop(&mut self) { self.s.release(1); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "sub", + ) + }); } } diff --git a/tokio/src/sync/rwlock/write_guard.rs b/tokio/src/sync/rwlock/write_guard.rs index 8c80ee70db4..7cadd74c60d 100644 --- a/tokio/src/sync/rwlock/write_guard.rs +++ b/tokio/src/sync/rwlock/write_guard.rs @@ -15,6 +15,8 @@ use std::ops; /// [`write`]: method@crate::sync::RwLock::write /// [`RwLock`]: struct@crate::sync::RwLock pub struct RwLockWriteGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, pub(super) s: &'a Semaphore, pub(super) data: *mut T, @@ -66,6 +68,8 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { let data = f(&mut *this) as *mut U; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); RwLockMappedWriteGuard { @@ -73,6 +77,8 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -129,6 +135,8 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { }; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); Ok(RwLockMappedWriteGuard { @@ -136,6 +144,8 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } @@ -188,15 +198,38 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { /// [`RwLock`]: struct@crate::sync::RwLock pub fn downgrade(self) -> RwLockReadGuard<'a, T> { let RwLockWriteGuard { s, data, .. } = self; - + let to_release = (self.permits_acquired - 1) as usize; // Release all but one of the permits held by the write guard - s.release((self.permits_acquired - 1) as usize); + s.release(to_release); + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + current_readers = 1, + current_readers.op = "add", + ) + }); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(self); + RwLockReadGuard { s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } } @@ -236,5 +269,14 @@ where impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> { fn drop(&mut self) { self.s.release(self.permits_acquired as usize); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); } } diff --git a/tokio/src/sync/rwlock/write_guard_mapped.rs b/tokio/src/sync/rwlock/write_guard_mapped.rs index 3cf69de4bdd..b5c644a9e83 100644 --- a/tokio/src/sync/rwlock/write_guard_mapped.rs +++ b/tokio/src/sync/rwlock/write_guard_mapped.rs @@ -14,6 +14,8 @@ use std::ops; /// [mapping]: method@crate::sync::RwLockWriteGuard::map /// [`RwLockWriteGuard`]: struct@crate::sync::RwLockWriteGuard pub struct RwLockMappedWriteGuard<'a, T: ?Sized> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) resource_span: tracing::Span, pub(super) permits_acquired: u32, pub(super) s: &'a Semaphore, pub(super) data: *mut T, @@ -64,13 +66,18 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { let data = f(&mut *this) as *mut U; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -126,13 +133,18 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { }; let s = this.s; let permits_acquired = this.permits_acquired; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = this.resource_span.clone(); // NB: Forget to avoid drop impl from being called. mem::forget(this); + Ok(RwLockMappedWriteGuard { permits_acquired, s, data, marker: marker::PhantomData, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, }) } } @@ -172,5 +184,14 @@ where impl<'a, T: ?Sized> Drop for RwLockMappedWriteGuard<'a, T> { fn drop(&mut self) { self.s.release(self.permits_acquired as usize); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + write_locked = false, + write_locked.op = "override", + ) + }); } } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 839b523c4ce..860f46f3998 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,5 +1,7 @@ use super::batch_semaphore as ll; // low level implementation use super::{AcquireError, TryAcquireError}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; use std::sync::Arc; /// Counting semaphore performing asynchronous permit acquisition. @@ -77,6 +79,8 @@ use std::sync::Arc; pub struct Semaphore { /// The low level semaphore ll_sem: ll::Semaphore, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } /// A permit from the semaphore. @@ -120,9 +124,33 @@ fn bounds() { impl Semaphore { /// Creates a new semaphore with the initial number of permits. + #[track_caller] pub fn new(permits: usize) -> Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Semaphore", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + inherits_child_attrs = true, + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let ll_sem = ll::Semaphore::new(permits); + Self { - ll_sem: ll::Semaphore::new(permits), + ll_sem, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -139,9 +167,16 @@ impl Semaphore { #[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_new(permits: usize) -> Self { - Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Self { ll_sem: ll::Semaphore::const_new(permits), - } + resource_span: tracing::Span::none(), + }; + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return Self { + ll_sem: ll::Semaphore::const_new(permits), + }; } /// Returns the current number of available permits. @@ -191,7 +226,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire(&self) -> Result, AcquireError> { - self.ll_sem.acquire(1).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(1), + self.resource_span.clone(), + "Semaphore::acquire", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(1); + + inner.await?; Ok(SemaphorePermit { sem: self, permits: 1, @@ -227,7 +273,19 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + trace::async_op( + || self.ll_sem.acquire(n), + self.resource_span.clone(), + "Semaphore::acquire_many", + "poll", + true, + ) + .await?; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] self.ll_sem.acquire(n).await?; + Ok(SemaphorePermit { sem: self, permits: n, @@ -350,7 +408,18 @@ impl Semaphore { /// [`AcquireError`]: crate::sync::AcquireError /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit pub async fn acquire_owned(self: Arc) -> Result { - self.ll_sem.acquire(1).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(1), + self.resource_span.clone(), + "Semaphore::acquire_owned", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(1); + + inner.await?; Ok(OwnedSemaphorePermit { sem: self, permits: 1, @@ -403,7 +472,18 @@ impl Semaphore { self: Arc, n: u32, ) -> Result { - self.ll_sem.acquire(n).await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = trace::async_op( + || self.ll_sem.acquire(n), + self.resource_span.clone(), + "Semaphore::acquire_many_owned", + "poll", + true, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = self.ll_sem.acquire(n); + + inner.await?; Ok(OwnedSemaphorePermit { sem: self, permits: n, diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index d10639d191c..7f27ef201f7 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -1,3 +1,5 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::time::driver::ClockTime; use crate::time::driver::{Handle, TimerEntry}; use crate::time::{error::Error, Duration, Instant}; use crate::util::trace; @@ -8,10 +10,6 @@ use std::panic::Location; use std::pin::Pin; use std::task::{self, Poll}; -cfg_trace! { - use crate::time::driver::ClockTime; -} - /// Waits until `deadline` is reached. /// /// No work is performed while awaiting on the sleep future to complete. `Sleep` @@ -238,8 +236,7 @@ cfg_trace! { #[derive(Debug)] struct Inner { deadline: Instant, - resource_span: tracing::Span, - async_op_span: tracing::Span, + ctx: trace::AsyncOpTracingCtx, time_source: ClockTime, } } @@ -266,8 +263,7 @@ impl Sleep { let deadline_tick = time_source.deadline_to_tick(deadline); let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); - let location = location.expect("should have location if tracking caller"); - + let location = location.expect("should have location if tracing"); let resource_span = tracing::trace_span!( "runtime.resource", concrete_type = "Sleep", @@ -277,21 +273,29 @@ impl Sleep { loc.col = location.column(), ); - let async_op_span = - tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout"); + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); - tracing::trace!( - target: "runtime::resource::state_update", - parent: resource_span.id(), - duration = duration, - duration.unit = "ms", - duration.op = "override", - ); + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, + }; Inner { deadline, - resource_span, - async_op_span, + ctx, time_source, } }; @@ -358,54 +362,52 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] { - me.inner.async_op_span = + let _resource_enter = me.inner.ctx.resource_span.enter(); + me.inner.ctx.async_op_span = tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); + let _async_op_enter = me.inner.ctx.async_op_span.enter(); + + me.inner.ctx.async_op_poll_span = + tracing::trace_span!("runtime.resource.async_op.poll"); + + let duration = { + let now = me.inner.time_source.now(); + let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); + deadline_tick.checked_sub(now).unwrap_or(0) + }; tracing::trace!( target: "runtime::resource::state_update", - parent: me.inner.resource_span.id(), - duration = { - let now = me.inner.time_source.now(); - let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); - deadline_tick.checked_sub(now).unwrap_or(0) - }, + duration = duration, duration.unit = "ms", duration.op = "override", ); } } - cfg_not_trace! { - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let me = self.project(); - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + // Keep track of task budget + #[cfg(all(tokio_unstable, feature = "tracing"))] + let coop = ready!(trace_poll_op!( + "poll_elapsed", + crate::coop::poll_proceed(cx), + )); - me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } - } + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + let coop = ready!(crate::coop::poll_proceed(cx)); - cfg_trace! { - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); - // Keep track of task budget - let coop = ready!(trace_poll_op!( - "poll_elapsed", - crate::coop::poll_proceed(cx), - me.inner.resource_span.id(), - )); - - let result = me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }); + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); - trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id()) - } + #[cfg(all(tokio_unstable, feature = "tracing"))] + return trace_poll_op!("poll_elapsed", result); + + #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] + return result; } } @@ -423,8 +425,11 @@ impl Future for Sleep { // really do much better if we passed the error onwards. fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { #[cfg(all(tokio_unstable, feature = "tracing"))] - let _span = self.inner.async_op_span.clone().entered(); - + let _res_span = self.inner.ctx.resource_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_span = self.inner.ctx.async_op_span.clone().entered(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered(); match ready!(self.as_mut().poll_elapsed(cx)) { Ok(()) => Poll::Ready(()), Err(e) => panic!("timer error: {}", e), diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 6a89943741e..439a1a19d54 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -1,6 +1,8 @@ use crate::future::poll_fn; use crate::time::{sleep_until, Duration, Instant, Sleep}; +use crate::util::trace; +use std::panic::Location; use std::pin::Pin; use std::task::{Context, Poll}; use std::{convert::TryInto, future::Future}; @@ -68,10 +70,10 @@ use std::{convert::TryInto, future::Future}; /// /// [`sleep`]: crate::time::sleep() /// [`.tick().await`]: Interval::tick +#[track_caller] pub fn interval(period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); - - interval_at(Instant::now(), period) + internal_interval_at(Instant::now(), period, trace::caller_location()) } /// Creates new [`Interval`] that yields with interval of `period` with the @@ -103,13 +105,44 @@ pub fn interval(period: Duration) -> Interval { /// // approximately 70ms have elapsed. /// } /// ``` +#[track_caller] pub fn interval_at(start: Instant, period: Duration) -> Interval { assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + internal_interval_at(start, period, trace::caller_location()) +} + +#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] +fn internal_interval_at( + start: Instant, + period: Duration, + location: Option<&'static Location<'static>>, +) -> Interval { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = location.expect("should have location if tracing"); + + tracing::trace_span!( + "runtime.resource", + concrete_type = "Interval", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let delay = resource_span.in_scope(|| Box::pin(sleep_until(start))); + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let delay = Box::pin(sleep_until(start)); Interval { - delay: Box::pin(sleep_until(start)), + delay, period, missed_tick_behavior: Default::default(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, } } @@ -362,6 +395,9 @@ pub struct Interval { /// The strategy `Interval` should use when a tick is missed. missed_tick_behavior: MissedTickBehavior, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } impl Interval { @@ -391,7 +427,20 @@ impl Interval { /// } /// ``` pub async fn tick(&mut self) -> Instant { - poll_fn(|cx| self.poll_tick(cx)).await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = self.resource_span.clone(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let instant = trace::async_op( + || poll_fn(|cx| self.poll_tick(cx)), + resource_span, + "Interval::tick", + "poll_tick", + false, + ); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let instant = poll_fn(|cx| self.poll_tick(cx)); + + instant.await } /// Polls for the next instant in the interval to be reached. diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 74ae739354b..6080e2358ae 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,5 +1,11 @@ cfg_trace! { cfg_rt! { + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use pin_project_lite::pin_project; + use std::future::Future; pub(crate) use tracing::instrument::Instrumented; #[inline] @@ -18,6 +24,58 @@ cfg_trace! { ); task.instrument(span) } + + pub(crate) fn async_op(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp + where P: FnOnce() -> F { + resource_span.in_scope(|| { + let async_op_span = tracing::trace_span!("runtime.resource.async_op", source = source, inherits_child_attrs = inherits_child_attrs); + let enter = async_op_span.enter(); + let async_op_poll_span = tracing::trace_span!("runtime.resource.async_op.poll"); + let inner = inner(); + drop(enter); + let tracing_ctx = AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span: resource_span.clone(), + }; + InstrumentedAsyncOp { + inner, + tracing_ctx, + poll_op_name, + } + }) + } + + #[derive(Debug, Clone)] + pub(crate) struct AsyncOpTracingCtx { + pub(crate) async_op_span: tracing::Span, + pub(crate) async_op_poll_span: tracing::Span, + pub(crate) resource_span: tracing::Span, + } + + + pin_project! { + #[derive(Debug, Clone)] + pub(crate) struct InstrumentedAsyncOp { + #[pin] + pub(crate) inner: F, + pub(crate) tracing_ctx: AsyncOpTracingCtx, + pub(crate) poll_op_name: &'static str + } + } + + impl Future for InstrumentedAsyncOp { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let poll_op_name = &*this.poll_op_name; + let _res_enter = this.tracing_ctx.resource_span.enter(); + let _async_op_enter = this.tracing_ctx.async_op_span.enter(); + let _async_op_poll_enter = this.tracing_ctx.async_op_poll_span.enter(); + trace_poll_op!(poll_op_name, this.inner.poll(cx)) + } + } } } cfg_time! {