Skip to content

Commit

Permalink
fs: try doing a non-blocking read before punting to the threadpool (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wmanley committed Apr 14, 2021
1 parent cab4a59 commit 39706b1
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 5 deletions.
6 changes: 3 additions & 3 deletions tokio/Cargo.toml
Expand Up @@ -42,7 +42,7 @@ full = [
"time",
]

fs = []
fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
Expand Down Expand Up @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
libc = { version = "0.2.87", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
libc = { version = "0.2.87" }
nix = { version = "0.19.0" }

[target.'cfg(windows)'.dependencies.winapi]
Expand Down
191 changes: 189 additions & 2 deletions tokio/src/fs/file.rs
Expand Up @@ -491,14 +491,18 @@ impl AsyncRead for File {
loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let buf = buf_cell.as_mut().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
}

if let Some(x) = try_nonblocking_read(me.std.as_ref(), dst) {
return Ready(x);
}

let mut buf = buf_cell.take().unwrap();
buf.ensure_capacity_for(dst);
let std = me.std.clone();

Expand Down Expand Up @@ -756,3 +760,186 @@ impl Inner {
}
}
}

/// Tries to satisfy a read without blocking, so the caller can skip the threadpool.
///
/// Issues a single `preadv2` with `RWF_NOWAIT` at the file's current cursor (offset `-1`)
/// into the unfilled part of `dst`.
///
/// Returns:
/// * `Some(Ok(()))` if bytes were appended to `dst`, or `dst` had no room left to begin with;
/// * `Some(Err(_))` if the read failed with an error other than the ones listed below;
/// * `None` if the caller has to fall back to the blocking path. This happens when
///   - the syscall is missing (`ENOSYS`), which is remembered process-wide so the syscall is
///     not attempted again,
///   - the read is unsupported or would have to wait (`ENOTSUP` / `EAGAIN`), or
///   - the read "succeeded" with zero bytes although `dst` still had room.
///
/// The last case is deliberate: some Linux kernels have been reported to return 0 from
/// `preadv2(RWF_NOWAIT)` even though more data is available. Reporting that as a successful
/// empty read would look like end-of-file to the caller, so the blocking read is left to
/// decide whether this really is EOF. The price is one extra blocking read at genuine EOF.
#[cfg(all(target_os = "linux", not(test)))]
pub(crate) fn try_nonblocking_read(
    file: &crate::fs::sys::File,
    dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Flipped to `false` the first time the kernel answers ENOSYS. Relaxed ordering is enough:
    // the flag guards no other memory, and a stale `true` only costs one more failed syscall.
    static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);
    if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
        return None;
    }

    let filled_before = dst.filled().len();
    match preadv2::preadv2_safe(file, dst, -1, preadv2::RWF_NOWAIT) {
        Ok(()) => {
            let made_progress = dst.filled().len() != filled_before;
            if made_progress || dst.remaining() == 0 {
                Some(Ok(()))
            } else {
                // Zero bytes with room to spare: cannot be trusted as EOF (see above).
                None
            }
        }
        Err(err) => match err.raw_os_error() {
            Some(libc::ENOSYS) => {
                NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
                None
            }
            Some(libc::ENOTSUP) | Some(libc::EAGAIN) => None,
            _ => Some(Err(err)),
        },
    }
}

/// Fallback compiled on every non-Linux target and in test builds: no non-blocking read is
/// attempted and the result is always `None`.
#[cfg(any(not(target_os = "linux"), test))]
pub(crate) fn try_nonblocking_read(
_file: &crate::fs::sys::File,
_dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
// Neither argument is touched; `None` means "nothing was read here".
None
}

#[cfg(target_os = "linux")]
mod preadv2 {
use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t};
use std::os::unix::prelude::AsRawFd;

use crate::io::ReadBuf;

/// Safe wrapper around the raw `preadv2` call: reads from `file` into the unfilled part of
/// `dst` using a single `iovec`.
///
/// A `File` is taken rather than a bare fd, because reading from an arbitrary fd could confuse
/// other unsafe code that assumes it is the only user of that fd.
///
/// `offset` and `flags` are forwarded unchanged to the syscall wrapper.
///
/// On success the number of bytes reported by the call is marked as initialised and `dst` is
/// advanced by that amount. On failure the last OS error is returned and the cursors of `dst`
/// are not modified.
pub(crate) fn preadv2_safe(
    file: &std::fs::File,
    dst: &mut ReadBuf<'_>,
    offset: off_t,
    flags: c_int,
) -> std::io::Result<()> {
    // Buffer overflows have to be guarded against by hand: the iovec is built from the slice's
    // own pointer and length, so the kernel is never told about more room than `dst` has.
    let mut iov = {
        // SAFETY: the slice is only used to obtain its pointer and length; no uninitialised
        // bytes are written through it from Rust code.
        let spare = unsafe { dst.unfilled_mut() };
        iovec {
            iov_base: spare.as_mut_ptr() as *mut c_void,
            iov_len: spare.len(),
        }
    };
    let iov_ptr = &mut iov as *mut iovec as *const iovec;

    // SAFETY: `iov_ptr` points at exactly one live iovec describing memory owned by `dst`,
    // which is exclusively borrowed for the duration of the call.
    let ret = unsafe { preadv2(file.as_raw_fd(), iov_ptr, 1, offset, flags) };
    if ret < 0 {
        return Err(std::io::Error::last_os_error());
    }

    let count = ret as usize;
    // SAFETY: preadv2 returns the number of bytes read, i.e. the number of bytes that have
    // been written into the unfilled region, so that many bytes are now initialised.
    unsafe {
        dst.assume_init(count);
    }
    dst.advance(count);
    Ok(())
}

// Unit test for `preadv2_safe`. The steps share one open file and one `ReadBuf`, so they
// depend on each other's effects on the file cursor and on the buffer's fill level.
#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_preadv2_safe() {
use std::io::{Seek, Write};
use std::mem::MaybeUninit;
use tempfile::tempdir;

// Fixture: a temporary file holding the 20-byte MESSAGE, reopened read-only as `f`.
let tmp = tempdir().unwrap();
let filename = tmp.path().join("file");
const MESSAGE: &[u8] = b"Hello this is a test";
{
let mut f = std::fs::File::create(&filename).unwrap();
f.write_all(MESSAGE).unwrap();
}
let f = std::fs::File::open(&filename).unwrap();

// 50-byte destination: room for the whole message twice plus a partial third copy.
let mut buf = [MaybeUninit::<u8>::new(0); 50];
let mut br = ReadBuf::uninit(&mut buf);

// Basic use:
preadv2_safe(&f, &mut br, 0, 0).unwrap();
assert_eq!(br.initialized().len(), MESSAGE.len());
assert_eq!(br.filled(), MESSAGE);

// Here we check that offset works, but also that appending to a non-empty buffer
// behaves correctly WRT initialisation.
preadv2_safe(&f, &mut br, 5, 0).unwrap();
assert_eq!(br.initialized().len(), MESSAGE.len() * 2 - 5);
assert_eq!(br.filled(), b"Hello this is a test this is a test".as_ref());

// offset of -1 means use the current cursor. This has not been advanced by the
// previous reads because we specified an offset there.
// Only 15 bytes of room are left, so the read is cut short at "Hello this is a".
preadv2_safe(&f, &mut br, -1, 0).unwrap();
assert_eq!(br.remaining(), 0);
assert_eq!(
br.filled(),
b"Hello this is a test this is a testHello this is a".as_ref()
);

// but the offset should have been advanced by that read
// (15 bytes consumed, so the rest of the message is " test").
br.clear();
preadv2_safe(&f, &mut br, -1, 0).unwrap();
assert_eq!(br.filled(), b" test");

// This should be in cache, so RWF_NOWAIT should work, but it not being in cache
// (EAGAIN) or not supported by the underlying filesystem (ENOTSUP) is fine too.
br.clear();
match preadv2_safe(&f, &mut br, 0, RWF_NOWAIT) {
Ok(()) => assert_eq!(br.filled(), MESSAGE),
Err(e) => assert!(matches!(
e.raw_os_error(),
Some(libc::ENOTSUP) | Some(libc::EAGAIN)
)),
}

// Test handling large offsets
{
// I hope the underlying filesystem supports sparse files
// Grow the file to 4 GiB and append a marker string right after that point.
let mut w = std::fs::OpenOptions::new()
.write(true)
.open(&filename)
.unwrap();
w.set_len(0x1_0000_0000).unwrap();
w.seek(std::io::SeekFrom::Start(0x1_0000_0000)).unwrap();
w.write_all(b"This is a Large File").unwrap();
}

// Reading 8 bytes past the 4 GiB mark skips "This is " and needs an offset wider
// than 32 bits.
br.clear();
preadv2_safe(&f, &mut br, 0x1_0000_0008, 0).unwrap();
assert_eq!(br.filled(), b"a Large File");
}
}

/// Splits a file offset into the `(low, high)` pair of `long`-sized syscall arguments.
///
/// On 32-bit architectures the 64-bit offset is split over a low and a high 32-bit argument.
/// 64-bit architectures still take both arguments, but only the low one is inspected. See
/// `pos_from_hilo` in linux/fs/read_write.c.
fn pos_to_lohi(offset: off_t) -> (c_long, c_long) {
    // Half the width of `c_long`, in bits.
    const HALF_LONG_BITS: usize = core::mem::size_of::<c_long>() * 8 / 2;

    // Low word: a plain truncating cast.
    let low = offset as c_long;

    // High word: the offset shifted right by the full width of `c_long`. When `off_t` and
    // `c_long` are both 64 bits wide that would be a shift by 64, which Rust rejects as an
    // overflow, so the shift is done in two halves instead.
    let upper_half_shifted = offset >> HALF_LONG_BITS;
    let high = (upper_half_shifted >> HALF_LONG_BITS) as c_long;

    (low, high)
}

/// Flag value handed to `preadv2` to request a read that does not wait for data.
/// NOTE(review): presumably mirrors the kernel's `RWF_NOWAIT` definition; confirm against the
/// uapi headers.
pub(crate) const RWF_NOWAIT: c_int = 0x00000008;

/// Raw `preadv2` system call, invoked through `libc::syscall`.
///
/// `libc::preadv2` is deliberately not used: it is only provided by glibc, and only since
/// v2.26. Going through `syscall` avoids any compatibility concerns with old glibc versions
/// and also works on Android and musl. The downside is that `LD_PRELOAD` tricks can no longer
/// intercept these calls.
///
/// Returns whatever the system call returns, cast to `ssize_t`.
///
/// # Safety
///
/// The arguments are passed to the kernel as they are. The caller must ensure that `iov`
/// points to `iovcnt` valid `iovec` entries and that each entry describes memory that may be
/// written to for the duration of the call.
unsafe fn preadv2(
    fd: c_int,
    iov: *const iovec,
    iovcnt: c_int,
    offset: off_t,
    flags: c_int,
) -> ssize_t {
    // The offset travels as two `long` arguments (low word first, then high word).
    let (pos_l, pos_h) = pos_to_lohi(offset);
    let ret = libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, pos_l, pos_h, flags);
    ret as ssize_t
}
}
21 changes: 21 additions & 0 deletions tokio/tests/fs_file.rs
Expand Up @@ -22,6 +22,27 @@ async fn basic_read() {

assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);

// Drop the data from the cache to exercise the uncached codepath on Linux (see preadv2 in
// file.rs)
#[cfg(target_os = "linux")]
{
use std::os::unix::io::AsRawFd;
nix::unistd::fsync(tempfile.as_raw_fd()).unwrap();
nix::fcntl::posix_fadvise(
tempfile.as_raw_fd(),
0,
0,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
)
.unwrap();
}

let mut file = File::open(tempfile.path()).await.unwrap();
let n = file.read(&mut buf).await.unwrap();

assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);
}

#[tokio::test]
Expand Down

0 comments on commit 39706b1

Please sign in to comment.