From 0c07a9e4690fc4b2d8ceb90ac463c79e50d70947 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Sat, 30 Apr 2022 14:10:28 -0600 Subject: [PATCH] Rewrite the aio module The existing AIO implementation has some problems: 1) The in_progress field is checked at runtime, not compile time. 2) The mutable field is checked at runtime, not compile time. 3) A downstream lio_listio user must store extra state to track whether the whole operation is partially, completely, or not at all submitted. 4) Nix does heap allocation itself, rather than allowing the caller to choose it. This can result in double (or triple, or quadruple) boxing. 5) There's no easy way to use lio_listio to submit multiple operations with a single syscall, but poll each individually. 6) The lio_listio usage is far from transparent and zero-cost. 7) No aio_readv or aio_writev support. 8) priority has type c_int; should be i32 9) aio_return should return a usize instead of an isize, since it only uses negative values to indicate errors, which Rust represents via the Result type. This rewrite solves several problems: 1) Unsolved. I don't think it can be solved without something like C++'s guaranteed type elision. It might require changing the signature of Future::poll too. 2) Solved. 3) Solved, by the new in_progress method and by removing the complicated lio_listio resubmit code. 4) Solved. 5) Solved. 6) Solved, by removing the lio_listo resubmit code. It can be reimplemented downstream if necessary. Or even in Nix, but it doesn't fit Nix's theme of zero-cost abstractions. 7) Solved. 8) Solved. 9) Solved. The rewrite includes functions that don't work on FreeBSD, so add CI testing for FreeBSD 14 too. By default only enable tests that will pass on FreeBSD 12.3. But run a CI job on FreeBSD 14 and set a flag that will enable such tests. --- .cirrus.yml | 13 +- CHANGELOG.md | 16 + Cargo.toml | 9 +- bors.toml | 3 +- src/fcntl.rs | 8 +- src/sys/aio.rs | 1841 ++++++++++++++------------ test/sys/test_aio.rs | 1086 +++++++-------- test/sys/test_aio_drop.rs | 13 +- test/sys/test_lio_listio_resubmit.rs | 106 -- 9 files changed, 1570 insertions(+), 1525 deletions(-) delete mode 100644 test/sys/test_lio_listio_resubmit.rs diff --git a/.cirrus.yml b/.cirrus.yml index 1045f4f5b7..231cc38fb3 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -36,11 +36,18 @@ test: &TEST # 64-bit kernel and in a 64-bit environment. Our tests don't execute any of # the system's binaries, so the environment shouldn't matter. task: - name: FreeBSD amd64 & i686 env: TARGET: x86_64-unknown-freebsd - freebsd_instance: - image: freebsd-12-3-release-amd64 + matrix: + - name: FreeBSD 12 amd64 & i686 + freebsd_instance: + image: freebsd-12-3-release-amd64 + - name: FreeBSD 14 amd64 & i686 + freebsd_instance: + image_family: freebsd-14-0-snap + # Enable tests that would fail on FreeBSD 12 + RUSTFLAGS: --cfg fbsd14 -D warnings + RUSTDOCFLAGS: --cfg fbsd14 setup_script: - kldload mqueuefs - fetch https://sh.rustup.rs -o rustup.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e9baf9005..de4d22e2c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,14 +6,30 @@ This project adheres to [Semantic Versioning](https://semver.org/). ## [Unreleased] - ReleaseDate ### Added +- Added `aio_writev` and `aio_readv`. + (#[1713](https://github.com/nix-rust/nix/pull/1713)) + - impl From for std::net::SocketAddrV4 and impl From for std::net::SocketAddrV6. (#[1711](https://github.com/nix-rust/nix/pull/1711)) ### Changed + +- Rewrote the aio module. The new module: + * Does more type checking at compile time rather than runtime. + * Gives the caller control over whether and when to `Box` an aio operation. + * Changes the type of the `priority` arguments to `i32`. + * Changes the return type of `aio_return` to `usize`. + (#[1713](https://github.com/nix-rust/nix/pull/1713)) + ### Fixed ### Removed +- Removed support for resubmitting partially complete `lio_listio` operations. + It was too complicated, and didn't fit Nix's theme of zero-cost abstractions. + Instead, it can be reimplemented downstream. + (#[1713](https://github.com/nix-rust/nix/pull/1713)) + ## [0.24.1] - 2022-04-22 ### Added ### Changed diff --git a/Cargo.toml b/Cargo.toml index 628759ea31..ebd9182a53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,9 +27,10 @@ targets = [ ] [dependencies] -libc = { version = "0.2.124", features = [ "extra_traits" ] } +libc = { git = "http://github.com/rust-lang/libc.git", rev = "cd99f681181c310abfba742aef11115d2eff03dc", features = [ "extra_traits" ] } bitflags = "1.1" cfg-if = "1.0" +pin-utils = { version = "0.1.0", optional = true } [target.'cfg(not(target_os = "redox"))'.dependencies] memoffset = { version = "0.6.3", optional = true } @@ -44,7 +45,7 @@ default = [ ] acct = [] -aio = [] +aio = ["pin-utils"] dir = ["fs"] env = [] event = [] @@ -102,10 +103,6 @@ path = "test/sys/test_aio_drop.rs" name = "test-clearenv" path = "test/test_clearenv.rs" -[[test]] -name = "test-lio-listio-resubmit" -path = "test/sys/test_lio_listio_resubmit.rs" - [[test]] name = "test-mount" path = "test/test_mount.rs" diff --git a/bors.toml b/bors.toml index b22877a767..b020ca389e 100644 --- a/bors.toml +++ b/bors.toml @@ -5,7 +5,8 @@ status = [ "Android i686", "Android x86_64", "DragonFly BSD x86_64", - "FreeBSD amd64 & i686", + "FreeBSD 12 amd64 & i686", + "FreeBSD 14 amd64 & i686", "Fuchsia x86_64", "Linux MIPS", "Linux MIPS64 el", diff --git a/src/fcntl.rs b/src/fcntl.rs index fa64c8eaed..5272c80955 100644 --- a/src/fcntl.rs +++ b/src/fcntl.rs @@ -742,8 +742,8 @@ impl SpacectlRange { /// /// # Example /// -// no_run because it fails to link until FreeBSD 14.0 -/// ```no_run +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] /// # use std::io::Write; /// # use std::os::unix::fs::FileExt; /// # use std::os::unix::io::AsRawFd; @@ -788,8 +788,8 @@ pub fn fspacectl(fd: RawFd, range: SpacectlRange) -> Result { /// /// # Example /// -// no_run because it fails to link until FreeBSD 14.0 -/// ```no_run +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] /// # use std::io::Write; /// # use std::os::unix::fs::FileExt; /// # use std::os::unix::io::AsRawFd; diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 4780cdee33..6ff88469b9 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -2,9 +2,12 @@ //! POSIX Asynchronous I/O //! //! The POSIX AIO interface is used for asynchronous I/O on files and disk-like -//! devices. It supports [`read`](struct.AioCb.html#method.read), -//! [`write`](struct.AioCb.html#method.write), and -//! [`fsync`](struct.AioCb.html#method.fsync) operations. Completion +//! devices. It supports [`read`](struct.AioRead.html#method.new), +//! [`write`](struct.AioWrite.html#method.new), +//! [`fsync`](struct.AioFsync.html#method.new), +//! [`readv`](struct.AioReadv.html#method.new), and +//! [`writev`](struct.AioWritev.html#method.new), operations, subject to +//! platform support. Completion //! notifications can optionally be delivered via //! [signals](../signal/enum.SigevNotify.html#variant.SigevSignal), via the //! [`aio_suspend`](fn.aio_suspend.html) function, or via polling. Some @@ -17,23 +20,30 @@ //! that they will be executed atomically. //! //! Outstanding operations may be cancelled with -//! [`cancel`](struct.AioCb.html#method.cancel) or +//! [`cancel`](trait.Aio.html#method.cancel) or //! [`aio_cancel_all`](fn.aio_cancel_all.html), though the operating system may //! not support this for all filesystems and devices. +#[cfg(target_os = "freebsd")] +use std::io::{IoSlice, IoSliceMut}; +use std::{ + convert::TryFrom, + fmt::{self, Debug}, + marker::{PhantomData, PhantomPinned}, + mem, + os::unix::io::RawFd, + pin::Pin, + ptr, + thread, +}; -use crate::Result; -use crate::errno::Errno; -use std::os::unix::io::RawFd; -use libc::{c_void, off_t, size_t}; -use std::fmt; -use std::fmt::Debug; -use std::marker::PhantomData; -use std::mem; -use std::pin::Pin; -use std::ptr::{null, null_mut}; -use crate::sys::signal::*; -use std::thread; -use crate::sys::time::TimeSpec; +use libc::{c_void, off_t}; +use pin_utils::unsafe_pinned; + +use crate::{ + errno::Errno, + sys::{signal::*, time::TimeSpec}, + Result, +}; libc_enum! { /// Mode for `AioCb::fsync`. Controls whether only data or both data and @@ -52,22 +62,7 @@ libc_enum! { #[cfg_attr(docsrs, doc(cfg(all())))] O_DSYNC } -} - -libc_enum! { - /// When used with [`lio_listio`](fn.lio_listio.html), determines whether a - /// given `aiocb` should be used for a read operation, a write operation, or - /// ignored. Has no effect for any other aio functions. - #[repr(i32)] - #[non_exhaustive] - pub enum LioOpcode { - /// No operation - LIO_NOP, - /// Write data as if by a call to [`AioCb::write`] - LIO_WRITE, - /// Write data as if by a call to [`AioCb::read`] - LIO_READ, - } + impl TryFrom } libc_enum! { @@ -103,354 +98,133 @@ struct LibcAiocb(libc::aiocb); unsafe impl Send for LibcAiocb {} unsafe impl Sync for LibcAiocb {} -/// AIO Control Block. -/// -/// The basic structure used by all aio functions. Each `AioCb` represents one -/// I/O request. -pub struct AioCb<'a> { - aiocb: LibcAiocb, - /// Tracks whether the buffer pointed to by `libc::aiocb.aio_buf` is mutable - mutable: bool, +/// Base class for all AIO operations. Should only be used directly when +/// checking for completion. +// We could create some kind of AsPinnedMut trait, and implement it for all aio +// ops, allowing the crate's users to get pinned references to `AioCb`. That +// could save some code for things like polling methods. But IMHO it would +// provide polymorphism at the wrong level. Instead, the best place for +// polymorphism is at the level of `Futures`. +#[repr(C)] +struct AioCb { + aiocb: LibcAiocb, /// Could this `AioCb` potentially have any in-kernel state? + // It would be really nice to perform the in-progress check entirely at + // compile time. But I can't figure out how, because: + // * Future::poll takes a `Pin<&mut self>` rather than `self`, and + // * Rust's lack of an equivalent of C++'s Guaranteed Copy Elision means + // that there's no way to write an AioCb constructor that neither boxes + // the object itself, nor moves it during return. in_progress: bool, - _buffer: std::marker::PhantomData<&'a [u8]>, - _pin: std::marker::PhantomPinned } -impl<'a> AioCb<'a> { - /// Returns the underlying file descriptor associated with the `AioCb` - pub fn fd(&self) -> RawFd { - self.aiocb.0.aio_fildes - } +impl AioCb { + pin_utils::unsafe_unpinned!(aiocb: LibcAiocb); - /// Constructs a new `AioCb` with no associated buffer. - /// - /// The resulting `AioCb` structure is suitable for use with `AioCb::fsync`. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio`. - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// - /// # Examples - /// - /// Create an `AioCb` from a raw file descriptor and use it for an - /// [`fsync`](#method.fsync) operation. - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify::SigevNone; - /// # use std::{thread, time}; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// let f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_fd( f.as_raw_fd(), 0, SigevNone); - /// aiocb.fsync(AioFsyncMode::O_SYNC).expect("aio_fsync failed early"); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// aiocb.aio_return().expect("aio_fsync failed late"); - /// ``` - pub fn from_fd(fd: RawFd, prio: libc::c_int, - sigev_notify: SigevNotify) -> Pin>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = 0; - a.0.aio_nbytes = 0; - a.0.aio_buf = null_mut(); - - Box::pin(AioCb { - aiocb: a, - mutable: false, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - }) + fn aio_return(mut self: Pin<&mut Self>) -> Result { + self.in_progress = false; + unsafe { + let p: *mut libc::aiocb = &mut self.aiocb.0; + Errno::result(libc::aio_return(p)) + } + .map(|r| r as usize) } - // Private helper - #[cfg(not(any(target_os = "ios", target_os = "macos")))] - fn from_mut_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a mut [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> - { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = buf.len() as size_t; - a.0.aio_buf = buf.as_ptr() as *mut c_void; - a.0.aio_lio_opcode = opcode as libc::c_int; + fn cancel(mut self: Pin<&mut Self>) -> Result { + let r = unsafe { + libc::aio_cancel(self.aiocb.0.aio_fildes, &mut self.aiocb.0) + }; + match r { + libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), + libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), + libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), + -1 => Err(Errno::last()), + _ => panic!("unknown aio_cancel return value"), + } + } + fn common_init(fd: RawFd, prio: i32, sigev_notify: SigevNotify) -> Self { + // Use mem::zeroed instead of explicitly zeroing each field, because the + // number and name of reserved fields is OS-dependent. On some OSes, + // some reserved fields are used the kernel for state, and must be + // explicitly zeroed when allocated. + let mut a = unsafe { mem::zeroed::() }; + a.aio_fildes = fd; + a.aio_reqprio = prio; + a.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); AioCb { - aiocb: a, - mutable: true, + aiocb: LibcAiocb(a), in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned } } - /// Constructs a new `AioCb` from a mutable slice. - /// - /// The resulting `AioCb` will be suitable for both read and write - /// operations, but only if the borrow checker can guarantee that the slice - /// will outlive the `AioCb`. That will usually be the case if the `AioCb` - /// is stack-allocated. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: A memory buffer - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Examples - /// - /// Create an `AioCb` from a mutable slice and read into it. - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::{thread, time}; - /// # use std::io::Write; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// const INITIAL: &[u8] = b"abcdef123456"; - /// const LEN: usize = 4; - /// let mut rbuf = vec![0; LEN]; - /// let mut f = tempfile().unwrap(); - /// f.write_all(INITIAL).unwrap(); - /// { - /// let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - /// 2, //offset - /// &mut rbuf, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.read().unwrap(); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN); - /// } - /// assert_eq!(rbuf, b"cdef"); - /// ``` - pub fn from_mut_slice(fd: RawFd, offs: off_t, buf: &'a mut [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = buf.len() as size_t; - a.0.aio_buf = buf.as_ptr() as *mut c_void; - a.0.aio_lio_opcode = opcode as libc::c_int; - - Box::pin(AioCb { - aiocb: a, - mutable: true, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - }) + fn error(self: Pin<&mut Self>) -> Result<()> { + let r = unsafe { libc::aio_error(&self.aiocb().0) }; + match r { + 0 => Ok(()), + num if num > 0 => Err(Errno::from_i32(num)), + -1 => Err(Errno::last()), + num => panic!("unknown aio_error return value {:?}", num), + } } - /// Constructs a new `AioCb` from a mutable raw pointer - /// - /// Unlike `from_mut_slice`, this method returns a structure suitable for - /// placement on the heap. It may be used for both reads and writes. Due - /// to its unsafety, this method is not recommended. It is most useful when - /// heap allocation is required. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: Pointer to the memory buffer - /// * `len`: Length of the buffer pointed to by `buf` - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Safety - /// - /// The caller must ensure that the storage pointed to by `buf` outlives the - /// `AioCb`. The lifetime checker can't help here. - pub unsafe fn from_mut_ptr(fd: RawFd, offs: off_t, - buf: *mut c_void, len: usize, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = len; - a.0.aio_buf = buf; - a.0.aio_lio_opcode = opcode as libc::c_int; - - Box::pin(AioCb { - aiocb: a, - mutable: true, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned, - }) + fn in_progress(&self) -> bool { + self.in_progress } - /// Constructs a new `AioCb` from a raw pointer. - /// - /// Unlike `from_slice`, this method returns a structure suitable for - /// placement on the heap. Due to its unsafety, this method is not - /// recommended. It is most useful when heap allocation is required. - /// - /// # Parameters - /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: Pointer to the memory buffer - /// * `len`: Length of the buffer pointed to by `buf` - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb - /// - /// # Safety - /// - /// The caller must ensure that the storage pointed to by `buf` outlives the - /// `AioCb`. The lifetime checker can't help here. - pub unsafe fn from_ptr(fd: RawFd, offs: off_t, - buf: *const c_void, len: usize, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin>> { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = len; - // casting a const ptr to a mutable ptr here is ok, because we set the - // AioCb's mutable field to false - a.0.aio_buf = buf as *mut c_void; - a.0.aio_lio_opcode = opcode as libc::c_int; - - Box::pin(AioCb { - aiocb: a, - mutable: false, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - }) + fn set_in_progress(mut self: Pin<&mut Self>) { + self.as_mut().in_progress = true; } - // Private helper - fn from_slice_unpinned(fd: RawFd, offs: off_t, buf: &'a [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb - { - let mut a = AioCb::common_init(fd, prio, sigev_notify); - a.0.aio_offset = offs; - a.0.aio_nbytes = buf.len() as size_t; - // casting an immutable buffer to a mutable pointer looks unsafe, - // but technically its only unsafe to dereference it, not to create - // it. - a.0.aio_buf = buf.as_ptr() as *mut c_void; - assert!(opcode != LioOpcode::LIO_READ, "Can't read into an immutable buffer"); - a.0.aio_lio_opcode = opcode as libc::c_int; - - AioCb { - aiocb: a, - mutable: false, - in_progress: false, - _buffer: PhantomData, - _pin: std::marker::PhantomPinned - } + /// Update the notification settings for an existing AIO operation that has + /// not yet been submitted. + // Takes a normal reference rather than a pinned one because this method is + // normally called before the object needs to be pinned, that is, before + // it's been submitted to the kernel. + fn set_sigev_notify(&mut self, sigev_notify: SigevNotify) { + assert!( + !self.in_progress, + "Can't change notification settings for an in-progress operation" + ); + self.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); } +} - /// Like [`AioCb::from_mut_slice`], but works on constant slices rather than - /// mutable slices. - /// - /// An `AioCb` created this way cannot be used with `read`, and its - /// `LioOpcode` cannot be set to `LIO_READ`. This method is useful when - /// writing a const buffer with `AioCb::write`, since `from_mut_slice` can't - /// work with const buffers. - /// - /// # Examples - /// - /// Construct an `AioCb` from a slice and use it for writing. - /// - /// ``` - /// # use nix::errno::Errno; - /// # use nix::Error; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::{thread, time}; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// const WBUF: &[u8] = b"abcdef123456"; - /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { - /// thread::sleep(time::Duration::from_millis(10)); - /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); - /// ``` - // Note: another solution to the problem of writing const buffers would be - // to genericize AioCb for both &mut [u8] and &[u8] buffers. AioCb::read - // could take the former and AioCb::write could take the latter. However, - // then lio_listio wouldn't work, because that function needs a slice of - // AioCb, and they must all be of the same type. - pub fn from_slice(fd: RawFd, offs: off_t, buf: &'a [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Pin> - { - Box::pin(AioCb::from_slice_unpinned(fd, offs, buf, prio, sigev_notify, - opcode)) +impl Debug for AioCb { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("AioCb") + .field("aiocb", &self.aiocb.0) + .field("in_progress", &self.in_progress) + .finish() } +} - fn common_init(fd: RawFd, prio: libc::c_int, - sigev_notify: SigevNotify) -> LibcAiocb { - // Use mem::zeroed instead of explicitly zeroing each field, because the - // number and name of reserved fields is OS-dependent. On some OSes, - // some reserved fields are used the kernel for state, and must be - // explicitly zeroed when allocated. - let mut a = unsafe { mem::zeroed::()}; - a.aio_fildes = fd; - a.aio_reqprio = prio; - a.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); - LibcAiocb(a) +impl Drop for AioCb { + /// If the `AioCb` has no remaining state in the kernel, just drop it. + /// Otherwise, dropping constitutes a resource leak, which is an error + fn drop(&mut self) { + assert!( + thread::panicking() || !self.in_progress, + "Dropped an in-progress AioCb" + ); } +} - /// Update the notification settings for an existing `aiocb` - pub fn set_sigev_notify(self: &mut Pin>, - sigev_notify: SigevNotify) - { - // Safe because we don't move any of the data - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - selfp.aiocb.0.aio_sigevent = SigEvent::new(sigev_notify).sigevent(); - } +/// Methods common to all AIO operations +pub trait Aio { + /// The return type of [`Aio::aio_return`]. + type Output; + + /// Retrieve return status of an asynchronous operation. + /// + /// Should only be called once for each operation, after [`Aio::error`] + /// indicates that it has completed. The result is the same as for the + /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions. + /// + /// # References + /// + /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html) + fn aio_return(self: Pin<&mut Self>) -> Result; /// Cancels an outstanding AIO request. /// @@ -477,51 +251,26 @@ impl<'a> AioCb<'a> { /// # use tempfile::tempfile; /// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), + /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), /// 2, //offset /// &wbuf[..], /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// let cs = aiocb.cancel().unwrap(); + /// SigevNotify::SigevNone)); + /// aiocb.as_mut().submit().unwrap(); + /// let cs = aiocb.as_mut().cancel().unwrap(); /// if cs == AioCancelStat::AioNotCanceled { - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { + /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { /// thread::sleep(time::Duration::from_millis(10)); /// } /// } /// // Must call `aio_return`, but ignore the result - /// let _ = aiocb.aio_return(); + /// let _ = aiocb.as_mut().aio_return(); /// ``` /// /// # References /// /// [aio_cancel](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) - pub fn cancel(self: &mut Pin>) -> Result { - let r = unsafe { - let selfp = self.as_mut().get_unchecked_mut(); - libc::aio_cancel(selfp.aiocb.0.aio_fildes, &mut selfp.aiocb.0) - }; - match r { - libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), - libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), - libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), - -1 => Err(Errno::last()), - _ => panic!("unknown aio_cancel return value") - } - } - - fn error_unpinned(&mut self) -> Result<()> { - let r = unsafe { - libc::aio_error(&mut self.aiocb.0 as *mut libc::aiocb) - }; - match r { - 0 => Ok(()), - num if num > 0 => Err(Errno::from_i32(num)), - -1 => Err(Errno::last()), - num => panic!("unknown aio_error return value {:?}", num) - } - } + fn cancel(self: Pin<&mut Self>) -> Result; /// Retrieve error status of an asynchronous operation. /// @@ -543,155 +292,222 @@ impl<'a> AioCb<'a> { /// # use tempfile::tempfile; /// const WBUF: &[u8] = b"abcdef123456"; /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), + /// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), /// 2, //offset /// WBUF, /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_NOP); - /// aiocb.write().unwrap(); - /// while (aiocb.error() == Err(Errno::EINPROGRESS)) { + /// SigevNotify::SigevNone)); + /// aiocb.as_mut().submit().unwrap(); + /// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { /// thread::sleep(time::Duration::from_millis(10)); /// } - /// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); + /// assert_eq!(aiocb.as_mut().aio_return().unwrap(), WBUF.len()); /// ``` /// /// # References /// /// [aio_error](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_error.html) - pub fn error(self: &mut Pin>) -> Result<()> { - // Safe because error_unpinned doesn't move the data - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - selfp.error_unpinned() - } + fn error(self: Pin<&mut Self>) -> Result<()>; - /// An asynchronous version of `fsync(2)`. + /// Returns the underlying file descriptor associated with the operation. + fn fd(&self) -> RawFd; + + /// Does this operation currently have any in-kernel state? /// - /// # References + /// Dropping an operation that does have in-kernel state constitutes a + /// resource leak. /// - /// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html) - pub fn fsync(self: &mut Pin>, mode: AioFsyncMode) -> Result<()> { - // Safe because we don't move the libc::aiocb - unsafe { - let selfp = self.as_mut().get_unchecked_mut(); - Errno::result({ - let p: *mut libc::aiocb = &mut selfp.aiocb.0; - libc::aio_fsync(mode as libc::c_int, p) - }).map(|_| { - selfp.in_progress = true; - }) - } - } - - /// Returns the `aiocb`'s `LioOpcode` field + /// # Examples /// - /// If the value cannot be represented as an `LioOpcode`, returns `None` - /// instead. - pub fn lio_opcode(&self) -> Option { - match self.aiocb.0.aio_lio_opcode { - libc::LIO_READ => Some(LioOpcode::LIO_READ), - libc::LIO_WRITE => Some(LioOpcode::LIO_WRITE), - libc::LIO_NOP => Some(LioOpcode::LIO_NOP), - _ => None - } - } + /// ``` + /// # use nix::errno::Errno; + /// # use nix::Error; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify::SigevNone; + /// # use std::{thread, time}; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// let f = tempfile().unwrap(); + /// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC, + /// 0, SigevNone)); + /// assert!(!aiof.as_mut().in_progress()); + /// aiof.as_mut().submit().expect("aio_fsync failed early"); + /// assert!(aiof.as_mut().in_progress()); + /// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) { + /// thread::sleep(time::Duration::from_millis(10)); + /// } + /// aiof.as_mut().aio_return().expect("aio_fsync failed late"); + /// assert!(!aiof.as_mut().in_progress()); + /// ``` + fn in_progress(&self) -> bool; - /// Returns the requested length of the aio operation in bytes - /// - /// This method returns the *requested* length of the operation. To get the - /// number of bytes actually read or written by a completed operation, use - /// `aio_return` instead. - pub fn nbytes(&self) -> usize { - self.aiocb.0.aio_nbytes - } + /// Returns the priority of the `AioCb` + fn priority(&self) -> i32; - /// Returns the file offset stored in the `AioCb` - pub fn offset(&self) -> off_t { - self.aiocb.0.aio_offset - } + /// Update the notification settings for an existing AIO operation that has + /// not yet been submitted. + fn set_sigev_notify(&mut self, sev: SigevNotify); - /// Returns the priority of the `AioCb` - pub fn priority(&self) -> libc::c_int { - self.aiocb.0.aio_reqprio - } + /// Returns the `SigEvent` that will be used for notification. + fn sigevent(&self) -> SigEvent; - /// Asynchronously reads from a file descriptor into a buffer + /// Actually start the I/O operation. /// - /// # References - /// - /// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html) - pub fn read(self: &mut Pin>) -> Result<()> { - assert!(self.mutable, "Can't read into an immutable buffer"); - // Safe because we don't move anything - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - Errno::result({ - let p: *mut libc::aiocb = &mut selfp.aiocb.0; - unsafe { libc::aio_read(p) } - }).map(|_| { - selfp.in_progress = true; - }) - } + /// After calling this method and until [`Aio::aio_return`] returns `Ok`, + /// the structure may not be moved in memory. + fn submit(self: Pin<&mut Self>) -> Result<()>; +} - /// Returns the `SigEvent` stored in the `AioCb` - pub fn sigevent(&self) -> SigEvent { - SigEvent::from(&self.aiocb.0.aio_sigevent) - } +macro_rules! aio_methods { + () => { + fn cancel(self: Pin<&mut Self>) -> Result { + self.aiocb().cancel() + } - fn aio_return_unpinned(&mut self) -> Result { - unsafe { - let p: *mut libc::aiocb = &mut self.aiocb.0; - self.in_progress = false; - Errno::result(libc::aio_return(p)) + fn error(self: Pin<&mut Self>) -> Result<()> { + self.aiocb().error() } - } - /// Retrieve return status of an asynchronous operation. - /// - /// Should only be called once for each `AioCb`, after `AioCb::error` - /// indicates that it has completed. The result is the same as for the - /// synchronous `read(2)`, `write(2)`, of `fsync(2)` functions. - /// - /// # References - /// - /// [aio_return](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_return.html) - // Note: this should be just `return`, but that's a reserved word - pub fn aio_return(self: &mut Pin>) -> Result { - // Safe because aio_return_unpinned does not move the data - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - selfp.aio_return_unpinned() - } + fn fd(&self) -> RawFd { + self.aiocb.aiocb.0.aio_fildes + } - /// Asynchronously writes from a buffer to a file descriptor - /// - /// # References - /// - /// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html) - pub fn write(self: &mut Pin>) -> Result<()> { - // Safe because we don't move anything - let selfp = unsafe { - self.as_mut().get_unchecked_mut() - }; - Errno::result({ - let p: *mut libc::aiocb = &mut selfp.aiocb.0; - unsafe{ libc::aio_write(p) } - }).map(|_| { - selfp.in_progress = true; - }) - } -} + fn in_progress(&self) -> bool { + self.aiocb.in_progress() + } -/// Cancels outstanding AIO requests for a given file descriptor. + fn priority(&self) -> i32 { + self.aiocb.aiocb.0.aio_reqprio + } + + fn set_sigev_notify(&mut self, sev: SigevNotify) { + self.aiocb.set_sigev_notify(sev) + } + + fn sigevent(&self) -> SigEvent { + SigEvent::from(&self.aiocb.aiocb.0.aio_sigevent) + } + }; + ($func:ident) => { + aio_methods!(); + + fn aio_return(self: Pin<&mut Self>) -> Result<::Output> { + self.aiocb().aio_return() + } + + fn submit(mut self: Pin<&mut Self>) -> Result<()> { + let p: *mut libc::aiocb = &mut self.as_mut().aiocb().aiocb.0; + Errno::result({ unsafe { libc::$func(p) } }).map(|_| { + self.aiocb().set_in_progress(); + }) + } + }; +} + +/// An asynchronous version of `fsync(2)`. +/// +/// # References +/// +/// [aio_fsync](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_fsync.html) +/// # Examples +/// +/// ``` +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify::SigevNone; +/// # use std::{thread, time}; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// let f = tempfile().unwrap(); +/// let mut aiof = Box::pin(AioFsync::new(f.as_raw_fd(), AioFsyncMode::O_SYNC, +/// 0, SigevNone)); +/// aiof.as_mut().submit().expect("aio_fsync failed early"); +/// while (aiof.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// aiof.as_mut().aio_return().expect("aio_fsync failed late"); +/// ``` +#[derive(Debug)] +#[repr(transparent)] +pub struct AioFsync { + aiocb: AioCb, + _pin: PhantomPinned, +} + +impl AioFsync { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the operation's fsync mode: data and metadata or data only? + pub fn mode(&self) -> AioFsyncMode { + AioFsyncMode::try_from(self.aiocb.aiocb.0.aio_lio_opcode).unwrap() + } + + /// Create a new `AioFsync`. + /// + /// # Arguments + /// + /// * `fd`: File descriptor to sync. + /// * `mode`: Whether to sync file metadata too, or just data. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio`. + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + mode: AioFsyncMode, + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + // To save some memory, store mode in an unused field of the AioCb. + // True it isn't very much memory, but downstream creates will likely + // create an enum containing this and other AioCb variants and pack + // those enums into data structures like Vec, so it adds up. + aiocb.aiocb.0.aio_lio_opcode = mode as libc::c_int; + AioFsync { + aiocb, + _pin: PhantomPinned, + } + } +} + +impl Aio for AioFsync { + type Output = (); + + aio_methods!(); + + fn aio_return(self: Pin<&mut Self>) -> Result<()> { + self.aiocb().aio_return().map(drop) + } + + fn submit(mut self: Pin<&mut Self>) -> Result<()> { + let aiocb = &mut self.as_mut().aiocb().aiocb.0; + let mode = mem::replace(&mut aiocb.aio_lio_opcode, 0); + let p: *mut libc::aiocb = aiocb; + Errno::result(unsafe { libc::aio_fsync(mode, p) }).map(|_| { + self.aiocb().set_in_progress(); + }) + } +} + +// AioFsync does not need AsMut, since it can't be used with lio_listio + +impl AsRef for AioFsync { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 + } +} + +/// Asynchronously reads from a file descriptor into a buffer +/// +/// # References +/// +/// [aio_read](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_read.html) /// /// # Examples /// -/// Issue an aio operation, then cancel all outstanding operations on that file -/// descriptor. /// /// ``` /// # use nix::errno::Errno; @@ -702,428 +518,727 @@ impl<'a> AioCb<'a> { /// # use std::io::Write; /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; -/// let wbuf = b"CDEF"; +/// const INITIAL: &[u8] = b"abcdef123456"; +/// const LEN: usize = 4; +/// let mut rbuf = vec![0; LEN]; /// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), -/// 2, //offset -/// &wbuf[..], -/// 0, //priority -/// SigevNotify::SigevNone, -/// LioOpcode::LIO_NOP); -/// aiocb.write().unwrap(); -/// let cs = aio_cancel_all(f.as_raw_fd()).unwrap(); -/// if cs == AioCancelStat::AioNotCanceled { -/// while (aiocb.error() == Err(Errno::EINPROGRESS)) { +/// f.write_all(INITIAL).unwrap(); +/// { +/// let mut aior = Box::pin( +/// AioRead::new( +/// f.as_raw_fd(), +/// 2, //offset +/// &mut rbuf, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aior.as_mut().submit().unwrap(); +/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) { /// thread::sleep(time::Duration::from_millis(10)); /// } +/// assert_eq!(aior.as_mut().aio_return().unwrap(), LEN); /// } -/// // Must call `aio_return`, but ignore the result -/// let _ = aiocb.aio_return(); +/// assert_eq!(rbuf, b"cdef"); /// ``` -/// -/// # References -/// -/// [`aio_cancel`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) -pub fn aio_cancel_all(fd: RawFd) -> Result { - match unsafe { libc::aio_cancel(fd, null_mut()) } { - libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), - libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), - libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), - -1 => Err(Errno::last()), - _ => panic!("unknown aio_cancel return value") +#[derive(Debug)] +#[repr(transparent)] +pub struct AioRead<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [u8]>, + _pin: PhantomPinned, +} + +impl<'a> AioRead<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the requested length of the aio operation in bytes + /// + /// This method returns the *requested* length of the operation. To get the + /// number of bytes actually read or written by a completed operation, use + /// `aio_return` instead. + pub fn nbytes(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes + } + + /// Create a new `AioRead`, placing the data in a mutable slice. + /// + /// # Arguments + /// + /// * `fd`: File descriptor to read from + /// * `offs`: File offset + /// * `buf`: A memory buffer. It must outlive the `AioRead`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + buf: &'a mut [u8], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + aiocb.aiocb.0.aio_nbytes = buf.len(); + aiocb.aiocb.0.aio_buf = buf.as_mut_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READ; + aiocb.aiocb.0.aio_offset = offs; + AioRead { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } + } + + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset } } -/// Suspends the calling process until at least one of the specified `AioCb`s -/// has completed, a signal is delivered, or the timeout has passed. +impl<'a> Aio for AioRead<'a> { + type Output = usize; + + aio_methods!(aio_read); +} + +impl<'a> AsMut for AioRead<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 + } +} + +impl<'a> AsRef for AioRead<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 + } +} + +/// Asynchronously reads from a file descriptor into a scatter/gather list of buffers. /// -/// If `timeout` is `None`, `aio_suspend` will block indefinitely. +/// # References +/// +/// [aio_readv](https://www.freebsd.org/cgi/man.cgi?query=aio_readv) /// /// # Examples /// -/// Use `aio_suspend` to block until an aio operation completes. /// -/// ``` +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] +/// # use nix::errno::Errno; +/// # use nix::Error; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::io::{IoSliceMut, Write}; /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; -/// const WBUF: &[u8] = b"abcdef123456"; +/// const INITIAL: &[u8] = b"abcdef123456"; +/// let mut rbuf0 = vec![0; 4]; +/// let mut rbuf1 = vec![0; 2]; +/// let expected_len = rbuf0.len() + rbuf1.len(); +/// let mut rbufs = [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; /// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), -/// 2, //offset -/// WBUF, -/// 0, //priority -/// SigevNotify::SigevNone, -/// LioOpcode::LIO_NOP); -/// aiocb.write().unwrap(); -/// aio_suspend(&[aiocb.as_ref()], None).expect("aio_suspend failed"); -/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); +/// f.write_all(INITIAL).unwrap(); +/// { +/// let mut aior = Box::pin( +/// AioReadv::new( +/// f.as_raw_fd(), +/// 2, //offset +/// &mut rbufs, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aior.as_mut().submit().unwrap(); +/// while (aior.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aior.as_mut().aio_return().unwrap(), expected_len); +/// } +/// assert_eq!(rbuf0, b"cdef"); +/// assert_eq!(rbuf1, b"12"); /// ``` -/// # References -/// -/// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html) -pub fn aio_suspend(list: &[Pin<&AioCb>], timeout: Option) -> Result<()> { - let plist = list as *const [Pin<&AioCb>] as *const [*const libc::aiocb]; - let p = plist as *const *const libc::aiocb; - let timep = match timeout { - None => null::(), - Some(x) => x.as_ref() as *const libc::timespec - }; - Errno::result(unsafe { - libc::aio_suspend(p, list.len() as i32, timep) - }).map(drop) +#[cfg(target_os = "freebsd")] +#[derive(Debug)] +#[repr(transparent)] +pub struct AioReadv<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [&'a [u8]]>, + _pin: PhantomPinned, } -impl<'a> Debug for AioCb<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("AioCb") - .field("aiocb", &self.aiocb.0) - .field("mutable", &self.mutable) - .field("in_progress", &self.in_progress) - .finish() +#[cfg(target_os = "freebsd")] +impl<'a> AioReadv<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the number of buffers the operation will read into. + pub fn iovlen(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes } -} -impl<'a> Drop for AioCb<'a> { - /// If the `AioCb` has no remaining state in the kernel, just drop it. - /// Otherwise, dropping constitutes a resource leak, which is an error - fn drop(&mut self) { - assert!(thread::panicking() || !self.in_progress, - "Dropped an in-progress AioCb"); + /// Create a new `AioReadv`, placing the data in a list of mutable slices. + /// + /// # Arguments + /// + /// * `fd`: File descriptor to read from + /// * `offs`: File offset + /// * `bufs`: A scatter/gather list of memory buffers. They must + /// outlive the `AioReadv`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + bufs: &mut [IoSliceMut<'a>], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + // In vectored mode, aio_nbytes stores the length of the iovec array, + // not the byte count. + aiocb.aiocb.0.aio_nbytes = bufs.len(); + aiocb.aiocb.0.aio_buf = bufs.as_mut_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_READV; + aiocb.aiocb.0.aio_offset = offs; + AioReadv { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } + } + + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset } } -/// LIO Control Block. -/// -/// The basic structure used to issue multiple AIO operations simultaneously. -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -pub struct LioCb<'a> { - /// A collection of [`AioCb`]s. All of these will be issued simultaneously - /// by the [`listio`] method. - /// - /// [`AioCb`]: struct.AioCb.html - /// [`listio`]: #method.listio - // Their locations in memory must be fixed once they are passed to the - // kernel. So this field must be non-public so the user can't swap. - aiocbs: Box<[AioCb<'a>]>, +#[cfg(target_os = "freebsd")] +impl<'a> Aio for AioReadv<'a> { + type Output = usize; - /// The actual list passed to `libc::lio_listio`. - /// - /// It must live for as long as any of the operations are still being - /// processesed, because the aio subsystem uses its address as a unique - /// identifier. - list: Vec<*mut libc::aiocb>, - - /// A partial set of results. This field will get populated by - /// `listio_resubmit` when an `LioCb` is resubmitted after an error - results: Vec>> + aio_methods!(aio_readv); } -/// LioCb can't automatically impl Send and Sync just because of the raw -/// pointers in list. But that's stupid. There's no reason that raw pointers -/// should automatically be non-Send -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -unsafe impl<'a> Send for LioCb<'a> {} -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -unsafe impl<'a> Sync for LioCb<'a> {} - -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -impl<'a> LioCb<'a> { - /// Are no [`AioCb`]s contained? - pub fn is_empty(&self) -> bool { - self.aiocbs.is_empty() +#[cfg(target_os = "freebsd")] +impl<'a> AsMut for AioReadv<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 } +} - /// Return the number of individual [`AioCb`]s contained. - pub fn len(&self) -> usize { - self.aiocbs.len() +#[cfg(target_os = "freebsd")] +impl<'a> AsRef for AioReadv<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 } +} - /// Submits multiple asynchronous I/O requests with a single system call. - /// - /// They are not guaranteed to complete atomically, and the order in which - /// the requests are carried out is not specified. Reads, writes, and - /// fsyncs may be freely mixed. - /// - /// This function is useful for reducing the context-switch overhead of - /// submitting many AIO operations. It can also be used with - /// `LioMode::LIO_WAIT` to block on the result of several independent - /// operations. Used that way, it is often useful in programs that - /// otherwise make little use of AIO. - /// - /// # Examples - /// - /// Use `listio` to submit an aio operation and wait for its completion. In - /// this case, there is no need to use [`aio_suspend`] to wait or - /// [`AioCb::error`] to poll. - /// - /// ``` - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::os::unix::io::AsRawFd; - /// # use tempfile::tempfile; - /// const WBUF: &[u8] = b"abcdef123456"; - /// let mut f = tempfile().unwrap(); - /// let mut liocb = LioCbBuilder::with_capacity(1) - /// .emplace_slice( - /// f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_WRITE - /// ).finish(); - /// liocb.listio(LioMode::LIO_WAIT, - /// SigevNotify::SigevNone).unwrap(); - /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - /// ``` - /// - /// # References - /// - /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) +/// Asynchronously writes from a buffer to a file descriptor +/// +/// # References +/// +/// [aio_write](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_write.html) +/// +/// # Examples +/// +/// ``` +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin( +/// AioWrite::new( +/// f.as_raw_fd(), +/// 2, //offset +/// WBUF, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aiow.as_mut().submit().unwrap(); +/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +#[derive(Debug)] +#[repr(transparent)] +pub struct AioWrite<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [u8]>, + _pin: PhantomPinned, +} + +impl<'a> AioWrite<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the requested length of the aio operation in bytes /// - /// [`aio_suspend`]: fn.aio_suspend.html - /// [`AioCb::error`]: struct.AioCb.html#method.error - pub fn listio(&mut self, mode: LioMode, - sigev_notify: SigevNotify) -> Result<()> { - let sigev = SigEvent::new(sigev_notify); - let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; - self.list.clear(); - for a in &mut self.aiocbs.iter_mut() { - a.in_progress = true; - self.list.push(a as *mut AioCb<'a> - as *mut libc::aiocb); - } - let p = self.list.as_ptr(); - Errno::result(unsafe { - libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) - }).map(drop) + /// This method returns the *requested* length of the operation. To get the + /// number of bytes actually read or written by a completed operation, use + /// `aio_return` instead. + pub fn nbytes(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes } - /// Resubmits any incomplete operations with [`lio_listio`]. - /// - /// Sometimes, due to system resource limitations, an `lio_listio` call will - /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return - /// `EINTR`. In any of these cases, only a subset of its constituent - /// operations will actually have been initiated. `listio_resubmit` will - /// resubmit any operations that are still uninitiated. - /// - /// After calling `listio_resubmit`, results should be collected by - /// [`LioCb::aio_return`]. - /// - /// # Examples - /// ```no_run - /// # use nix::Error; - /// # use nix::errno::Errno; - /// # use nix::sys::aio::*; - /// # use nix::sys::signal::SigevNotify; - /// # use std::os::unix::io::AsRawFd; - /// # use std::{thread, time}; - /// # use tempfile::tempfile; - /// const WBUF: &[u8] = b"abcdef123456"; - /// let mut f = tempfile().unwrap(); - /// let mut liocb = LioCbBuilder::with_capacity(1) - /// .emplace_slice( - /// f.as_raw_fd(), - /// 2, //offset - /// WBUF, - /// 0, //priority - /// SigevNotify::SigevNone, - /// LioOpcode::LIO_WRITE - /// ).finish(); - /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); - /// while err == Err(Errno::EIO) || - /// err == Err(Errno::EAGAIN) { - /// thread::sleep(time::Duration::from_millis(10)); - /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone); - /// } - /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - /// ``` - /// - /// # References + /// Construct a new `AioWrite`. /// - /// [`lio_listio`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// # Arguments /// - /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html - /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return - // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be - // changed by this method, because the kernel relies on their addresses - // being stable. - // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the - // sigev_notify will immediately refire. - pub fn listio_resubmit(&mut self, mode:LioMode, - sigev_notify: SigevNotify) -> Result<()> { - let sigev = SigEvent::new(sigev_notify); - let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; - self.list.clear(); - - while self.results.len() < self.aiocbs.len() { - self.results.push(None); - } - - for (i, a) in self.aiocbs.iter_mut().enumerate() { - if self.results[i].is_some() { - // Already collected final status for this operation - continue; - } - match a.error_unpinned() { - Ok(()) => { - // aiocb is complete; collect its status and don't resubmit - self.results[i] = Some(a.aio_return_unpinned()); - }, - Err(Errno::EAGAIN) => { - self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); - }, - Err(Errno::EINPROGRESS) => { - // aiocb is was successfully queued; no need to do anything - }, - Err(Errno::EINVAL) => panic!( - "AioCb was never submitted, or already finalized"), - _ => unreachable!() - } + /// * `fd`: File descriptor to write to + /// * `offs`: File offset + /// * `buf`: A memory buffer. It must outlive the `AioWrite`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + buf: &'a [u8], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + aiocb.aiocb.0.aio_nbytes = buf.len(); + // casting an immutable buffer to a mutable pointer looks unsafe, + // but technically its only unsafe to dereference it, not to create + // it. Type Safety guarantees that we'll never pass aiocb to + // aio_read or aio_readv. + aiocb.aiocb.0.aio_buf = buf.as_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITE; + aiocb.aiocb.0.aio_offset = offs; + AioWrite { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, } - let p = self.list.as_ptr(); - Errno::result(unsafe { - libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) - }).map(drop) } - /// Collect final status for an individual `AioCb` submitted as part of an - /// `LioCb`. - /// - /// This is just like [`AioCb::aio_return`], except it takes into account - /// operations that were restarted by [`LioCb::listio_resubmit`] - /// - /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return - /// [`LioCb::listio_resubmit`]: #method.listio_resubmit - pub fn aio_return(&mut self, i: usize) -> Result { - if i >= self.results.len() || self.results[i].is_none() { - self.aiocbs[i].aio_return_unpinned() - } else { - self.results[i].unwrap() - } + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset } +} - /// Retrieve error status of an individual `AioCb` submitted as part of an - /// `LioCb`. - /// - /// This is just like [`AioCb::error`], except it takes into account - /// operations that were restarted by [`LioCb::listio_resubmit`] - /// - /// [`AioCb::error`]: struct.AioCb.html#method.error - /// [`LioCb::listio_resubmit`]: #method.listio_resubmit - pub fn error(&mut self, i: usize) -> Result<()> { - if i >= self.results.len() || self.results[i].is_none() { - self.aiocbs[i].error_unpinned() - } else { - Ok(()) - } +impl<'a> Aio for AioWrite<'a> { + type Output = usize; + + aio_methods!(aio_write); +} + +impl<'a> AsMut for AioWrite<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 } } -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -impl<'a> Debug for LioCb<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("LioCb") - .field("aiocbs", &self.aiocbs) - .finish() +impl<'a> AsRef for AioWrite<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 } } -/// Used to construct `LioCb` -// This must be a separate class from LioCb due to pinning constraints. LioCb -// must use a boxed slice of AioCbs so they will have stable storage, but -// LioCbBuilder must use a Vec to make construction possible when the final size -// is unknown. -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] +/// Asynchronously writes from a scatter/gather list of buffers to a file descriptor. +/// +/// # References +/// +/// [aio_writev](https://www.freebsd.org/cgi/man.cgi?query=aio_writev) +/// +/// # Examples +/// +#[cfg_attr(fbsd14, doc = " ```")] +#[cfg_attr(not(fbsd14), doc = " ```no_run")] +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::io::IoSlice; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const wbuf0: &[u8] = b"abcdef"; +/// const wbuf1: &[u8] = b"123456"; +/// let len = wbuf0.len() + wbuf1.len(); +/// let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)]; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin( +/// AioWritev::new( +/// f.as_raw_fd(), +/// 2, //offset +/// &wbufs, +/// 0, //priority +/// SigevNotify::SigevNone +/// ) +/// ); +/// aiow.as_mut().submit().unwrap(); +/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), len); +/// ``` +#[cfg(target_os = "freebsd")] #[derive(Debug)] -pub struct LioCbBuilder<'a> { - /// A collection of [`AioCb`]s. - /// - /// [`AioCb`]: struct.AioCb.html - pub aiocbs: Vec>, +#[repr(transparent)] +pub struct AioWritev<'a> { + aiocb: AioCb, + _data: PhantomData<&'a [&'a [u8]]>, + _pin: PhantomPinned, } -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(docsrs, doc(cfg(all())))] -impl<'a> LioCbBuilder<'a> { - /// Initialize an empty `LioCb` - pub fn with_capacity(capacity: usize) -> LioCbBuilder<'a> { - LioCbBuilder { - aiocbs: Vec::with_capacity(capacity), - } +#[cfg(target_os = "freebsd")] +impl<'a> AioWritev<'a> { + unsafe_pinned!(aiocb: AioCb); + + /// Returns the number of buffers the operation will read into. + pub fn iovlen(&self) -> usize { + self.aiocb.aiocb.0.aio_nbytes } - /// Add a new operation on an immutable slice to the [`LioCb`] under - /// construction. + /// Construct a new `AioWritev`. /// - /// Arguments are the same as for [`AioCb::from_slice`] + /// # Arguments /// - /// [`LioCb`]: struct.LioCb.html - /// [`AioCb::from_slice`]: struct.AioCb.html#method.from_slice - #[must_use] - pub fn emplace_slice(mut self, fd: RawFd, offs: off_t, buf: &'a [u8], - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> Self - { - self.aiocbs.push(AioCb::from_slice_unpinned(fd, offs, buf, prio, - sigev_notify, opcode)); - self + /// * `fd`: File descriptor to write to + /// * `offs`: File offset + /// * `bufs`: A scatter/gather list of memory buffers. They must + /// outlive the `AioWritev`. + /// * `prio`: If POSIX Prioritized IO is supported, then the + /// operation will be prioritized at the process's + /// priority level minus `prio` + /// * `sigev_notify`: Determines how you will be notified of event + /// completion. + pub fn new( + fd: RawFd, + offs: off_t, + bufs: &[IoSlice<'a>], + prio: i32, + sigev_notify: SigevNotify, + ) -> Self { + let mut aiocb = AioCb::common_init(fd, prio, sigev_notify); + // In vectored mode, aio_nbytes stores the length of the iovec array, + // not the byte count. + aiocb.aiocb.0.aio_nbytes = bufs.len(); + // casting an immutable buffer to a mutable pointer looks unsafe, + // but technically its only unsafe to dereference it, not to create + // it. Type Safety guarantees that we'll never pass aiocb to + // aio_read or aio_readv. + aiocb.aiocb.0.aio_buf = bufs.as_ptr() as *mut c_void; + aiocb.aiocb.0.aio_lio_opcode = libc::LIO_WRITEV; + aiocb.aiocb.0.aio_offset = offs; + AioWritev { + aiocb, + _data: PhantomData, + _pin: PhantomPinned, + } } - /// Add a new operation on a mutable slice to the [`LioCb`] under - /// construction. - /// - /// Arguments are the same as for [`AioCb::from_mut_slice`] - /// - /// [`LioCb`]: struct.LioCb.html - /// [`AioCb::from_mut_slice`]: struct.AioCb.html#method.from_mut_slice - #[must_use] - pub fn emplace_mut_slice(mut self, fd: RawFd, offs: off_t, - buf: &'a mut [u8], prio: libc::c_int, - sigev_notify: SigevNotify, opcode: LioOpcode) - -> Self - { - self.aiocbs.push(AioCb::from_mut_slice_unpinned(fd, offs, buf, prio, - sigev_notify, opcode)); - self + /// Returns the file offset of the operation. + pub fn offset(&self) -> off_t { + self.aiocb.aiocb.0.aio_offset } +} - /// Finalize this [`LioCb`]. - /// - /// Afterwards it will be possible to issue the operations with - /// [`LioCb::listio`]. Conversely, it will no longer be possible to add new - /// operations with [`LioCbBuilder::emplace_slice`] or - /// [`LioCbBuilder::emplace_mut_slice`]. - /// - /// [`LioCb::listio`]: struct.LioCb.html#method.listio - /// [`LioCb::from_mut_slice`]: struct.LioCb.html#method.from_mut_slice - /// [`LioCb::from_slice`]: struct.LioCb.html#method.from_slice - pub fn finish(self) -> LioCb<'a> { - let len = self.aiocbs.len(); - LioCb { - aiocbs: self.aiocbs.into(), - list: Vec::with_capacity(len), - results: Vec::with_capacity(len) - } +#[cfg(target_os = "freebsd")] +impl<'a> Aio for AioWritev<'a> { + type Output = usize; + + aio_methods!(aio_writev); +} + +#[cfg(target_os = "freebsd")] +impl<'a> AsMut for AioWritev<'a> { + fn as_mut(&mut self) -> &mut libc::aiocb { + &mut self.aiocb.aiocb.0 + } +} + +#[cfg(target_os = "freebsd")] +impl<'a> AsRef for AioWritev<'a> { + fn as_ref(&self) -> &libc::aiocb { + &self.aiocb.aiocb.0 } } -#[cfg(not(any(target_os = "ios", target_os = "macos")))] +/// Cancels outstanding AIO requests for a given file descriptor. +/// +/// # Examples +/// +/// Issue an aio operation, then cancel all outstanding operations on that file +/// descriptor. +/// +/// ``` +/// # use nix::errno::Errno; +/// # use nix::Error; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::{thread, time}; +/// # use std::io::Write; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// let wbuf = b"CDEF"; +/// let mut f = tempfile().unwrap(); +/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), +/// 2, //offset +/// &wbuf[..], +/// 0, //priority +/// SigevNotify::SigevNone)); +/// aiocb.as_mut().submit().unwrap(); +/// let cs = aio_cancel_all(f.as_raw_fd()).unwrap(); +/// if cs == AioCancelStat::AioNotCanceled { +/// while (aiocb.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// } +/// // Must call `aio_return`, but ignore the result +/// let _ = aiocb.as_mut().aio_return(); +/// ``` +/// +/// # References +/// +/// [`aio_cancel`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_cancel.html) +pub fn aio_cancel_all(fd: RawFd) -> Result { + match unsafe { libc::aio_cancel(fd, ptr::null_mut()) } { + libc::AIO_CANCELED => Ok(AioCancelStat::AioCanceled), + libc::AIO_NOTCANCELED => Ok(AioCancelStat::AioNotCanceled), + libc::AIO_ALLDONE => Ok(AioCancelStat::AioAllDone), + -1 => Err(Errno::last()), + _ => panic!("unknown aio_cancel return value"), + } +} + +/// Suspends the calling process until at least one of the specified operations +/// have completed, a signal is delivered, or the timeout has passed. +/// +/// If `timeout` is `None`, `aio_suspend` will block indefinitely. +/// +/// # Examples +/// +/// Use `aio_suspend` to block until an aio operation completes. +/// +/// ``` +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use std::os::unix::io::AsRawFd; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), +/// 2, //offset +/// WBUF, +/// 0, //priority +/// SigevNotify::SigevNone)); +/// aiocb.as_mut().submit().unwrap(); +/// aio_suspend(&[&*aiocb], None).expect("aio_suspend failed"); +/// assert_eq!(aiocb.as_mut().aio_return().unwrap() as usize, WBUF.len()); +/// ``` +/// # References +/// +/// [`aio_suspend`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/aio_suspend.html) +pub fn aio_suspend( + list: &[&dyn AsRef], + timeout: Option, +) -> Result<()> { + let p = list as *const [&dyn AsRef] + as *const [*const libc::aiocb] + as *const *const libc::aiocb; + let timep = match timeout { + None => ptr::null::(), + Some(x) => x.as_ref() as *const libc::timespec, + }; + Errno::result(unsafe { libc::aio_suspend(p, list.len() as i32, timep) }) + .map(drop) +} + +/// Submits multiple asynchronous I/O requests with a single system call. +/// +/// They are not guaranteed to complete atomically, and the order in which the +/// requests are carried out is not specified. Reads, and writes may be freely +/// mixed. +/// +/// # Examples +/// +/// Use `lio_listio` to submit an aio operation and wait for its completion. In +/// this case, there is no need to use aio_suspend to wait or `error` to poll. +/// This mode is useful for otherwise-synchronous programs that want to execute +/// a handful of I/O operations in parallel. +/// ``` +/// # use std::os::unix::io::AsRawFd; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin(AioWrite::new( +/// f.as_raw_fd(), +/// 2, // offset +/// WBUF, +/// 0, // priority +/// SigevNotify::SigevNone +/// )); +/// lio_listio(LioMode::LIO_WAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone) +/// .unwrap(); +/// // At this point, we are guaranteed that aiow is complete. +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +/// +/// Use `lio_listio` to submit multiple asynchronous operations with a single +/// syscall, but receive notification individually. This is an efficient +/// technique for reducing overall context-switch overhead, especially when +/// combined with kqueue. +/// ``` +/// # use std::os::unix::io::AsRawFd; +/// # use std::thread; +/// # use std::time; +/// # use nix::errno::Errno; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::SigevNotify; +/// # use tempfile::tempfile; +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin(AioWrite::new( +/// f.as_raw_fd(), +/// 2, // offset +/// WBUF, +/// 0, // priority +/// SigevNotify::SigevNone +/// )); +/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], SigevNotify::SigevNone) +/// .unwrap(); +/// // We must wait for the completion of each individual operation +/// while (aiow.as_mut().error() == Err(Errno::EINPROGRESS)) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +/// +/// Use `lio_listio` to submit multiple operations, and receive notification +/// only when all of them are complete. This can be useful when there is some +/// logical relationship between the operations. But beware! Errors or system +/// resource limitations may cause `lio_listio` to return `EIO`, `EAGAIN`, or +/// `EINTR`, in which case some but not all operations may have been submitted. +/// In that case, you must check the status of each individual operation, and +/// possibly resubmit some. +/// ``` +/// # use libc::c_int; +/// # use std::os::unix::io::AsRawFd; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use std::thread; +/// # use std::time; +/// # use lazy_static::lazy_static; +/// # use nix::errno::Errno; +/// # use nix::sys::aio::*; +/// # use nix::sys::signal::*; +/// # use tempfile::tempfile; +/// lazy_static! { +/// pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); +/// } +/// +/// extern fn sigfunc(_: c_int) { +/// SIGNALED.store(true, Ordering::Relaxed); +/// } +/// let sa = SigAction::new(SigHandler::Handler(sigfunc), +/// SaFlags::SA_RESETHAND, +/// SigSet::empty()); +/// SIGNALED.store(false, Ordering::Relaxed); +/// unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); +/// +/// const WBUF: &[u8] = b"abcdef123456"; +/// let mut f = tempfile().unwrap(); +/// let mut aiow = Box::pin(AioWrite::new( +/// f.as_raw_fd(), +/// 2, // offset +/// WBUF, +/// 0, // priority +/// SigevNotify::SigevNone +/// )); +/// let sev = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, si_value: 0 }; +/// lio_listio(LioMode::LIO_NOWAIT, &mut[aiow.as_mut()], sev).unwrap(); +/// while !SIGNALED.load(Ordering::Relaxed) { +/// thread::sleep(time::Duration::from_millis(10)); +/// } +/// // At this point, since `lio_listio` returned success and delivered its +/// // notification, we know that all operations are complete. +/// assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); +/// ``` +pub fn lio_listio( + mode: LioMode, + list: &mut [Pin<&mut dyn AsMut>], + sigev_notify: SigevNotify, +) -> Result<()> { + let p = list as *mut [Pin<&mut dyn AsMut>] + as *mut [*mut libc::aiocb] + as *mut *mut libc::aiocb; + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, list.len() as i32, sigevp) + }) + .map(drop) +} + #[cfg(test)] mod t { use super::*; - // It's important that `LioCb` be `UnPin`. The tokio-file crate relies on - // it. + /// aio_suspend relies on casting Rust Aio* struct pointers to libc::aiocb + /// pointers. This test ensures that such casts are valid. #[test] - fn liocb_is_unpin() { - use assert_impl::assert_impl; + fn casting() { + let sev = SigevNotify::SigevNone; + let aiof = AioFsync::new(666, AioFsyncMode::O_SYNC, 0, sev); + assert_eq!( + aiof.as_ref() as *const libc::aiocb, + &aiof as *const AioFsync as *const libc::aiocb + ); + + let mut rbuf = []; + let aior = AioRead::new(666, 0, &mut rbuf, 0, sev); + assert_eq!( + aior.as_ref() as *const libc::aiocb, + &aior as *const AioRead as *const libc::aiocb + ); + + let wbuf = []; + let aiow = AioWrite::new(666, 0, &wbuf, 0, sev); + assert_eq!( + aiow.as_ref() as *const libc::aiocb, + &aiow as *const AioWrite as *const libc::aiocb + ); + } + + #[cfg(target_os = "freebsd")] + #[test] + fn casting_vectored() { + let sev = SigevNotify::SigevNone; + + let mut rbuf = []; + let mut rbufs = [IoSliceMut::new(&mut rbuf)]; + let aiorv = AioReadv::new(666, 0, &mut rbufs[..], 0, sev); + assert_eq!( + aiorv.as_ref() as *const libc::aiocb, + &aiorv as *const AioReadv as *const libc::aiocb + ); - assert_impl!(Unpin: LioCb); + let wbuf = []; + let wbufs = [IoSlice::new(&wbuf)]; + let aiowv = AioWritev::new(666, 0, &wbufs, 0, sev); + assert_eq!( + aiowv.as_ref() as *const libc::aiocb, + &aiowv as *const AioWritev as *const libc::aiocb + ); } } diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs index 80cd053f8d..ca35b5f88c 100644 --- a/test/sys/test_aio.rs +++ b/test/sys/test_aio.rs @@ -1,415 +1,525 @@ -use libc::{c_int, c_void}; -use nix::Result; -use nix::errno::*; -use nix::sys::aio::*; -use nix::sys::signal::{SaFlags, SigAction, sigaction, SigevNotify, SigHandler, Signal, SigSet}; -use nix::sys::time::{TimeSpec, TimeValLike}; -use std::io::{Write, Read, Seek, SeekFrom}; -use std::ops::Deref; -use std::os::unix::io::AsRawFd; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::{thread, time}; +use std::{ + io::{Read, Seek, SeekFrom, Write}, + ops::Deref, + os::unix::io::AsRawFd, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + thread, + time, +}; + +use libc::c_int; +use nix::{ + errno::*, + sys::{ + aio::*, + signal::{ + sigaction, + SaFlags, + SigAction, + SigHandler, + SigSet, + SigevNotify, + Signal, + }, + time::{TimeSpec, TimeValLike}, + }, +}; use tempfile::tempfile; -// Helper that polls an AioCb for completion or error -fn poll_aio(aiocb: &mut Pin>) -> Result<()> { - loop { - let err = aiocb.error(); - if err != Err(Errno::EINPROGRESS) { return err; }; - thread::sleep(time::Duration::from_millis(10)); - } +lazy_static! { + pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); } -// Helper that polls a component of an LioCb for completion or error -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -fn poll_lio(liocb: &mut LioCb, i: usize) -> Result<()> { - loop { - let err = liocb.error(i); - if err != Err(Errno::EINPROGRESS) { return err; }; - thread::sleep(time::Duration::from_millis(10)); - } +extern "C" fn sigfunc(_: c_int) { + SIGNALED.store(true, Ordering::Relaxed); } -#[test] -fn test_accessors() { - let mut rbuf = vec![0; 4]; - let aiocb = AioCb::from_mut_slice( 1001, - 2, //offset - &mut rbuf, - 42, //priority - SigevNotify::SigevSignal { - signal: Signal::SIGUSR2, - si_value: 99 - }, - LioOpcode::LIO_NOP); - assert_eq!(1001, aiocb.fd()); - assert_eq!(Some(LioOpcode::LIO_NOP), aiocb.lio_opcode()); - assert_eq!(4, aiocb.nbytes()); - assert_eq!(2, aiocb.offset()); - assert_eq!(42, aiocb.priority()); - let sev = aiocb.sigevent().sigevent(); - assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); - assert_eq!(99, sev.sigev_value.sival_ptr as i64); +// Helper that polls an AioCb for completion or error +macro_rules! poll_aio { + ($aiocb: expr) => { + loop { + let err = $aiocb.as_mut().error(); + if err != Err(Errno::EINPROGRESS) { + break err; + }; + thread::sleep(time::Duration::from_millis(10)); + } + }; } -// Tests AioCb.cancel. We aren't trying to test the OS's implementation, only -// our bindings. So it's sufficient to check that AioCb.cancel returned any -// AioCancelStat value. -#[test] -#[cfg_attr(target_env = "musl", ignore)] -fn test_cancel() { - let wbuf: &[u8] = b"CDEF"; - - let f = tempfile().unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 0, //offset - wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); - let err = aiocb.error(); - assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); +mod aio_fsync { + use super::*; + + #[test] + fn test_accessors() { + let aiocb = AioFsync::new( + 1001, + AioFsyncMode::O_SYNC, + 42, + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } - let cancelstat = aiocb.cancel(); - assert!(cancelstat.is_ok()); + /// `AioFsync::submit` should not modify the `AioCb` object if + /// `libc::aio_fsync` returns an error + // Skip on Linux, because Linux's AIO implementation can't detect errors + // synchronously + #[test] + #[cfg(any(target_os = "freebsd", target_os = "macos"))] + fn error() { + use std::mem; + + const INITIAL: &[u8] = b"abcdef123456"; + // Create an invalid AioFsyncMode + let mode = unsafe { mem::transmute(666) }; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiof = Box::pin(AioFsync::new( + f.as_raw_fd(), + mode, + 0, + SigevNotify::SigevNone, + )); + let err = aiof.as_mut().submit(); + assert!(err.is_err()); + } - // Wait for aiocb to complete, but don't care whether it succeeded - let _ = poll_aio(&mut aiocb); - let _ = aiocb.aio_return(); + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let fd = f.as_raw_fd(); + let mut aiof = Box::pin(AioFsync::new( + fd, + AioFsyncMode::O_SYNC, + 0, + SigevNotify::SigevNone, + )); + let err = aiof.as_mut().submit(); + assert!(err.is_ok()); + poll_aio!(&mut aiof).unwrap(); + aiof.as_mut().aio_return().unwrap(); + } } -// Tests using aio_cancel_all for all outstanding IOs. -#[test] -#[cfg_attr(target_env = "musl", ignore)] -fn test_aio_cancel_all() { - let wbuf: &[u8] = b"CDEF"; - - let f = tempfile().unwrap(); - let mut aiocb = AioCb::from_slice(f.as_raw_fd(), - 0, //offset - wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); - let err = aiocb.error(); - assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); +mod aio_read { + use super::*; + + #[test] + fn test_accessors() { + let mut rbuf = vec![0; 4]; + let aiocb = AioRead::new( + 1001, + 2, //offset + &mut rbuf, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(4, aiocb.nbytes()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } - let cancelstat = aio_cancel_all(f.as_raw_fd()); - assert!(cancelstat.is_ok()); + // Tests AioWrite.cancel. We aren't trying to test the OS's implementation, + // only our bindings. So it's sufficient to check that cancel + // returned any AioCancelStat value. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn cancel() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let fd = f.as_raw_fd(); + let mut aior = + Box::pin(AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone)); + aior.as_mut().submit().unwrap(); + + let cancelstat = aior.as_mut().cancel(); + assert!(cancelstat.is_ok()); + + // Wait for aiow to complete, but don't care whether it succeeded + let _ = poll_aio!(&mut aior); + let _ = aior.as_mut().aio_return(); + } - // Wait for aiocb to complete, but don't care whether it succeeded - let _ = poll_aio(&mut aiocb); - let _ = aiocb.aio_return(); -} + /// `AioRead::submit` should not modify the `AioCb` object if + /// `libc::aio_read` returns an error + // Skip on Linux, because Linux's AIO implementation can't detect errors + // synchronously + #[test] + #[cfg(any(target_os = "freebsd", target_os = "macos"))] + fn error() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aior = Box::pin(AioRead::new( + f.as_raw_fd(), + -1, //an invalid offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + )); + assert!(aior.as_mut().submit().is_err()); + } -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_fsync() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_fd( f.as_raw_fd(), - 0, //priority - SigevNotify::SigevNone); - let err = aiocb.fsync(AioFsyncMode::O_SYNC); - assert!(err.is_ok()); - poll_aio(&mut aiocb).unwrap(); - aiocb.aio_return().unwrap(); -} + // Test a simple aio operation with no completion notification. We must + // poll for completion + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + const EXPECT: &[u8] = b"cdef"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + { + let fd = f.as_raw_fd(); + let mut aior = Box::pin(AioRead::new( + fd, + 2, + &mut rbuf, + 0, + SigevNotify::SigevNone, + )); + aior.as_mut().submit().unwrap(); -/// `AioCb::fsync` should not modify the `AioCb` object if `libc::aio_fsync` returns -/// an error -// Skip on Linux, because Linux's AIO implementation can't detect errors -// synchronously -#[test] -#[cfg(any(target_os = "freebsd", target_os = "macos"))] -fn test_fsync_error() { - use std::mem; + let err = poll_aio!(&mut aior); + assert_eq!(err, Ok(())); + assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len()); + } + assert_eq!(EXPECT, rbuf.deref().deref()); + } - const INITIAL: &[u8] = b"abcdef123456"; - // Create an invalid AioFsyncMode - let mode = unsafe { mem::transmute(666) }; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_fd( f.as_raw_fd(), - 0, //priority - SigevNotify::SigevNone); - let err = aiocb.fsync(mode); - assert!(err.is_err()); + // Like ok, but allocates the structure on the stack. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn on_stack() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf = vec![0; 4]; + const EXPECT: &[u8] = b"cdef"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + { + let fd = f.as_raw_fd(); + let mut aior = + AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone); + let mut aior = unsafe { Pin::new_unchecked(&mut aior) }; + aior.as_mut().submit().unwrap(); + + let err = poll_aio!(&mut aior); + assert_eq!(err, Ok(())); + assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len()); + } + assert_eq!(EXPECT, rbuf.deref().deref()); + } } -#[test] -// On Cirrus on Linux, this test fails due to a glibc bug. -// https://github.com/nix-rust/nix/issues/1099 -#[cfg_attr(target_os = "linux", ignore)] -// On Cirrus, aio_suspend is failing with EINVAL -// https://github.com/nix-rust/nix/issues/1361 -#[cfg_attr(target_os = "macos", ignore)] -fn test_aio_suspend() { - const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEFG"; - let timeout = TimeSpec::seconds(10); - let mut rbuf = vec![0; 4]; - let rlen = rbuf.len(); - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); +#[cfg(target_os = "freebsd")] +#[cfg(fbsd14)] +mod aio_readv { + use std::io::IoSliceMut; + + use super::*; + + #[test] + fn test_accessors() { + let mut rbuf0 = vec![0; 4]; + let mut rbuf1 = vec![0; 8]; + let mut rbufs = + [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; + let aiocb = AioReadv::new( + 1001, + 2, //offset + &mut rbufs, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(2, aiocb.iovlen()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } - let mut wcb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE); - - let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ); - wcb.write().unwrap(); - rcb.read().unwrap(); - loop { + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let mut rbuf0 = vec![0; 4]; + let mut rbuf1 = vec![0; 2]; + let mut rbufs = + [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)]; + const EXPECT0: &[u8] = b"cdef"; + const EXPECT1: &[u8] = b"12"; + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); { - let cbbuf = [wcb.as_ref(), rcb.as_ref()]; - let r = aio_suspend(&cbbuf[..], Some(timeout)); - match r { - Err(Errno::EINTR) => continue, - Err(e) => panic!("aio_suspend returned {:?}", e), - Ok(_) => () - }; - } - if rcb.error() != Err(Errno::EINPROGRESS) && - wcb.error() != Err(Errno::EINPROGRESS) { - break + let fd = f.as_raw_fd(); + let mut aior = Box::pin(AioReadv::new( + fd, + 2, + &mut rbufs, + 0, + SigevNotify::SigevNone, + )); + aior.as_mut().submit().unwrap(); + + let err = poll_aio!(&mut aior); + assert_eq!(err, Ok(())); + assert_eq!( + aior.as_mut().aio_return().unwrap(), + EXPECT0.len() + EXPECT1.len() + ); } + assert_eq!(&EXPECT0, &rbuf0); + assert_eq!(&EXPECT1, &rbuf1); } - - assert_eq!(wcb.aio_return().unwrap() as usize, WBUF.len()); - assert_eq!(rcb.aio_return().unwrap() as usize, rlen); } -// Test a simple aio operation with no completion notification. We must poll -// for completion -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - const EXPECT: &[u8] = b"cdef"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - { - let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - 2, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); +mod aio_write { + use super::*; + + #[test] + fn test_accessors() { + let wbuf = vec![0; 4]; + let aiocb = AioWrite::new( + 1001, + 2, //offset + &wbuf, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(4, aiocb.nbytes()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); } - assert_eq!(EXPECT, rbuf.deref().deref()); -} + // Tests AioWrite.cancel. We aren't trying to test the OS's implementation, + // only our bindings. So it's sufficient to check that cancel + // returned any AioCancelStat value. + #[test] + #[cfg_attr(target_env = "musl", ignore)] + fn cancel() { + let wbuf: &[u8] = b"CDEF"; -/// `AioCb::read` should not modify the `AioCb` object if `libc::aio_read` -/// returns an error -// Skip on Linux, because Linux's AIO implementation can't detect errors -// synchronously -#[test] -#[cfg(any(target_os = "freebsd", target_os = "macos"))] -fn test_read_error() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - -1, //an invalid offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - assert!(aiocb.read().is_err()); -} + let f = tempfile().unwrap(); + let mut aiow = Box::pin(AioWrite::new( + f.as_raw_fd(), + 0, + wbuf, + 0, + SigevNotify::SigevNone, + )); + aiow.as_mut().submit().unwrap(); + let err = aiow.as_mut().error(); + assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); -// Tests from_mut_slice -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read_into_mut_slice() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - const EXPECT: &[u8] = b"cdef"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - { - let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(), - 2, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); + let cancelstat = aiow.as_mut().cancel(); + assert!(cancelstat.is_ok()); + + // Wait for aiow to complete, but don't care whether it succeeded + let _ = poll_aio!(&mut aiow); + let _ = aiow.as_mut().aio_return(); } - assert_eq!(rbuf, EXPECT); -} + // Test a simple aio operation with no completion notification. We must + // poll for completion. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let wbuf = "CDEF".to_string().into_bytes(); + let mut rbuf = Vec::new(); + const EXPECT: &[u8] = b"abCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiow = Box::pin(AioWrite::new( + f.as_raw_fd(), + 2, + &wbuf, + 0, + SigevNotify::SigevNone, + )); + aiow.as_mut().submit().unwrap(); -// Tests from_ptr -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read_into_pointer() { - const INITIAL: &[u8] = b"abcdef123456"; - let mut rbuf = vec![0; 4]; - const EXPECT: &[u8] = b"cdef"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - { - // Safety: ok because rbuf lives until after poll_aio - let mut aiocb = unsafe { - AioCb::from_mut_ptr( f.as_raw_fd(), - 2, //offset - rbuf.as_mut_ptr() as *mut c_void, - rbuf.len(), - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP) - }; - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); + let err = poll_aio!(&mut aiow); assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); - } + assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len()); - assert_eq!(rbuf, EXPECT); -} - -// Test reading into an immutable buffer. It should fail -// FIXME: This test fails to panic on Linux/musl -#[test] -#[should_panic(expected = "Can't read into an immutable buffer")] -#[cfg_attr(target_env = "musl", ignore)] -fn test_read_immutable_buffer() { - let rbuf: &[u8] = b"CDEF"; - let f = tempfile().unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); -} + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert_eq!(len, EXPECT.len()); + assert_eq!(rbuf, EXPECT); + } + // Like ok, but allocates the structure on the stack. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn on_stack() { + const INITIAL: &[u8] = b"abcdef123456"; + let wbuf = "CDEF".to_string().into_bytes(); + let mut rbuf = Vec::new(); + const EXPECT: &[u8] = b"abCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiow = AioWrite::new( + f.as_raw_fd(), + 2, //offset + &wbuf, + 0, //priority + SigevNotify::SigevNone, + ); + let mut aiow = unsafe { Pin::new_unchecked(&mut aiow) }; + aiow.as_mut().submit().unwrap(); -// Test a simple aio operation with no completion notification. We must poll -// for completion. Unlike test_aio_read, this test uses AioCb::from_slice -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_write() { - const INITIAL: &[u8] = b"abcdef123456"; - let wbuf = "CDEF".to_string().into_bytes(); - let mut rbuf = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; + let err = poll_aio!(&mut aiow); + assert_eq!(err, Ok(())); + assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len()); - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - &wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len()); + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert_eq!(len, EXPECT.len()); + assert_eq!(rbuf, EXPECT); + } - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf, EXPECT); + /// `AioWrite::write` should not modify the `AioCb` object if + /// `libc::aio_write` returns an error. + // Skip on Linux, because Linux's AIO implementation can't detect errors + // synchronously + #[test] + #[cfg(any(target_os = "freebsd", target_os = "macos"))] + fn error() { + let wbuf = "CDEF".to_string().into_bytes(); + let mut aiow = Box::pin(AioWrite::new( + 666, // An invalid file descriptor + 0, //offset + &wbuf, + 0, //priority + SigevNotify::SigevNone, + )); + assert!(aiow.as_mut().submit().is_err()); + // Dropping the AioWrite at this point should not panic + } } -// Tests `AioCb::from_ptr` -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_write_from_pointer() { - const INITIAL: &[u8] = b"abcdef123456"; - let wbuf = "CDEF".to_string().into_bytes(); - let mut rbuf = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; - - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - // Safety: ok because aiocb outlives poll_aio - let mut aiocb = unsafe { - AioCb::from_ptr( f.as_raw_fd(), - 2, //offset - wbuf.as_ptr() as *const c_void, - wbuf.len(), - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP) - }; - aiocb.write().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len()); - - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf, EXPECT); -} +#[cfg(target_os = "freebsd")] +#[cfg(fbsd14)] +mod aio_writev { + use std::io::IoSlice; + + use super::*; + + #[test] + fn test_accessors() { + let wbuf0 = vec![0; 4]; + let wbuf1 = vec![0; 8]; + let wbufs = [IoSlice::new(&wbuf0), IoSlice::new(&wbuf1)]; + let aiocb = AioWritev::new( + 1001, + 2, //offset + &wbufs, + 42, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 99, + }, + ); + assert_eq!(1001, aiocb.fd()); + assert_eq!(2, aiocb.iovlen()); + assert_eq!(2, aiocb.offset()); + assert_eq!(42, aiocb.priority()); + let sev = aiocb.sigevent().sigevent(); + assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo); + assert_eq!(99, sev.sigev_value.sival_ptr as i64); + } -/// `AioCb::write` should not modify the `AioCb` object if `libc::aio_write` -/// returns an error -// Skip on Linux, because Linux's AIO implementation can't detect errors -// synchronously -#[test] -#[cfg(any(target_os = "freebsd", target_os = "macos"))] -fn test_write_error() { - let wbuf = "CDEF".to_string().into_bytes(); - let mut aiocb = AioCb::from_slice( 666, // An invalid file descriptor - 0, //offset - &wbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - assert!(aiocb.write().is_err()); -} + // Test a simple aio operation with no completion notification. We must + // poll for completion. + #[test] + #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] + fn ok() { + const INITIAL: &[u8] = b"abcdef123456"; + let wbuf0 = b"BC"; + let wbuf1 = b"DEF"; + let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)]; + let wlen = wbuf0.len() + wbuf1.len(); + let mut rbuf = Vec::new(); + const EXPECT: &[u8] = b"aBCDEF123456"; + + let mut f = tempfile().unwrap(); + f.write_all(INITIAL).unwrap(); + let mut aiow = Box::pin(AioWritev::new( + f.as_raw_fd(), + 1, + &wbufs, + 0, + SigevNotify::SigevNone, + )); + aiow.as_mut().submit().unwrap(); -lazy_static! { - pub static ref SIGNALED: AtomicBool = AtomicBool::new(false); -} + let err = poll_aio!(&mut aiow); + assert_eq!(err, Ok(())); + assert_eq!(aiow.as_mut().aio_return().unwrap(), wlen); -extern fn sigfunc(_: c_int) { - SIGNALED.store(true, Ordering::Relaxed); + f.seek(SeekFrom::Start(0)).unwrap(); + let len = f.read_to_end(&mut rbuf).unwrap(); + assert_eq!(len, EXPECT.len()); + assert_eq!(rbuf, EXPECT); + } } // Test an aio operation with completion delivered by a signal -// FIXME: This test is ignored on mips because of failures in qemu in CI #[test] -#[cfg_attr(any(all(target_env = "musl", target_arch = "x86_64"), target_arch = "mips", target_arch = "mips64"), ignore)] -fn test_write_sigev_signal() { +#[cfg_attr( + any( + all(target_env = "musl", target_arch = "x86_64"), + target_arch = "mips", + target_arch = "mips64" + ), + ignore +)] +fn sigev_signal() { let _m = crate::SIGNAL_MTX.lock(); - let sa = SigAction::new(SigHandler::Handler(sigfunc), - SaFlags::SA_RESETHAND, - SigSet::empty()); + let sa = SigAction::new( + SigHandler::Handler(sigfunc), + SaFlags::SA_RESETHAND, + SigSet::empty(), + ); SIGNALED.store(false, Ordering::Relaxed); unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); @@ -420,201 +530,107 @@ fn test_write_sigev_signal() { let mut f = tempfile().unwrap(); f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevSignal { - signal: Signal::SIGUSR2, - si_value: 0 //TODO: validate in sigfunc - }, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); + let mut aiow = Box::pin(AioWrite::new( + f.as_raw_fd(), + 2, //offset + WBUF, + 0, //priority + SigevNotify::SigevSignal { + signal: Signal::SIGUSR2, + si_value: 0, //TODO: validate in sigfunc + }, + )); + aiow.as_mut().submit().unwrap(); while !SIGNALED.load(Ordering::Relaxed) { thread::sleep(time::Duration::from_millis(10)); } - assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); + assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len()); f.seek(SeekFrom::Start(0)).unwrap(); let len = f.read_to_end(&mut rbuf).unwrap(); assert_eq!(len, EXPECT.len()); assert_eq!(rbuf, EXPECT); } -// Test LioCb::listio with LIO_WAIT, so all AIO ops should be complete by the -// time listio returns. -#[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_liocb_listio_wait() { - const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEF"; - let mut rbuf = vec![0; 4]; - let rlen = rbuf.len(); - let mut rbuf2 = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; - let mut f = tempfile().unwrap(); - - f.write_all(INITIAL).unwrap(); - - { - let mut liocb = LioCbBuilder::with_capacity(2) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE - ).emplace_mut_slice( - f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); - err.expect("lio_listio"); - - assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen); - } - assert_eq!(rbuf.deref().deref(), b"3456"); - - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf2).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf2, EXPECT); -} - -// Test LioCb::listio with LIO_NOWAIT and no SigEvent, so we must use some other -// mechanism to check for the individual AioCb's completion. +// Tests using aio_cancel_all for all outstanding IOs. #[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_liocb_listio_nowait() { - const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEF"; - let mut rbuf = vec![0; 4]; - let rlen = rbuf.len(); - let mut rbuf2 = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; - let mut f = tempfile().unwrap(); +#[cfg_attr(target_env = "musl", ignore)] +fn test_aio_cancel_all() { + let wbuf: &[u8] = b"CDEF"; - f.write_all(INITIAL).unwrap(); + let f = tempfile().unwrap(); + let mut aiocb = Box::pin(AioWrite::new( + f.as_raw_fd(), + 0, //offset + wbuf, + 0, //priority + SigevNotify::SigevNone, + )); + aiocb.as_mut().submit().unwrap(); + let err = aiocb.as_mut().error(); + assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS)); - { - let mut liocb = LioCbBuilder::with_capacity(2) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE - ).emplace_mut_slice( - f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); - err.expect("lio_listio"); - - poll_lio(&mut liocb, 0).unwrap(); - poll_lio(&mut liocb, 1).unwrap(); - assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen); - } - assert_eq!(rbuf.deref().deref(), b"3456"); + let cancelstat = aio_cancel_all(f.as_raw_fd()); + assert!(cancelstat.is_ok()); - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf2).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf2, EXPECT); + // Wait for aiocb to complete, but don't care whether it succeeded + let _ = poll_aio!(&mut aiocb); + let _ = aiocb.as_mut().aio_return(); } -// Test LioCb::listio with LIO_NOWAIT and a SigEvent to indicate when all -// AioCb's are complete. -// FIXME: This test is ignored on mips/mips64 because of failures in qemu in CI. #[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)] -fn test_liocb_listio_signal() { - let _m = crate::SIGNAL_MTX.lock(); +// On Cirrus on Linux, this test fails due to a glibc bug. +// https://github.com/nix-rust/nix/issues/1099 +#[cfg_attr(target_os = "linux", ignore)] +// On Cirrus, aio_suspend is failing with EINVAL +// https://github.com/nix-rust/nix/issues/1361 +#[cfg_attr(target_os = "macos", ignore)] +fn test_aio_suspend() { const INITIAL: &[u8] = b"abcdef123456"; - const WBUF: &[u8] = b"CDEF"; + const WBUF: &[u8] = b"CDEFG"; + let timeout = TimeSpec::seconds(10); let mut rbuf = vec![0; 4]; let rlen = rbuf.len(); - let mut rbuf2 = Vec::new(); - const EXPECT: &[u8] = b"abCDEF123456"; let mut f = tempfile().unwrap(); - let sa = SigAction::new(SigHandler::Handler(sigfunc), - SaFlags::SA_RESETHAND, - SigSet::empty()); - let sigev_notify = SigevNotify::SigevSignal { signal: Signal::SIGUSR2, - si_value: 0 }; - f.write_all(INITIAL).unwrap(); - { - let mut liocb = LioCbBuilder::with_capacity(2) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE - ).emplace_mut_slice( - f.as_raw_fd(), - 8, //offset - &mut rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - SIGNALED.store(false, Ordering::Relaxed); - unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); - let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify); - err.expect("lio_listio"); - while !SIGNALED.load(Ordering::Relaxed) { - thread::sleep(time::Duration::from_millis(10)); + let mut wcb = Box::pin(AioWrite::new( + f.as_raw_fd(), + 2, //offset + WBUF, + 0, //priority + SigevNotify::SigevNone, + )); + + let mut rcb = Box::pin(AioRead::new( + f.as_raw_fd(), + 8, //offset + &mut rbuf, + 0, //priority + SigevNotify::SigevNone, + )); + wcb.as_mut().submit().unwrap(); + rcb.as_mut().submit().unwrap(); + loop { + { + let cbbuf = [ + &*wcb as &dyn AsRef, + &*rcb as &dyn AsRef, + ]; + let r = aio_suspend(&cbbuf[..], Some(timeout)); + match r { + Err(Errno::EINTR) => continue, + Err(e) => panic!("aio_suspend returned {:?}", e), + Ok(_) => (), + }; + } + if rcb.as_mut().error() != Err(Errno::EINPROGRESS) && + wcb.as_mut().error() != Err(Errno::EINPROGRESS) + { + break; } - - assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); - assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen); } - assert_eq!(rbuf.deref().deref(), b"3456"); - - f.seek(SeekFrom::Start(0)).unwrap(); - let len = f.read_to_end(&mut rbuf2).unwrap(); - assert_eq!(len, EXPECT.len()); - assert_eq!(rbuf2, EXPECT); -} -// Try to use LioCb::listio to read into an immutable buffer. It should fail -// FIXME: This test fails to panic on Linux/musl -#[test] -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -#[should_panic(expected = "Can't read into an immutable buffer")] -#[cfg_attr(target_env = "musl", ignore)] -fn test_liocb_listio_read_immutable() { - let rbuf: &[u8] = b"abcd"; - let f = tempfile().unwrap(); - - - let mut liocb = LioCbBuilder::with_capacity(1) - .emplace_slice( - f.as_raw_fd(), - 2, //offset - rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ - ).finish(); - let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); + assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len()); + assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen); } diff --git a/test/sys/test_aio_drop.rs b/test/sys/test_aio_drop.rs index f9ff97af6c..0836a5422d 100644 --- a/test/sys/test_aio_drop.rs +++ b/test/sys/test_aio_drop.rs @@ -20,11 +20,10 @@ fn test_drop() { let f = tempfile().unwrap(); f.set_len(6).unwrap(); - let mut aiocb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - WBUF, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.write().unwrap(); + let mut aiocb = Box::pin(AioWrite::new(f.as_raw_fd(), + 2, //offset + WBUF, + 0, //priority + SigevNotify::SigevNone)); + aiocb.as_mut().submit().unwrap(); } diff --git a/test/sys/test_lio_listio_resubmit.rs b/test/sys/test_lio_listio_resubmit.rs deleted file mode 100644 index 2ed058c27c..0000000000 --- a/test/sys/test_lio_listio_resubmit.rs +++ /dev/null @@ -1,106 +0,0 @@ -// vim: tw=80 - -// Annoyingly, Cargo is unable to conditionally build an entire test binary. So -// we must disable the test here rather than in Cargo.toml -#![cfg(target_os = "freebsd")] - -use nix::errno::*; -use nix::libc::off_t; -use nix::sys::aio::*; -use nix::sys::signal::SigevNotify; -use nix::unistd::{SysconfVar, sysconf}; -use std::os::unix::io::AsRawFd; -use std::{thread, time}; -use sysctl::{CtlValue, Sysctl}; -use tempfile::tempfile; - -const BYTES_PER_OP: usize = 512; - -/// Attempt to collect final status for all of `liocb`'s operations, freeing -/// system resources -fn finish_liocb(liocb: &mut LioCb) { - for j in 0..liocb.len() { - loop { - let e = liocb.error(j); - match e { - Ok(()) => break, - Err(Errno::EINPROGRESS) => - thread::sleep(time::Duration::from_millis(10)), - Err(x) => panic!("aio_error({:?})", x) - } - } - assert_eq!(liocb.aio_return(j).unwrap(), BYTES_PER_OP as isize); - } -} - -// Deliberately exceed system resource limits, causing lio_listio to return EIO. -// This test must run in its own process since it deliberately uses all AIO -// resources. ATM it is only enabled on FreeBSD, because I don't know how to -// check system AIO limits on other operating systems. -#[test] -fn test_lio_listio_resubmit() { - let mut resubmit_count = 0; - - // Lookup system resource limits - let alm = sysconf(SysconfVar::AIO_LISTIO_MAX) - .expect("sysconf").unwrap() as usize; - let ctl = sysctl::Ctl::new("vfs.aio.max_aio_queue_per_proc").unwrap(); - let maqpp = if let CtlValue::Int(x) = ctl.value().unwrap() { - x as usize - } else { - panic!("unknown sysctl"); - }; - - // Find lio_listio sizes that satisfy the AIO_LISTIO_MAX constraint and also - // result in a final lio_listio call that can only partially be queued - let target_ops = maqpp + alm / 2; - let num_listios = (target_ops + alm - 3) / (alm - 2); - let ops_per_listio = (target_ops + num_listios - 1) / num_listios; - assert!((num_listios - 1) * ops_per_listio < maqpp, - "the last lio_listio won't make any progress; fix the algorithm"); - println!("Using {:?} LioCbs of {:?} operations apiece", num_listios, - ops_per_listio); - - let f = tempfile().unwrap(); - let buffer_set = (0..num_listios).map(|_| { - (0..ops_per_listio).map(|_| { - vec![0u8; BYTES_PER_OP] - }).collect::>() - }).collect::>(); - - let mut liocbs = (0..num_listios).map(|i| { - let mut builder = LioCbBuilder::with_capacity(ops_per_listio); - for j in 0..ops_per_listio { - let offset = (BYTES_PER_OP * (i * ops_per_listio + j)) as off_t; - builder = builder.emplace_slice(f.as_raw_fd(), - offset, - &buffer_set[i][j][..], - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_WRITE); - } - let mut liocb = builder.finish(); - let mut err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); - while err == Err(Errno::EIO) || - err == Err(Errno::EAGAIN) || - err == Err(Errno::EINTR) { - // - thread::sleep(time::Duration::from_millis(10)); - resubmit_count += 1; - err = liocb.listio_resubmit(LioMode::LIO_NOWAIT, - SigevNotify::SigevNone); - } - liocb - }).collect::>(); - - // Ensure that every AioCb completed - for liocb in liocbs.iter_mut() { - finish_liocb(liocb); - } - - if resubmit_count > 0 { - println!("Resubmitted {:?} times, test passed", resubmit_count); - } else { - println!("Never resubmitted. Test ambiguous"); - } -}