Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: add resource instrumentation using declarative macros #3933

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
358 changes: 185 additions & 173 deletions tokio/src/fs/file.rs
Expand Up @@ -19,67 +19,69 @@ use std::task::Context;
use std::task::Poll;
use std::task::Poll::*;

/// A reference to an open file on the filesystem.
///
/// This is a specialized version of [`std::fs::File`][std] for usage from the
/// Tokio runtime.
///
/// An instance of a `File` can be read and/or written depending on what options
/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
/// cursor that the file contains internally.
///
/// A file will not be closed immediately when it goes out of scope if there
/// are any IO operations that have not yet completed. To ensure that a file is
/// closed immediately when it is dropped, you should call [`flush`] before
/// dropping it. Note that this does not ensure that the file has been fully
/// written to disk; the operating system might keep the changes around in an
/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
/// the data to disk.
///
/// Reading and writing to a `File` is usually done using the convenience
/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
///
/// [std]: struct@std::fs::File
/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
/// [`sync_all`]: fn@crate::fs::File::sync_all
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
///
/// # Examples
///
/// Create a new file and asynchronously write bytes to it:
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::AsyncWriteExt; // for write_all()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// # Ok(())
/// # }
/// ```
///
/// Read the contents of a file into a buffer
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::AsyncReadExt; // for read_to_end()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
///
/// let mut contents = vec![];
/// file.read_to_end(&mut contents).await?;
///
/// println!("len = {}", contents.len());
/// # Ok(())
/// # }
/// ```
pub struct File {
std: Arc<sys::File>,
inner: Mutex<Inner>,
instrument_resource! {
/// A reference to an open file on the filesystem.
///
/// This is a specialized version of [`std::fs::File`][std] for usage from the
/// Tokio runtime.
///
/// An instance of a `File` can be read and/or written depending on what options
/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
/// cursor that the file contains internally.
///
/// A file will not be closed immediately when it goes out of scope if there
/// are any IO operations that have not yet completed. To ensure that a file is
/// closed immediately when it is dropped, you should call [`flush`] before
/// dropping it. Note that this does not ensure that the file has been fully
/// written to disk; the operating system might keep the changes around in an
/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
/// the data to disk.
///
/// Reading and writing to a `File` is usually done using the convenience
/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
///
/// [std]: struct@std::fs::File
/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
/// [`sync_all`]: fn@crate::fs::File::sync_all
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
///
/// # Examples
///
/// Create a new file and asynchronously write bytes to it:
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::AsyncWriteExt; // for write_all()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// # Ok(())
/// # }
/// ```
///
/// Read the contents of a file into a buffer
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::AsyncReadExt; // for read_to_end()
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
///
/// let mut contents = vec![];
/// file.read_to_end(&mut contents).await?;
///
/// println!("len = {}", contents.len());
/// # Ok(())
/// # }
/// ```
pub struct File {
std: Arc<sys::File>,
inner: Mutex<Inner>,
}
}

struct Inner {
Expand Down Expand Up @@ -200,13 +202,19 @@ impl File {
/// let file = tokio::fs::File::from_std(std_file);
/// ```
pub fn from_std(std: sys::File) -> File {
File {
std: Arc::new(std),
inner: Mutex::new(Inner {
state: State::Idle(Some(Buf::with_capacity(0))),
last_write_err: None,
pos: 0,
}),
let std = Arc::new(std);
let inner = Mutex::new(Inner {
state: State::Idle(Some(Buf::with_capacity(0))),
last_write_err: None,
pos: 0,
});

new_instrumented_resource! {
FileSystem,
File {
std,
inner,
}
}
}

Expand Down Expand Up @@ -480,65 +488,67 @@ impl File {
}

impl AsyncRead for File {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
dst: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let me = self.get_mut();
let inner = me.inner.get_mut();

loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
}

buf.ensure_capacity_for(dst);
let std = me.std.clone();

inner.state = Busy(sys::run(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
}));
}
Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

match op {
Operation::Read(Ok(_)) => {
instrument_resource_op! {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
dst: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let me = self.get_mut();
let inner = me.inner.get_mut();

loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
inner.state = Idle(Some(buf));
*buf_cell = Some(buf);
return Ready(Ok(()));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());

inner.state = Idle(Some(buf));
return Ready(Err(e));
}
Operation::Write(Ok(_)) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
continue;
}
Operation::Write(Err(e)) => {
assert!(inner.last_write_err.is_none());
inner.last_write_err = Some(e.kind());
inner.state = Idle(Some(buf));
}
Operation::Seek(result) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
if let Ok(pos) = result {
inner.pos = pos;
buf.ensure_capacity_for(dst);
let std = me.std.clone();

inner.state = Busy(sys::run(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
}));
}
Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

match op {
Operation::Read(Ok(_)) => {
buf.copy_to(dst);
inner.state = Idle(Some(buf));
return Ready(Ok(()));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());

inner.state = Idle(Some(buf));
return Ready(Err(e));
}
Operation::Write(Ok(_)) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
continue;
}
Operation::Write(Err(e)) => {
assert!(inner.last_write_err.is_none());
inner.last_write_err = Some(e.kind());
inner.state = Idle(Some(buf));
}
Operation::Seek(result) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
if let Ok(pos) = result {
inner.pos = pos;
}
continue;
}
continue;
}
}
}
Expand Down Expand Up @@ -610,64 +620,66 @@ impl AsyncSeek for File {
}

impl AsyncWrite for File {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
src: &[u8],
) -> Poll<io::Result<usize>> {
let me = self.get_mut();
let inner = me.inner.get_mut();

if let Some(e) = inner.last_write_err.take() {
return Ready(Err(e.into()));
}

loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
} else {
None
};
instrument_resource_op! {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
src: &[u8],
) -> Poll<io::Result<usize>> {
let me = self.get_mut();
let inner = me.inner.get_mut();

if let Some(e) = inner.last_write_err.take() {
return Ready(Err(e.into()));
}

let n = buf.copy_from(src);
let std = me.std.clone();
loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

inner.state = Busy(sys::run(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
let seek = if !buf.is_empty() {
Some(SeekFrom::Current(buf.discard_read()))
} else {
buf.write_to(&mut &*std)
None
};

(Operation::Write(res), buf)
}));
let n = buf.copy_from(src);
let std = me.std.clone();

return Ready(Ok(n));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = Idle(Some(buf));
inner.state = Busy(sys::run(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

match op {
Operation::Read(_) => {
// We don't care about the result here. The fact
// that the cursor has advanced will be reflected in
// the next iteration of the loop
continue;
}
Operation::Write(res) => {
// If the previous write was successful, continue.
// Otherwise, error.
res?;
continue;
}
Operation::Seek(_) => {
// Ignore the seek
continue;
(Operation::Write(res), buf)
}));

return Ready(Ok(n));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = Idle(Some(buf));

match op {
Operation::Read(_) => {
// We don't care about the result here. The fact
// that the cursor has advanced will be reflected in
// the next iteration of the loop
continue;
}
Operation::Write(res) => {
// If the previous write was successful, continue.
// Otherwise, error.
res?;
continue;
}
Operation::Seek(_) => {
// Ignore the seek
continue;
}
}
}
}
Expand Down