From e149577ba58c89fe0894e6d3af1c94ffab10f514 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 6 Jul 2021 12:50:23 +0000 Subject: [PATCH 1/6] tracing: add resource instrumentation declarative macros Signed-off-by: Zahari Dichev --- tokio/src/fs/file.rs | 144 +++++++++++++++++---------------- tokio/src/macros/instrument.rs | 132 ++++++++++++++++++++++++++++++ tokio/src/macros/mod.rs | 3 + tokio/src/net/tcp/stream.rs | 138 ++++++++++++++++--------------- tokio/src/time/driver/sleep.rs | 26 +++--- 5 files changed, 295 insertions(+), 148 deletions(-) create mode 100644 tokio/src/macros/instrument.rs diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 5c06e732b09..ec83b9eaeb8 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -19,67 +19,69 @@ use std::task::Context; use std::task::Poll; use std::task::Poll::*; -/// A reference to an open file on the filesystem. -/// -/// This is a specialized version of [`std::fs::File`][std] for usage from the -/// Tokio runtime. -/// -/// An instance of a `File` can be read and/or written depending on what options -/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical -/// cursor that the file contains internally. -/// -/// A file will not be closed immediately when it goes out of scope if there -/// are any IO operations that have not yet completed. To ensure that a file is -/// closed immediately when it is dropped, you should call [`flush`] before -/// dropping it. Note that this does not ensure that the file has been fully -/// written to disk; the operating system might keep the changes around in an -/// in-memory buffer. See the [`sync_all`] method for telling the OS to write -/// the data to disk. -/// -/// Reading and writing to a `File` is usually done using the convenience -/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. -/// -/// [std]: struct@std::fs::File -/// [`AsyncSeek`]: trait@crate::io::AsyncSeek -/// [`flush`]: fn@crate::io::AsyncWriteExt::flush -/// [`sync_all`]: fn@crate::fs::File::sync_all -/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt -/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt -/// -/// # Examples -/// -/// Create a new file and asynchronously write bytes to it: -/// -/// ```no_run -/// use tokio::fs::File; -/// use tokio::io::AsyncWriteExt; // for write_all() -/// -/// # async fn dox() -> std::io::Result<()> { -/// let mut file = File::create("foo.txt").await?; -/// file.write_all(b"hello, world!").await?; -/// # Ok(()) -/// # } -/// ``` -/// -/// Read the contents of a file into a buffer -/// -/// ```no_run -/// use tokio::fs::File; -/// use tokio::io::AsyncReadExt; // for read_to_end() -/// -/// # async fn dox() -> std::io::Result<()> { -/// let mut file = File::open("foo.txt").await?; -/// -/// let mut contents = vec![]; -/// file.read_to_end(&mut contents).await?; -/// -/// println!("len = {}", contents.len()); -/// # Ok(()) -/// # } -/// ``` -pub struct File { - std: Arc, - inner: Mutex, +instrument_resource! { + /// A reference to an open file on the filesystem. + /// + /// This is a specialized version of [`std::fs::File`][std] for usage from the + /// Tokio runtime. + /// + /// An instance of a `File` can be read and/or written depending on what options + /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical + /// cursor that the file contains internally. + /// + /// A file will not be closed immediately when it goes out of scope if there + /// are any IO operations that have not yet completed. To ensure that a file is + /// closed immediately when it is dropped, you should call [`flush`] before + /// dropping it. Note that this does not ensure that the file has been fully + /// written to disk; the operating system might keep the changes around in an + /// in-memory buffer. See the [`sync_all`] method for telling the OS to write + /// the data to disk. + /// + /// Reading and writing to a `File` is usually done using the convenience + /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. + /// + /// [std]: struct@std::fs::File + /// [`AsyncSeek`]: trait@crate::io::AsyncSeek + /// [`flush`]: fn@crate::io::AsyncWriteExt::flush + /// [`sync_all`]: fn@crate::fs::File::sync_all + /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + /// + /// # Examples + /// + /// Create a new file and asynchronously write bytes to it: + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; // for write_all() + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// Read the contents of a file into a buffer + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncReadExt; // for read_to_end() + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// + /// let mut contents = vec![]; + /// file.read_to_end(&mut contents).await?; + /// + /// println!("len = {}", contents.len()); + /// # Ok(()) + /// # } + /// ``` + pub struct File { + std: Arc, + inner: Mutex, + } } struct Inner { @@ -200,13 +202,19 @@ impl File { /// let file = tokio::fs::File::from_std(std_file); /// ``` pub fn from_std(std: sys::File) -> File { - File { - std: Arc::new(std), - inner: Mutex::new(Inner { - state: State::Idle(Some(Buf::with_capacity(0))), - last_write_err: None, - pos: 0, - }), + let std = Arc::new(std); + let inner = Mutex::new(Inner { + state: State::Idle(Some(Buf::with_capacity(0))), + last_write_err: None, + pos: 0, + }); + + new_instrumented_resource! { + FileSystem, + File { + std, + inner, + } } } diff --git a/tokio/src/macros/instrument.rs b/tokio/src/macros/instrument.rs new file mode 100644 index 00000000000..f332fc951f4 --- /dev/null +++ b/tokio/src/macros/instrument.rs @@ -0,0 +1,132 @@ +cfg_trace! { + #[macro_export] + #[doc(hidden)] + macro_rules! instrument_resource { + ( + pin_project, + $(#[$meta:meta])* + // pin project gets confused when this is a `vis` + // and does not infer the projection visibility correctly + $visibility:ident struct $struct_name:ident { + $( + $(#[$field_attrs:ident])* + $field_vis:vis $field_name:ident : $field_type:ty + ),*$(,)+ + } + ) => { + pin_project_lite::pin_project! { + $(#[$meta])* + $visibility struct $struct_name { + __resource_span: tracing::Span, + $( + $(#[$field_attrs])* + $field_vis $field_name : $field_type, + )* + } + } + }; + ( + $(#[$meta:meta])* + $vis:vis struct $struct_name:ident { + $( + $(#[$field_attrs:ident])* + $field_vis:vis $field_name:ident : $field_type:ty + ),*$(,)+ + } + ) => { + $(#[$meta])* + $vis struct $struct_name { + __resource_span: tracing::Span, + $( + $(#[$field_attrs])* + $field_vis $field_name : $field_type, + )* + } + } + } + + + #[macro_export] + #[doc(hidden)] + macro_rules! new_instrumented_resource { + ( + $resource_type:ident, + $struct:ident { + $($field:ident),* $(,)* // Handle non shorthand initialization + } + ) => { + $struct { + __resource_span:tracing::trace_span!( + "resource", + concrete_type = stringify!($struct), + kind = stringify!($resource_type) + ), + $( + $field, + )* + } + }; + } + + #[macro_export] + #[doc(hidden)] + macro_rules! instrument_resource_op { + ( + $( #[$attr:meta] )* + $vis:vis fn $name:ident(&$self: ident, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? + $body:block + ) => { + $vis fn $name(&$self, $($arg_name : $arg_ty,)*) $(-> $ret)? { + let __resource_span = $self.__resource_span.clone(); + let __resource_span_guard = __resource_span.enter(); + let result = (|| $body)(); + drop(__resource_span_guard); + result + } + }; + ( + $( #[$attr:meta] )* + $vis:vis fn $name:ident($self:tt : $self_type:ty, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? + $body:block + ) => { + $vis fn $name($self : $self_type, $($arg_name : $arg_ty,)*) $(-> $ret)? { + let __resource_span = $self.__resource_span.clone(); + let __resource_span_guard = __resource_span.enter(); + let result = (|| $body)(); + drop(__resource_span_guard); + result + } + }; + } +} + +cfg_not_trace! { + #[macro_export] + #[doc(hidden)] + macro_rules! instrument_resource { + (pin_project, $($t:tt)*) => { + pin_project_lite::pin_project! { + $($t)* + } + }; + ($($t:tt)*) => { + $($t)* + } + } + + #[macro_export] + #[doc(hidden)] + macro_rules! new_instrumented_resource { + ($resource_type:ident, $($t:tt)*) => { + $($t)* + } + } + + #[macro_export] + #[doc(hidden)] + macro_rules! instrument_resource_op { + ($($t:tt)*) => { + $($t)* + } + } +} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index b0af5215256..3b4403aaf55 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -15,6 +15,9 @@ mod ready; #[macro_use] mod thread_local; +#[macro_use] +mod instrument; + #[macro_use] #[cfg(feature = "rt")] pub(crate) mod scoped_tls; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0277a360d09..654ebea8137 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -15,57 +15,58 @@ use std::time::Duration; cfg_io_util! { use bytes::BufMut; } - cfg_net! { - /// A TCP stream between a local and a remote socket. - /// - /// A TCP stream can either be created by connecting to an endpoint, via the - /// [`connect`] method, or by [accepting] a connection from a [listener]. A - /// TCP stream can also be created via the [`TcpSocket`] type. - /// - /// Reading and writing to a `TcpStream` is usually done using the - /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] - /// traits. - /// - /// [`connect`]: method@TcpStream::connect - /// [accepting]: method@crate::net::TcpListener::accept - /// [listener]: struct@crate::net::TcpListener - /// [`TcpSocket`]: struct@crate::net::TcpSocket - /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt - /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// use tokio::io::AsyncWriteExt; - /// use std::error::Error; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// // Connect to a peer - /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// // Write some data. - /// stream.write_all(b"hello world!").await?; - /// - /// Ok(()) - /// } - /// ``` - /// - /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. - /// - /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all - /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - /// - /// To shut down the stream in the write direction, you can call the - /// [`shutdown()`] method. This will cause the other peer to receive a read of - /// length 0, indicating that no more data will be sent. This only closes - /// the stream in one direction. - /// - /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown - pub struct TcpStream { - io: PollEvented, + instrument_resource! { + /// A TCP stream between a local and a remote socket. + /// + /// A TCP stream can either be created by connecting to an endpoint, via the + /// [`connect`] method, or by [accepting] a connection from a [listener]. A + /// TCP stream can also be created via the [`TcpSocket`] type. + /// + /// Reading and writing to a `TcpStream` is usually done using the + /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] + /// traits. + /// + /// [`connect`]: method@TcpStream::connect + /// [accepting]: method@crate::net::TcpListener::accept + /// [listener]: struct@crate::net::TcpListener + /// [`TcpSocket`]: struct@crate::net::TcpSocket + /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::io::AsyncWriteExt; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Write some data. + /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + /// + /// To shut down the stream in the write direction, you can call the + /// [`shutdown()`] method. This will cause the other peer to receive a read of + /// length 0, indicating that no more data will be sent. This only closes + /// the stream in one direction. + /// + /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown + pub struct TcpStream { + io: PollEvented, + } } } @@ -154,7 +155,7 @@ impl TcpStream { pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result { let io = PollEvented::new(connected)?; - Ok(TcpStream { io }) + Ok(new_instrumented_resource!(Network, TcpStream { io })) } /// Creates new `TcpStream` from a `std::net::TcpStream`. @@ -189,7 +190,7 @@ impl TcpStream { pub fn from_std(stream: std::net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_std(stream); let io = PollEvented::new(io)?; - Ok(TcpStream { io }) + Ok(new_instrumented_resource!(Network, TcpStream { io })) } /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. @@ -387,7 +388,7 @@ impl TcpStream { /// // if the readiness event is a false positive. /// match stream.try_read(&mut data) { /// Ok(n) => { - /// println!("read {} bytes", n); + /// println!("read {} bytes", n); /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; @@ -1178,22 +1179,25 @@ impl TcpStream { // To read or write without mutable access to the `UnixStream`, combine the // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or // `try_write` methods. - - pub(crate) fn poll_read_priv( - &self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - // Safety: `TcpStream::read` correctly handles reads into uninitialized memory - unsafe { self.io.poll_read(cx, buf) } + instrument_resource_op! { + pub(crate) fn poll_read_priv( + &self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + // Safety: `TcpStream::read` correctly handles reads into uninitialized memory + unsafe { self.io.poll_read(cx, buf) } + } } - pub(super) fn poll_write_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.io.poll_write(cx, buf) + instrument_resource_op! { + pub(super) fn poll_write_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.io.poll_write(cx, buf) + } } pub(super) fn poll_write_vectored_priv( diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index 40f745ad7e8..58e43523323 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -1,7 +1,6 @@ use crate::time::driver::{Handle, TimerEntry}; use crate::time::{error::Error, Duration, Instant}; -use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; @@ -65,7 +64,8 @@ pub fn sleep(duration: Duration) -> Sleep { } } -pin_project! { +instrument_resource! { + pin_project, /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). /// /// This type does not implement the `Unpin` trait, which means that if you @@ -171,7 +171,7 @@ impl Sleep { let handle = Handle::current(); let entry = TimerEntry::new(&handle, deadline); - Sleep { deadline, entry } + new_instrumented_resource!(Timer, Sleep { deadline, entry }) } pub(crate) fn far_future() -> Sleep { @@ -222,17 +222,17 @@ impl Sleep { me.entry.reset(deadline); *me.deadline = deadline; } + instrument_resource_op! { + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let me = self.project(); + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); - - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) + me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) + } } } From 873baf8c522fb5e8b8a0a337f88833f6e9e0c2c6 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 6 Jul 2021 19:23:48 +0000 Subject: [PATCH 2/6] make private Signed-off-by: Zahari Dichev --- tokio/src/macros/instrument.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tokio/src/macros/instrument.rs b/tokio/src/macros/instrument.rs index f332fc951f4..9c06c1d06ca 100644 --- a/tokio/src/macros/instrument.rs +++ b/tokio/src/macros/instrument.rs @@ -1,6 +1,4 @@ cfg_trace! { - #[macro_export] - #[doc(hidden)] macro_rules! instrument_resource { ( pin_project, @@ -46,8 +44,6 @@ cfg_trace! { } - #[macro_export] - #[doc(hidden)] macro_rules! new_instrumented_resource { ( $resource_type:ident, @@ -68,8 +64,6 @@ cfg_trace! { }; } - #[macro_export] - #[doc(hidden)] macro_rules! instrument_resource_op { ( $( #[$attr:meta] )* @@ -101,8 +95,6 @@ cfg_trace! { } cfg_not_trace! { - #[macro_export] - #[doc(hidden)] macro_rules! instrument_resource { (pin_project, $($t:tt)*) => { pin_project_lite::pin_project! { @@ -114,16 +106,12 @@ cfg_not_trace! { } } - #[macro_export] - #[doc(hidden)] macro_rules! new_instrumented_resource { ($resource_type:ident, $($t:tt)*) => { $($t)* } } - #[macro_export] - #[doc(hidden)] macro_rules! instrument_resource_op { ($($t:tt)*) => { $($t)* From e119d741940b290016d8321a12e468eb2f3ec6b2 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 7 Jul 2021 05:13:30 +0000 Subject: [PATCH 3/6] feedback Signed-off-by: Zahari Dichev --- tokio/src/fs/file.rs | 214 +++++++++++++++++---------------- tokio/src/macros/instrument.rs | 31 +++-- 2 files changed, 127 insertions(+), 118 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index ec83b9eaeb8..f35fb8a9a36 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -488,65 +488,67 @@ impl File { } impl AsyncRead for File { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - dst: &mut ReadBuf<'_>, - ) -> Poll> { - let me = self.get_mut(); - let inner = me.inner.get_mut(); - - loop { - match inner.state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); - - if !buf.is_empty() { - buf.copy_to(dst); - *buf_cell = Some(buf); - return Ready(Ok(())); - } - - buf.ensure_capacity_for(dst); - let std = me.std.clone(); - - inner.state = Busy(sys::run(move || { - let res = buf.read_from(&mut &*std); - (Operation::Read(res), buf) - })); - } - Busy(ref mut rx) => { - let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; - - match op { - Operation::Read(Ok(_)) => { + instrument_resource_op! { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut ReadBuf<'_>, + ) -> Poll> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + if !buf.is_empty() { buf.copy_to(dst); - inner.state = Idle(Some(buf)); + *buf_cell = Some(buf); return Ready(Ok(())); } - Operation::Read(Err(e)) => { - assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - return Ready(Err(e)); - } - Operation::Write(Ok(_)) => { - assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - continue; - } - Operation::Write(Err(e)) => { - assert!(inner.last_write_err.is_none()); - inner.last_write_err = Some(e.kind()); - inner.state = Idle(Some(buf)); - } - Operation::Seek(result) => { - assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - if let Ok(pos) = result { - inner.pos = pos; + buf.ensure_capacity_for(dst); + let std = me.std.clone(); + + inner.state = Busy(sys::run(move || { + let res = buf.read_from(&mut &*std); + (Operation::Read(res), buf) + })); + } + Busy(ref mut rx) => { + let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match op { + Operation::Read(Ok(_)) => { + buf.copy_to(dst); + inner.state = Idle(Some(buf)); + return Ready(Ok(())); + } + Operation::Read(Err(e)) => { + assert!(buf.is_empty()); + + inner.state = Idle(Some(buf)); + return Ready(Err(e)); + } + Operation::Write(Ok(_)) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + continue; + } + Operation::Write(Err(e)) => { + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + inner.state = Idle(Some(buf)); + } + Operation::Seek(result) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + if let Ok(pos) = result { + inner.pos = pos; + } + continue; } - continue; } } } @@ -618,64 +620,66 @@ impl AsyncSeek for File { } impl AsyncWrite for File { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - src: &[u8], - ) -> Poll> { - let me = self.get_mut(); - let inner = me.inner.get_mut(); - - if let Some(e) = inner.last_write_err.take() { - return Ready(Err(e.into())); - } - - loop { - match inner.state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); - - let seek = if !buf.is_empty() { - Some(SeekFrom::Current(buf.discard_read())) - } else { - None - }; + instrument_resource_op! { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + src: &[u8], + ) -> Poll> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + if let Some(e) = inner.last_write_err.take() { + return Ready(Err(e.into())); + } - let n = buf.copy_from(src); - let std = me.std.clone(); + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); - inner.state = Busy(sys::run(move || { - let res = if let Some(seek) = seek { - (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) } else { - buf.write_to(&mut &*std) + None }; - (Operation::Write(res), buf) - })); + let n = buf.copy_from(src); + let std = me.std.clone(); - return Ready(Ok(n)); - } - Busy(ref mut rx) => { - let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - inner.state = Idle(Some(buf)); + inner.state = Busy(sys::run(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; - match op { - Operation::Read(_) => { - // We don't care about the result here. The fact - // that the cursor has advanced will be reflected in - // the next iteration of the loop - continue; - } - Operation::Write(res) => { - // If the previous write was successful, continue. - // Otherwise, error. - res?; - continue; - } - Operation::Seek(_) => { - // Ignore the seek - continue; + (Operation::Write(res), buf) + })); + + return Ready(Ok(n)); + } + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => { + // We don't care about the result here. The fact + // that the cursor has advanced will be reflected in + // the next iteration of the loop + continue; + } + Operation::Write(res) => { + // If the previous write was successful, continue. + // Otherwise, error. + res?; + continue; + } + Operation::Seek(_) => { + // Ignore the seek + continue; + } } } } diff --git a/tokio/src/macros/instrument.rs b/tokio/src/macros/instrument.rs index 9c06c1d06ca..d7db4feaddf 100644 --- a/tokio/src/macros/instrument.rs +++ b/tokio/src/macros/instrument.rs @@ -15,7 +15,7 @@ cfg_trace! { pin_project_lite::pin_project! { $(#[$meta])* $visibility struct $struct_name { - __resource_span: tracing::Span, + resource_span: tracing::Span, $( $(#[$field_attrs])* $field_vis $field_name : $field_type, @@ -34,7 +34,7 @@ cfg_trace! { ) => { $(#[$meta])* $vis struct $struct_name { - __resource_span: tracing::Span, + resource_span: tracing::Span, $( $(#[$field_attrs])* $field_vis $field_name : $field_type, @@ -52,7 +52,7 @@ cfg_trace! { } ) => { $struct { - __resource_span:tracing::trace_span!( + resource_span:tracing::trace_span!( "resource", concrete_type = stringify!($struct), kind = stringify!($resource_type) @@ -65,17 +65,24 @@ cfg_trace! { } macro_rules! instrument_resource_op { + ( + $( #[$attr:meta] )* + $vis:vis fn $name:ident(&mut $self: ident, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? + $body:block + ) => { + $vis fn $name(&mut $self, $($arg_name : $arg_ty,)*) $(-> $ret)? { + let __resource_span_guard = $self.resource_span.enter(); + $body + } + }; ( $( #[$attr:meta] )* $vis:vis fn $name:ident(&$self: ident, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? $body:block ) => { $vis fn $name(&$self, $($arg_name : $arg_ty,)*) $(-> $ret)? { - let __resource_span = $self.__resource_span.clone(); - let __resource_span_guard = __resource_span.enter(); - let result = (|| $body)(); - drop(__resource_span_guard); - result + let __resource_span_guard = $self.resource_span.enter(); + $body } }; ( @@ -84,11 +91,9 @@ cfg_trace! { $body:block ) => { $vis fn $name($self : $self_type, $($arg_name : $arg_ty,)*) $(-> $ret)? { - let __resource_span = $self.__resource_span.clone(); - let __resource_span_guard = __resource_span.enter(); - let result = (|| $body)(); - drop(__resource_span_guard); - result + let span = $self.resource_span.clone(); + let __resource_span_guard = span.enter(); + $body } }; } From 1955a3d715eec5b54f9c7c4d45bc76cf3a2dbe07 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 7 Jul 2021 08:49:45 +0000 Subject: [PATCH 4/6] redefine noop macros in test Signed-off-by: Zahari Dichev --- tokio/tests/fs_file_mocked.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tokio/tests/fs_file_mocked.rs b/tokio/tests/fs_file_mocked.rs index 77715327d8a..45c67c3dc79 100644 --- a/tokio/tests/fs_file_mocked.rs +++ b/tokio/tests/fs_file_mocked.rs @@ -22,6 +22,24 @@ macro_rules! cfg_io_std { use futures::future; +macro_rules! instrument_resource { + ($($t:tt)*) => { + $($t)* + } +} + +macro_rules! new_instrumented_resource { + ($resource_type:ident, $($t:tt)*) => { + $($t)* + } +} + +macro_rules! instrument_resource_op { + ($($t:tt)*) => { + $($t)* + } +} + // Load source #[allow(warnings)] #[path = "../src/fs/file.rs"] From bd7a3e910d967a7901307b1ff05a806146d29ad6 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 9 Jul 2021 08:58:25 +0000 Subject: [PATCH 5/6] feedback Signed-off-by: Zahari Dichev --- tokio/src/fs/file.rs | 2 +- tokio/src/macros/instrument.rs | 15 +++++++-------- tokio/src/net/tcp/stream.rs | 4 ++-- tokio/src/time/driver/sleep.rs | 2 +- tokio/tests/fs_file_mocked.rs | 2 +- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index f35fb8a9a36..ad29476aee7 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -210,7 +210,7 @@ impl File { }); new_instrumented_resource! { - FileSystem, + "fileSystem", File { std, inner, diff --git a/tokio/src/macros/instrument.rs b/tokio/src/macros/instrument.rs index d7db4feaddf..eab75516e2b 100644 --- a/tokio/src/macros/instrument.rs +++ b/tokio/src/macros/instrument.rs @@ -46,16 +46,16 @@ cfg_trace! { macro_rules! new_instrumented_resource { ( - $resource_type:ident, + $resource_type:literal, $struct:ident { $($field:ident),* $(,)* // Handle non shorthand initialization } ) => { $struct { - resource_span:tracing::trace_span!( + resource_span: tracing::trace_span!( "resource", concrete_type = stringify!($struct), - kind = stringify!($resource_type) + kind = $resource_type ), $( $field, @@ -71,7 +71,7 @@ cfg_trace! { $body:block ) => { $vis fn $name(&mut $self, $($arg_name : $arg_ty,)*) $(-> $ret)? { - let __resource_span_guard = $self.resource_span.enter(); + let _resource_span_guard = $self.resource_span.enter(); $body } }; @@ -81,7 +81,7 @@ cfg_trace! { $body:block ) => { $vis fn $name(&$self, $($arg_name : $arg_ty,)*) $(-> $ret)? { - let __resource_span_guard = $self.resource_span.enter(); + let _resource_span_guard = $self.resource_span.enter(); $body } }; @@ -91,8 +91,7 @@ cfg_trace! { $body:block ) => { $vis fn $name($self : $self_type, $($arg_name : $arg_ty,)*) $(-> $ret)? { - let span = $self.resource_span.clone(); - let __resource_span_guard = span.enter(); + let _span = $self.resource_span.clone().entered(); $body } }; @@ -112,7 +111,7 @@ cfg_not_trace! { } macro_rules! new_instrumented_resource { - ($resource_type:ident, $($t:tt)*) => { + ($resource_type:literal, $($t:tt)*) => { $($t)* } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 654ebea8137..16dd92d01b6 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -155,7 +155,7 @@ impl TcpStream { pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result { let io = PollEvented::new(connected)?; - Ok(new_instrumented_resource!(Network, TcpStream { io })) + Ok(new_instrumented_resource!("network", TcpStream { io })) } /// Creates new `TcpStream` from a `std::net::TcpStream`. @@ -190,7 +190,7 @@ impl TcpStream { pub fn from_std(stream: std::net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_std(stream); let io = PollEvented::new(io)?; - Ok(new_instrumented_resource!(Network, TcpStream { io })) + Ok(new_instrumented_resource!("network", TcpStream { io })) } /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index 58e43523323..1645a1f2ec1 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -171,7 +171,7 @@ impl Sleep { let handle = Handle::current(); let entry = TimerEntry::new(&handle, deadline); - new_instrumented_resource!(Timer, Sleep { deadline, entry }) + new_instrumented_resource!("timer", Sleep { deadline, entry }) } pub(crate) fn far_future() -> Sleep { diff --git a/tokio/tests/fs_file_mocked.rs b/tokio/tests/fs_file_mocked.rs index 45c67c3dc79..474c1628abc 100644 --- a/tokio/tests/fs_file_mocked.rs +++ b/tokio/tests/fs_file_mocked.rs @@ -29,7 +29,7 @@ macro_rules! instrument_resource { } macro_rules! new_instrumented_resource { - ($resource_type:ident, $($t:tt)*) => { + ($resource_type:literal, $($t:tt)*) => { $($t)* } } From 8c7364ab0eb750ceec55abb7742886004825e90b Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Sat, 10 Jul 2021 05:26:24 +0000 Subject: [PATCH 6/6] shorter resource kind names Signed-off-by: Zahari Dichev --- tokio/src/fs/file.rs | 2 +- tokio/src/net/tcp/stream.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index ad29476aee7..dadbdef8fca 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -210,7 +210,7 @@ impl File { }); new_instrumented_resource! { - "fileSystem", + "fs", File { std, inner, diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 16dd92d01b6..80a92fa6c51 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -155,7 +155,7 @@ impl TcpStream { pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result { let io = PollEvented::new(connected)?; - Ok(new_instrumented_resource!("network", TcpStream { io })) + Ok(new_instrumented_resource!("net", TcpStream { io })) } /// Creates new `TcpStream` from a `std::net::TcpStream`. @@ -190,7 +190,7 @@ impl TcpStream { pub fn from_std(stream: std::net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_std(stream); let io = PollEvented::new(io)?; - Ok(new_instrumented_resource!("network", TcpStream { io })) + Ok(new_instrumented_resource!("net", TcpStream { io })) } /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].