Try doing a non-blocking read before punting to the threadpool on Linux.

If the data is already available in the cache, this will avoid cross-thread
interaction and remove a copy. It should help with latency too, as reads
that can be satisfied immediately won't need to wait in the queue until other
fs operations are complete.
wmanley committed Feb 12, 2021
1 parent e827829 commit 227421a
Showing 2 changed files with 78 additions and 3 deletions.
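
For context, the fast path added here is built on preadv2(2) with the RWF_NOWAIT flag (Linux 4.14+): the kernel returns whatever is already in the page cache and fails with EAGAIN instead of blocking when the data would have to come from storage. Below is a minimal standalone sketch of that behaviour, not part of the commit; the helper name read_if_cached is illustrative, and it assumes a libc crate that exposes preadv2 and RWF_NOWAIT.

use libc::{c_void, iovec, preadv2, RWF_NOWAIT};
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;

/// Returns Ok(Some(n)) if `n` bytes could be read without blocking, or
/// Ok(None) if the caller should fall back to a normal (blocking) read.
fn read_if_cached(file: &File, buf: &mut [u8]) -> io::Result<Option<usize>> {
    let iov = iovec {
        iov_base: buf.as_mut_ptr() as *mut c_void,
        iov_len: buf.len(),
    };
    /* offset -1 means "use and update the current file offset"; RWF_NOWAIT
     * makes the kernel fail with EAGAIN rather than wait for storage I/O. */
    let n = unsafe { preadv2(file.as_raw_fd(), &iov as *const iovec, 1, -1, RWF_NOWAIT) };
    if n >= 0 {
        return Ok(Some(n as usize));
    }
    let err = io::Error::last_os_error();
    match err.raw_os_error() {
        /* Data not cached, or the kernel/filesystem doesn't support the flag. */
        Some(libc::EAGAIN) | Some(libc::ENOSYS) | Some(libc::EOPNOTSUPP) => Ok(None),
        _ => Err(err),
    }
}

The commit wires the same idea into File::poll_read below, and caches a "not supported" verdict in an atomic flag so unsupported kernels only pay for the failed syscall once.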
6 changes: 3 additions & 3 deletions tokio/Cargo.toml
@@ -43,7 +43,7 @@ full = [
"time",
]

fs = []
fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
@@ -104,11 +104,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
libc = { git = "https://github.com/rust-lang/libc", rev = "a61fd8c79c21b387cdf6e4bcabc8b04afe594960", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
libc = { git = "https://github.com/rust-lang/libc", rev = "a61fd8c79c21b387cdf6e4bcabc8b04afe594960" }
nix = { version = "0.19.0" }

[target.'cfg(windows)'.dependencies.winapi]
75 changes: 75 additions & 0 deletions tokio/src/fs/file.rs
@@ -499,6 +499,10 @@ impl AsyncRead for File {
return Ready(Ok(()));
}

if let Some(x) = read_nowait::try_nonblocking_read(me.std.as_ref(), dst) {
return Ready(x);
}

buf.ensure_capacity_for(dst);
let std = me.std.clone();

@@ -756,3 +760,74 @@ impl Inner {
}
}
}

#[cfg(all(target_os = "linux", not(test)))]
mod read_nowait {
use crate::io::ReadBuf;
use libc::{c_int, c_void, iovec, off_t, preadv2};
use std::{
os::unix::prelude::AsRawFd,
sync::atomic::{AtomicBool, Ordering},
};

static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);

pub(crate) fn try_nonblocking_read(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
return None;
}
let out = preadv2_safe(file, dst, -1, libc::RWF_NOWAIT);
if let Err(err) = &out {
if matches!(err.raw_os_error(), Some(libc::ENOSYS)) {
NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
return None;
}
/* With RWF_NOWAIT the kernel reports EAGAIN when the data isn't in the
* page cache yet, so fall back to the threadpool path instead of
* surfacing a spurious error to the caller. */
if matches!(err.raw_os_error(), Some(libc::EAGAIN)) {
return None;
}
}
Some(out)
}

fn preadv2_safe(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
offset: off_t,
flags: c_int,
) -> std::io::Result<()> {
unsafe {
/* We have to manually defend against buffer overflows here. The slice API makes
* this fairly straightforward. */
let unfilled = dst.unfilled_mut();
let iov = iovec {
iov_base: unfilled.as_mut_ptr() as *mut c_void,
iov_len: unfilled.len(),
};
/* We take a File object rather than an fd as reading from a sensitive fd may confuse
* other unsafe code that assumes it has exclusive access to that fd. */
let bytes_read = preadv2(file.as_raw_fd(), &iov as *const iovec, 1, offset, flags);
if bytes_read < 0 {
Err(std::io::Error::last_os_error())
} else {
/* preadv2 returns the number of bytes read, i.e. the number of bytes that have
* been written into `unfilled`, so it's safe to assume that the first
* `bytes_read` bytes of the unfilled region are now initialised. */
dst.assume_init(bytes_read as usize);
dst.advance(bytes_read as usize);
Ok(())
}
}
}
}

#[cfg(any(not(target_os = "linux"), test))]
mod read_nowait {
use crate::io::ReadBuf;

pub(crate) fn try_nonblocking_read(
_file: &crate::fs::sys::File,
_dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
None
}
}
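
The fast path is transparent to callers: an ordinary tokio::fs read exercises it whenever the file's contents are already cached. A hedged usage sketch follows (the path /etc/hostname is only an example):

use tokio::fs::File;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut file = File::open("/etc/hostname").await?;
    let mut contents = Vec::new();
    /* If the page cache already holds the data, poll_read returns it via
     * preadv2(RWF_NOWAIT) without a round-trip through the blocking
     * threadpool; otherwise it falls back to the existing path. */
    file.read_to_end(&mut contents).await?;
    println!("read {} bytes", contents.len());
    Ok(())
}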
