diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 5c06e732b09..dadbdef8fca 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -19,67 +19,69 @@ use std::task::Context; use std::task::Poll; use std::task::Poll::*; -/// A reference to an open file on the filesystem. -/// -/// This is a specialized version of [`std::fs::File`][std] for usage from the -/// Tokio runtime. -/// -/// An instance of a `File` can be read and/or written depending on what options -/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical -/// cursor that the file contains internally. -/// -/// A file will not be closed immediately when it goes out of scope if there -/// are any IO operations that have not yet completed. To ensure that a file is -/// closed immediately when it is dropped, you should call [`flush`] before -/// dropping it. Note that this does not ensure that the file has been fully -/// written to disk; the operating system might keep the changes around in an -/// in-memory buffer. See the [`sync_all`] method for telling the OS to write -/// the data to disk. -/// -/// Reading and writing to a `File` is usually done using the convenience -/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. -/// -/// [std]: struct@std::fs::File -/// [`AsyncSeek`]: trait@crate::io::AsyncSeek -/// [`flush`]: fn@crate::io::AsyncWriteExt::flush -/// [`sync_all`]: fn@crate::fs::File::sync_all -/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt -/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt -/// -/// # Examples -/// -/// Create a new file and asynchronously write bytes to it: -/// -/// ```no_run -/// use tokio::fs::File; -/// use tokio::io::AsyncWriteExt; // for write_all() -/// -/// # async fn dox() -> std::io::Result<()> { -/// let mut file = File::create("foo.txt").await?; -/// file.write_all(b"hello, world!").await?; -/// # Ok(()) -/// # } -/// ``` -/// -/// Read the contents of a file into a buffer -/// -/// ```no_run -/// use tokio::fs::File; -/// use tokio::io::AsyncReadExt; // for read_to_end() -/// -/// # async fn dox() -> std::io::Result<()> { -/// let mut file = File::open("foo.txt").await?; -/// -/// let mut contents = vec![]; -/// file.read_to_end(&mut contents).await?; -/// -/// println!("len = {}", contents.len()); -/// # Ok(()) -/// # } -/// ``` -pub struct File { - std: Arc, - inner: Mutex, +instrument_resource! { + /// A reference to an open file on the filesystem. + /// + /// This is a specialized version of [`std::fs::File`][std] for usage from the + /// Tokio runtime. + /// + /// An instance of a `File` can be read and/or written depending on what options + /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical + /// cursor that the file contains internally. + /// + /// A file will not be closed immediately when it goes out of scope if there + /// are any IO operations that have not yet completed. To ensure that a file is + /// closed immediately when it is dropped, you should call [`flush`] before + /// dropping it. Note that this does not ensure that the file has been fully + /// written to disk; the operating system might keep the changes around in an + /// in-memory buffer. See the [`sync_all`] method for telling the OS to write + /// the data to disk. + /// + /// Reading and writing to a `File` is usually done using the convenience + /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. + /// + /// [std]: struct@std::fs::File + /// [`AsyncSeek`]: trait@crate::io::AsyncSeek + /// [`flush`]: fn@crate::io::AsyncWriteExt::flush + /// [`sync_all`]: fn@crate::fs::File::sync_all + /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + /// + /// # Examples + /// + /// Create a new file and asynchronously write bytes to it: + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncWriteExt; // for write_all() + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// Read the contents of a file into a buffer + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncReadExt; // for read_to_end() + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// + /// let mut contents = vec![]; + /// file.read_to_end(&mut contents).await?; + /// + /// println!("len = {}", contents.len()); + /// # Ok(()) + /// # } + /// ``` + pub struct File { + std: Arc, + inner: Mutex, + } } struct Inner { @@ -200,13 +202,19 @@ impl File { /// let file = tokio::fs::File::from_std(std_file); /// ``` pub fn from_std(std: sys::File) -> File { - File { - std: Arc::new(std), - inner: Mutex::new(Inner { - state: State::Idle(Some(Buf::with_capacity(0))), - last_write_err: None, - pos: 0, - }), + let std = Arc::new(std); + let inner = Mutex::new(Inner { + state: State::Idle(Some(Buf::with_capacity(0))), + last_write_err: None, + pos: 0, + }); + + new_instrumented_resource! { + "fs", + File { + std, + inner, + } } } @@ -480,65 +488,67 @@ impl File { } impl AsyncRead for File { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - dst: &mut ReadBuf<'_>, - ) -> Poll> { - let me = self.get_mut(); - let inner = me.inner.get_mut(); - - loop { - match inner.state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); - - if !buf.is_empty() { - buf.copy_to(dst); - *buf_cell = Some(buf); - return Ready(Ok(())); - } - - buf.ensure_capacity_for(dst); - let std = me.std.clone(); - - inner.state = Busy(sys::run(move || { - let res = buf.read_from(&mut &*std); - (Operation::Read(res), buf) - })); - } - Busy(ref mut rx) => { - let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; - - match op { - Operation::Read(Ok(_)) => { + instrument_resource_op! { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut ReadBuf<'_>, + ) -> Poll> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + if !buf.is_empty() { buf.copy_to(dst); - inner.state = Idle(Some(buf)); + *buf_cell = Some(buf); return Ready(Ok(())); } - Operation::Read(Err(e)) => { - assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - return Ready(Err(e)); - } - Operation::Write(Ok(_)) => { - assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - continue; - } - Operation::Write(Err(e)) => { - assert!(inner.last_write_err.is_none()); - inner.last_write_err = Some(e.kind()); - inner.state = Idle(Some(buf)); - } - Operation::Seek(result) => { - assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - if let Ok(pos) = result { - inner.pos = pos; + buf.ensure_capacity_for(dst); + let std = me.std.clone(); + + inner.state = Busy(sys::run(move || { + let res = buf.read_from(&mut &*std); + (Operation::Read(res), buf) + })); + } + Busy(ref mut rx) => { + let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; + + match op { + Operation::Read(Ok(_)) => { + buf.copy_to(dst); + inner.state = Idle(Some(buf)); + return Ready(Ok(())); + } + Operation::Read(Err(e)) => { + assert!(buf.is_empty()); + + inner.state = Idle(Some(buf)); + return Ready(Err(e)); + } + Operation::Write(Ok(_)) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + continue; + } + Operation::Write(Err(e)) => { + assert!(inner.last_write_err.is_none()); + inner.last_write_err = Some(e.kind()); + inner.state = Idle(Some(buf)); + } + Operation::Seek(result) => { + assert!(buf.is_empty()); + inner.state = Idle(Some(buf)); + if let Ok(pos) = result { + inner.pos = pos; + } + continue; } - continue; } } } @@ -610,64 +620,66 @@ impl AsyncSeek for File { } impl AsyncWrite for File { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - src: &[u8], - ) -> Poll> { - let me = self.get_mut(); - let inner = me.inner.get_mut(); - - if let Some(e) = inner.last_write_err.take() { - return Ready(Err(e.into())); - } - - loop { - match inner.state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); - - let seek = if !buf.is_empty() { - Some(SeekFrom::Current(buf.discard_read())) - } else { - None - }; + instrument_resource_op! { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + src: &[u8], + ) -> Poll> { + let me = self.get_mut(); + let inner = me.inner.get_mut(); + + if let Some(e) = inner.last_write_err.take() { + return Ready(Err(e.into())); + } - let n = buf.copy_from(src); - let std = me.std.clone(); + loop { + match inner.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); - inner.state = Busy(sys::run(move || { - let res = if let Some(seek) = seek { - (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) } else { - buf.write_to(&mut &*std) + None }; - (Operation::Write(res), buf) - })); + let n = buf.copy_from(src); + let std = me.std.clone(); - return Ready(Ok(n)); - } - Busy(ref mut rx) => { - let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - inner.state = Idle(Some(buf)); + inner.state = Busy(sys::run(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; - match op { - Operation::Read(_) => { - // We don't care about the result here. The fact - // that the cursor has advanced will be reflected in - // the next iteration of the loop - continue; - } - Operation::Write(res) => { - // If the previous write was successful, continue. - // Otherwise, error. - res?; - continue; - } - Operation::Seek(_) => { - // Ignore the seek - continue; + (Operation::Write(res), buf) + })); + + return Ready(Ok(n)); + } + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + inner.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => { + // We don't care about the result here. The fact + // that the cursor has advanced will be reflected in + // the next iteration of the loop + continue; + } + Operation::Write(res) => { + // If the previous write was successful, continue. + // Otherwise, error. + res?; + continue; + } + Operation::Seek(_) => { + // Ignore the seek + continue; + } } } } diff --git a/tokio/src/macros/instrument.rs b/tokio/src/macros/instrument.rs new file mode 100644 index 00000000000..eab75516e2b --- /dev/null +++ b/tokio/src/macros/instrument.rs @@ -0,0 +1,124 @@ +cfg_trace! { + macro_rules! instrument_resource { + ( + pin_project, + $(#[$meta:meta])* + // pin project gets confused when this is a `vis` + // and does not infer the projection visibility correctly + $visibility:ident struct $struct_name:ident { + $( + $(#[$field_attrs:ident])* + $field_vis:vis $field_name:ident : $field_type:ty + ),*$(,)+ + } + ) => { + pin_project_lite::pin_project! { + $(#[$meta])* + $visibility struct $struct_name { + resource_span: tracing::Span, + $( + $(#[$field_attrs])* + $field_vis $field_name : $field_type, + )* + } + } + }; + ( + $(#[$meta:meta])* + $vis:vis struct $struct_name:ident { + $( + $(#[$field_attrs:ident])* + $field_vis:vis $field_name:ident : $field_type:ty + ),*$(,)+ + } + ) => { + $(#[$meta])* + $vis struct $struct_name { + resource_span: tracing::Span, + $( + $(#[$field_attrs])* + $field_vis $field_name : $field_type, + )* + } + } + } + + + macro_rules! new_instrumented_resource { + ( + $resource_type:literal, + $struct:ident { + $($field:ident),* $(,)* // Handle non shorthand initialization + } + ) => { + $struct { + resource_span: tracing::trace_span!( + "resource", + concrete_type = stringify!($struct), + kind = $resource_type + ), + $( + $field, + )* + } + }; + } + + macro_rules! instrument_resource_op { + ( + $( #[$attr:meta] )* + $vis:vis fn $name:ident(&mut $self: ident, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? + $body:block + ) => { + $vis fn $name(&mut $self, $($arg_name : $arg_ty,)*) $(-> $ret)? { + let _resource_span_guard = $self.resource_span.enter(); + $body + } + }; + ( + $( #[$attr:meta] )* + $vis:vis fn $name:ident(&$self: ident, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? + $body:block + ) => { + $vis fn $name(&$self, $($arg_name : $arg_ty,)*) $(-> $ret)? { + let _resource_span_guard = $self.resource_span.enter(); + $body + } + }; + ( + $( #[$attr:meta] )* + $vis:vis fn $name:ident($self:tt : $self_type:ty, $($arg_name:ident : $arg_ty:ty),* $(,)*) $(-> $ret:ty)? + $body:block + ) => { + $vis fn $name($self : $self_type, $($arg_name : $arg_ty,)*) $(-> $ret)? { + let _span = $self.resource_span.clone().entered(); + $body + } + }; + } +} + +cfg_not_trace! { + macro_rules! instrument_resource { + (pin_project, $($t:tt)*) => { + pin_project_lite::pin_project! { + $($t)* + } + }; + ($($t:tt)*) => { + $($t)* + } + } + + macro_rules! new_instrumented_resource { + ($resource_type:literal, $($t:tt)*) => { + $($t)* + } + } + + macro_rules! instrument_resource_op { + ($($t:tt)*) => { + $($t)* + } + } +} diff --git a/tokio/src/macros/mod.rs b/tokio/src/macros/mod.rs index b0af5215256..3b4403aaf55 100644 --- a/tokio/src/macros/mod.rs +++ b/tokio/src/macros/mod.rs @@ -15,6 +15,9 @@ mod ready; #[macro_use] mod thread_local; +#[macro_use] +mod instrument; + #[macro_use] #[cfg(feature = "rt")] pub(crate) mod scoped_tls; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0277a360d09..80a92fa6c51 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -15,57 +15,58 @@ use std::time::Duration; cfg_io_util! { use bytes::BufMut; } - cfg_net! { - /// A TCP stream between a local and a remote socket. - /// - /// A TCP stream can either be created by connecting to an endpoint, via the - /// [`connect`] method, or by [accepting] a connection from a [listener]. A - /// TCP stream can also be created via the [`TcpSocket`] type. - /// - /// Reading and writing to a `TcpStream` is usually done using the - /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] - /// traits. - /// - /// [`connect`]: method@TcpStream::connect - /// [accepting]: method@crate::net::TcpListener::accept - /// [listener]: struct@crate::net::TcpListener - /// [`TcpSocket`]: struct@crate::net::TcpSocket - /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt - /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// use tokio::io::AsyncWriteExt; - /// use std::error::Error; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// // Connect to a peer - /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// // Write some data. - /// stream.write_all(b"hello world!").await?; - /// - /// Ok(()) - /// } - /// ``` - /// - /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. - /// - /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all - /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt - /// - /// To shut down the stream in the write direction, you can call the - /// [`shutdown()`] method. This will cause the other peer to receive a read of - /// length 0, indicating that no more data will be sent. This only closes - /// the stream in one direction. - /// - /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown - pub struct TcpStream { - io: PollEvented, + instrument_resource! { + /// A TCP stream between a local and a remote socket. + /// + /// A TCP stream can either be created by connecting to an endpoint, via the + /// [`connect`] method, or by [accepting] a connection from a [listener]. A + /// TCP stream can also be created via the [`TcpSocket`] type. + /// + /// Reading and writing to a `TcpStream` is usually done using the + /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] + /// traits. + /// + /// [`connect`]: method@TcpStream::connect + /// [accepting]: method@crate::net::TcpListener::accept + /// [listener]: struct@crate::net::TcpListener + /// [`TcpSocket`]: struct@crate::net::TcpSocket + /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::io::AsyncWriteExt; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Write some data. + /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } + /// ``` + /// + /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. + /// + /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all + /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt + /// + /// To shut down the stream in the write direction, you can call the + /// [`shutdown()`] method. This will cause the other peer to receive a read of + /// length 0, indicating that no more data will be sent. This only closes + /// the stream in one direction. + /// + /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown + pub struct TcpStream { + io: PollEvented, + } } } @@ -154,7 +155,7 @@ impl TcpStream { pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result { let io = PollEvented::new(connected)?; - Ok(TcpStream { io }) + Ok(new_instrumented_resource!("net", TcpStream { io })) } /// Creates new `TcpStream` from a `std::net::TcpStream`. @@ -189,7 +190,7 @@ impl TcpStream { pub fn from_std(stream: std::net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_std(stream); let io = PollEvented::new(io)?; - Ok(TcpStream { io }) + Ok(new_instrumented_resource!("net", TcpStream { io })) } /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. @@ -387,7 +388,7 @@ impl TcpStream { /// // if the readiness event is a false positive. /// match stream.try_read(&mut data) { /// Ok(n) => { - /// println!("read {} bytes", n); + /// println!("read {} bytes", n); /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; @@ -1178,22 +1179,25 @@ impl TcpStream { // To read or write without mutable access to the `UnixStream`, combine the // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or // `try_write` methods. - - pub(crate) fn poll_read_priv( - &self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - // Safety: `TcpStream::read` correctly handles reads into uninitialized memory - unsafe { self.io.poll_read(cx, buf) } + instrument_resource_op! { + pub(crate) fn poll_read_priv( + &self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + // Safety: `TcpStream::read` correctly handles reads into uninitialized memory + unsafe { self.io.poll_read(cx, buf) } + } } - pub(super) fn poll_write_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.io.poll_write(cx, buf) + instrument_resource_op! { + pub(super) fn poll_write_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.io.poll_write(cx, buf) + } } pub(super) fn poll_write_vectored_priv( diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index 40f745ad7e8..1645a1f2ec1 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -1,7 +1,6 @@ use crate::time::driver::{Handle, TimerEntry}; use crate::time::{error::Error, Duration, Instant}; -use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; @@ -65,7 +64,8 @@ pub fn sleep(duration: Duration) -> Sleep { } } -pin_project! { +instrument_resource! { + pin_project, /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until). /// /// This type does not implement the `Unpin` trait, which means that if you @@ -171,7 +171,7 @@ impl Sleep { let handle = Handle::current(); let entry = TimerEntry::new(&handle, deadline); - Sleep { deadline, entry } + new_instrumented_resource!("timer", Sleep { deadline, entry }) } pub(crate) fn far_future() -> Sleep { @@ -222,17 +222,17 @@ impl Sleep { me.entry.reset(deadline); *me.deadline = deadline; } + instrument_resource_op! { + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let me = self.project(); + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); - - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) + me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) + } } } diff --git a/tokio/tests/fs_file_mocked.rs b/tokio/tests/fs_file_mocked.rs index 77715327d8a..474c1628abc 100644 --- a/tokio/tests/fs_file_mocked.rs +++ b/tokio/tests/fs_file_mocked.rs @@ -22,6 +22,24 @@ macro_rules! cfg_io_std { use futures::future; +macro_rules! instrument_resource { + ($($t:tt)*) => { + $($t)* + } +} + +macro_rules! new_instrumented_resource { + ($resource_type:literal, $($t:tt)*) => { + $($t)* + } +} + +macro_rules! instrument_resource_op { + ($($t:tt)*) => { + $($t)* + } +} + // Load source #[allow(warnings)] #[path = "../src/fs/file.rs"]