diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 7e82c7850db..dd2763069d5 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -42,7 +42,7 @@ full = [ "time", ] -fs = [] +fs = ["libc"] io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true } tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] -libc = { version = "0.2.42", optional = true } +libc = { version = "0.2.87", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] -libc = { version = "0.2.42" } +libc = { version = "0.2.87" } nix = { version = "0.19.0" } [target.'cfg(windows)'.dependencies.winapi] diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 5c06e732b09..abd6e8cf1a3 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -491,14 +491,18 @@ impl AsyncRead for File { loop { match inner.state { Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); + let buf = buf_cell.as_mut().unwrap(); if !buf.is_empty() { buf.copy_to(dst); - *buf_cell = Some(buf); return Ready(Ok(())); } + if let Some(x) = try_nonblocking_read(me.std.as_ref(), dst) { + return Ready(x); + } + + let mut buf = buf_cell.take().unwrap(); buf.ensure_capacity_for(dst); let std = me.std.clone(); @@ -756,3 +760,186 @@ impl Inner { } } } + +#[cfg(all(target_os = "linux", not(test)))] +pub(crate) fn try_nonblocking_read( + file: &crate::fs::sys::File, + dst: &mut ReadBuf<'_>, +) -> Option> { + use std::sync::atomic::{AtomicBool, Ordering}; + + static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true); + if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) { + return None; + } + let out = preadv2::preadv2_safe(file, dst, -1, preadv2::RWF_NOWAIT); + if let Err(err) = &out { + match err.raw_os_error() { + Some(libc::ENOSYS) => { + NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed); + return None; + } + Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None, + _ => {} + } + } + Some(out) +} + +#[cfg(any(not(target_os = "linux"), test))] +pub(crate) fn try_nonblocking_read( + _file: &crate::fs::sys::File, + _dst: &mut ReadBuf<'_>, +) -> Option> { + None +} + +#[cfg(target_os = "linux")] +mod preadv2 { + use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t}; + use std::os::unix::prelude::AsRawFd; + + use crate::io::ReadBuf; + + pub(crate) fn preadv2_safe( + file: &std::fs::File, + dst: &mut ReadBuf<'_>, + offset: off_t, + flags: c_int, + ) -> std::io::Result<()> { + unsafe { + /* We have to defend against buffer overflows manually here. The slice API makes + * this fairly straightforward. */ + let unfilled = dst.unfilled_mut(); + let mut iov = iovec { + iov_base: unfilled.as_mut_ptr() as *mut c_void, + iov_len: unfilled.len(), + }; + /* We take a File object rather than an fd as reading from a sensitive fd may confuse + * other unsafe code that assumes that only they have access to that fd. */ + let bytes_read = preadv2( + file.as_raw_fd(), + &mut iov as *mut iovec as *const iovec, + 1, + offset, + flags, + ); + if bytes_read < 0 { + Err(std::io::Error::last_os_error()) + } else { + /* preadv2 returns the number of bytes read, e.g. the number of bytes that have + * written into `unfilled`. So it's safe to assume that the data is now + * initialised */ + dst.assume_init(bytes_read as usize); + dst.advance(bytes_read as usize); + Ok(()) + } + } + } + + #[cfg(test)] + mod test { + use super::*; + + #[test] + fn test_preadv2_safe() { + use std::io::{Seek, Write}; + use std::mem::MaybeUninit; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let filename = tmp.path().join("file"); + const MESSAGE: &[u8] = b"Hello this is a test"; + { + let mut f = std::fs::File::create(&filename).unwrap(); + f.write_all(MESSAGE).unwrap(); + } + let f = std::fs::File::open(&filename).unwrap(); + + let mut buf = [MaybeUninit::::new(0); 50]; + let mut br = ReadBuf::uninit(&mut buf); + + // Basic use: + preadv2_safe(&f, &mut br, 0, 0).unwrap(); + assert_eq!(br.initialized().len(), MESSAGE.len()); + assert_eq!(br.filled(), MESSAGE); + + // Here we check that offset works, but also that appending to a non-empty buffer + // behaves correctly WRT initialisation. + preadv2_safe(&f, &mut br, 5, 0).unwrap(); + assert_eq!(br.initialized().len(), MESSAGE.len() * 2 - 5); + assert_eq!(br.filled(), b"Hello this is a test this is a test".as_ref()); + + // offset of -1 means use the current cursor. This has not been advanced by the + // previous reads because we specified an offset there. + preadv2_safe(&f, &mut br, -1, 0).unwrap(); + assert_eq!(br.remaining(), 0); + assert_eq!( + br.filled(), + b"Hello this is a test this is a testHello this is a".as_ref() + ); + + // but the offset should have been advanced by that read + br.clear(); + preadv2_safe(&f, &mut br, -1, 0).unwrap(); + assert_eq!(br.filled(), b" test"); + + // This should be in cache, so RWF_NOWAIT should work, but it not being in cache + // (EAGAIN) or not supported by the underlying filesystem (ENOTSUP) is fine too. + br.clear(); + match preadv2_safe(&f, &mut br, 0, RWF_NOWAIT) { + Ok(()) => assert_eq!(br.filled(), MESSAGE), + Err(e) => assert!(matches!( + e.raw_os_error(), + Some(libc::ENOTSUP) | Some(libc::EAGAIN) + )), + } + + // Test handling large offsets + { + // I hope the underlying filesystem supports sparse files + let mut w = std::fs::OpenOptions::new() + .write(true) + .open(&filename) + .unwrap(); + w.set_len(0x1_0000_0000).unwrap(); + w.seek(std::io::SeekFrom::Start(0x1_0000_0000)).unwrap(); + w.write_all(b"This is a Large File").unwrap(); + } + + br.clear(); + preadv2_safe(&f, &mut br, 0x1_0000_0008, 0).unwrap(); + assert_eq!(br.filled(), b"a Large File"); + } + } + + fn pos_to_lohi(offset: off_t) -> (c_long, c_long) { + // 64-bit offset is split over high and low 32-bits on 32-bit architectures. + // 64-bit architectures still have high and low arguments, but only the low + // one is inspected. See pos_from_hilo in linux/fs/read_write.c. + const HALF_LONG_BITS: usize = core::mem::size_of::() * 8 / 2; + ( + offset as c_long, + // We want to shift this off_t value by size_of::(). We can't do + // it in one shift because if they're both 64-bits we'd be doing u64 >> 64 + // which is implementation defined. Instead do it in two halves: + ((offset >> HALF_LONG_BITS) >> HALF_LONG_BITS) as c_long, + ) + } + + pub(crate) const RWF_NOWAIT: c_int = 0x00000008; + unsafe fn preadv2( + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + flags: c_int, + ) -> ssize_t { + // Call via libc::syscall rather than libc::preadv2. preadv2 is only supported by glibc + // and only since v2.26. By using syscall we don't need to worry about compatiblity with + // old glibc versions and it will work on Android and musl too. The downside is that you + // can't use `LD_PRELOAD` tricks any more to intercept these calls. + let (lo, hi) = pos_to_lohi(offset); + libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, lo, hi, flags) as ssize_t + } +} diff --git a/tokio/tests/fs_file.rs b/tokio/tests/fs_file.rs index bf2f1d7b5d3..1f4760e72dd 100644 --- a/tokio/tests/fs_file.rs +++ b/tokio/tests/fs_file.rs @@ -22,6 +22,27 @@ async fn basic_read() { assert_eq!(n, HELLO.len()); assert_eq!(&buf[..n], HELLO); + + // Drop the data from the cache to stimulate uncached codepath on Linux (see preadv2 in + // file.rs) + #[cfg(target_os = "linux")] + { + use std::os::unix::io::AsRawFd; + nix::unistd::fsync(tempfile.as_raw_fd()).unwrap(); + nix::fcntl::posix_fadvise( + tempfile.as_raw_fd(), + 0, + 0, + nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED, + ) + .unwrap(); + } + + let mut file = File::open(tempfile.path()).await.unwrap(); + let n = file.read(&mut buf).await.unwrap(); + + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); } #[tokio::test]