From 8c0da9a686765e314ed7298638e2e555ad000ea9 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 12 Feb 2023 01:35:13 +0000 Subject: [PATCH] feat: Futex --- CHANGELOG.md | 2 + Cargo.toml | 3 +- src/sys/futex.rs | 382 +++++++++++++++++++++++++++++++++++++++++++++++ src/sys/mod.rs | 6 + 4 files changed, 392 insertions(+), 1 deletion(-) create mode 100644 src/sys/futex.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index e8bde843f1..5222afbcf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ This project adheres to [Semantic Versioning](https://semver.org/). ### Added - Added `AT_EACCESS` to `AtFlags` on all platforms but android ([#1995](https://github.com/nix-rust/nix/pull/1995)) +- Added futex interface. + ([#1907](https://github.com/nix-rust/nix/pull/1907)) - Add `PF_ROUTE` to `SockType` on macOS, iOS, all of the BSDs, Fuchsia, Haiku, Illumos. ([#1867](https://github.com/nix-rust/nix/pull/1867)) - Added `nix::ucontext` module on `aarch64-unknown-linux-gnu`. diff --git a/Cargo.toml b/Cargo.toml index 960d4f61b3..e986941bc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ default = [ "hostname", "inotify", "ioctl", "kmod", "mman", "mount", "mqueue", "net", "personality", "poll", "process", "pthread", "ptrace", "quota", "reboot", "resource", "sched", "signal", "socket", "term", "time", - "ucontext", "uio", "user", "zerocopy", + "ucontext", "uio", "user", "zerocopy", "futex" ] acct = [] @@ -77,6 +77,7 @@ ucontext = ["signal"] uio = [] user = ["feature"] zerocopy = ["fs", "uio"] +futex = [] [dev-dependencies] assert-impl = "0.1" diff --git a/src/sys/futex.rs b/src/sys/futex.rs new file mode 100644 index 0000000000..5bf83a69ee --- /dev/null +++ b/src/sys/futex.rs @@ -0,0 +1,382 @@ +//! Fast user-space locking. + +use crate::{Errno, Result}; +use libc::{syscall, SYS_futex}; +use std::cell::UnsafeCell; +use std::convert::TryFrom; +use std::os::unix::io::{FromRawFd, OwnedFd}; +use std::time::Duration; + +fn timespec(duration: Duration) -> libc::timespec { + let tv_sec = duration.as_secs().try_into().unwrap(); + let tv_nsec = duration.subsec_nanos().try_into().unwrap(); + libc::timespec { tv_sec, tv_nsec } +} + +fn unwrap_or_null(option: Option<&T>) -> *const T { + match option { + Some(t) => t, + None => std::ptr::null(), + } +} + +/// Fast user-space locking. +/// +/// By default we presume the futex is not process-private, that is, it is used across processes. If +/// you know it is process-private you can set `PRIVATE` to `true` which allows some additional +/// optimizations. +/// ``` +/// # use nix::{ +/// # sys::{futex::Futex, mman::{mmap, MapFlags, ProtFlags}}, +/// # errno::Errno, +/// # unistd::{fork,ForkResult}, +/// # }; +/// # use std::{ +/// # time::{Instant, Duration}, +/// # num::NonZeroUsize, +/// # mem::{ManuallyDrop, size_of}, +/// # os::unix::io::OwnedFd, +/// # sync::Arc, +/// # thread::{spawn, sleep}, +/// # }; +/// const TIMEOUT: Duration = Duration::from_millis(500); +/// const DELTA: Duration = Duration::from_millis(100); +/// # fn main() -> nix::Result<()> { +/// let futex: Futex = Futex::new(0); +/// +/// // If the value of the futex is 0, wait for wake. Since the value is 0 and no wake occurs, +/// // we expect the timeout will pass. +/// +/// let instant = Instant::now(); +/// assert_eq!(futex.wait(0, Some(TIMEOUT)),Err(Errno::ETIMEDOUT)); +/// assert!(instant.elapsed() > TIMEOUT); +/// +/// // If the value of the futex is 1, wait for wake. Since the value is 0, not 1, this will +/// // return immediately. +/// +/// let instant = Instant::now(); +/// assert_eq!(futex.wait(1, Some(TIMEOUT)),Err(Errno::EAGAIN)); +/// assert!(instant.elapsed() < DELTA); +/// +/// // Test across threads +/// // ------------------------------------------------------------------------- +/// +/// let futex = Arc::new(futex); +/// let futex_clone = futex.clone(); +/// let instant = Instant::now(); +/// spawn(move || { +/// sleep(TIMEOUT); +/// assert_eq!(futex_clone.wake(1),Ok(1)); +/// }); +/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)), Ok(())); +/// assert!(instant.elapsed() > TIMEOUT && instant.elapsed() < TIMEOUT + DELTA); +/// +/// // Test across processes +/// // ------------------------------------------------------------------------- +/// +/// let shared_memory = unsafe { mmap::( +/// None, +/// NonZeroUsize::new_unchecked(size_of::>()), +/// ProtFlags::PROT_WRITE | ProtFlags::PROT_READ, +/// MapFlags::MAP_SHARED | MapFlags::MAP_ANONYMOUS, +/// None, +/// 0 +/// )? }; +/// let futex_ptr = shared_memory.cast::>(); +/// let futex = unsafe { &*futex_ptr }; +/// match unsafe { fork()? } { +/// ForkResult::Parent { child } => { +/// sleep(TIMEOUT); +/// assert_eq!(futex.wake(1),Ok(1)); +/// // Wait for child process to exit +/// unsafe { +/// assert_eq!(libc::waitpid(child.as_raw(), std::ptr::null_mut(), 0), child.as_raw()); +/// } +/// }, +/// ForkResult::Child => { +/// let now = Instant::now(); +/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)),Ok(())); +/// assert!(now.elapsed() > TIMEOUT && now.elapsed() < TIMEOUT + DELTA); +/// } +/// } +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug)] +pub struct Futex(pub UnsafeCell); + +impl Futex { + const MASK: i32 = if PRIVATE { libc::FUTEX_PRIVATE_FLAG } else { 0 }; + + /// Constructs new futex with a given `val`. + pub fn new(val: u32) -> Self { + Self(UnsafeCell::new(val)) + } + + /// If the value of the futex: + /// - `== val`, the thread sleeps waiting for a [`Futex::wake`] call, in this case this thread + /// is considered a waiter on this futex. + /// - `!= val`, then `Err` with [`Errno::EAGAIN`] is immediately returned. + /// + /// If the timeout is: + /// - `Some(_)` it specifies a timeout for the wait. + /// - `None` it will block indefinitely. + /// + /// Wraps [`libc::FUTEX_WAIT`]. + pub fn wait(&self, val: u32, timeout: Option) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_WAIT, + val, + timespec_ptr, + ) + }; + Errno::result(res).map(drop) + } + + /// Wakes at most `val` waiters. + /// + /// - `val == 1` wakes a single waiter. + /// - `val == u32::MAX` wakes all waiters. + /// + /// No guarantee is provided about which waiters are awoken. A waiter with a higher scheduling + /// priority is not guaranteed to be awoken in preference to a waiter with a lower priority. + /// + /// Wraps [`libc::FUTEX_WAKE`]. + pub fn wake(&self, val: u32) -> Result { + let res = unsafe { + syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_WAKE, val) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + + /// Creates a file descriptor associated with the futex. + /// + /// When [`Futex::wake`] is performed on the futex this file indicates being readable with + /// `select`, `poll` and `epoll`. + /// + /// The file descriptor can be used to obtain asynchronous notifications: if val is nonzero, + /// then, when another process or thread executes a FUTEX_WAKE, the caller will receive the + /// signal number that was passed in val. + /// + /// **Because it was inherently racy, this is unsupported from Linux 2.6.26 onward.** + /// + /// Wraps [`libc::FUTEX_FD`]. + pub fn fd(&self, val: u32) -> Result { + let res = unsafe { + syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_WAKE, val) + }; + + // On a 32 bit arch `x` will be an `i32` and will trigger this lint. + #[allow(clippy::useless_conversion)] + Errno::result(res) + .map(|x| unsafe { OwnedFd::from_raw_fd(i32::try_from(x).unwrap()) }) + } + + /// [`Futex::cmp_requeue`] without the check being made using `val3`. + /// + /// Wraps [`libc::FUTEX_REQUEUE`]. + pub fn requeue(&self, val: u32, val2: u32, uaddr2: &Self) -> Result { + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_CMP_REQUEUE, + val, + val2, + &uaddr2.0, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + + /// Wakes `val` waiters, moving remaining (up to `val2`) waiters to `uaddr2`. + /// + /// If the value of this futex `== val3` returns `Err` with [`Errno::EAGAIN`]. + /// + /// Typical values to specify for `val` are `0` or `1` (Specifying `u32::MAX` makes the + /// [`Futex::cmp_requeue`] equivalent to [`Futex::wake`]). + /// + /// Typical values to specify for `val2` are `1` or `u32::MAX` (Specifying `0` makes + /// [`Futex::cmp_requeue`] equivalent to [`Futex::wait`]). + /// + /// Wraps [`libc::FUTEX_CMP_REQUEUE`]. + pub fn cmp_requeue( + &self, + val: u32, + val2: u32, + uaddr2: &Self, + val3: u32, + ) -> Result { + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_CMP_REQUEUE, + val, + val2, + &uaddr2.0, + val3, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + + /// Wraps [`libc::FUTEX_WAKE_OP`]. + pub fn wake_op( + &self, + val: u32, + val2: u32, + uaddr2: &Self, + val3: u32, + ) -> Result { + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_WAKE_OP, + val, + val2, + &uaddr2.0, + val3, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + + /// Wraps [`libc::FUTEX_WAIT_BITSET`]. + pub fn wait_bitset( + &self, + val: u32, + timeout: Option, + val3: u32, + ) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_WAIT_BITSET, + val, + timespec_ptr, + val3, + ) + }; + Errno::result(res).map(drop) + } + + /// Wraps [`libc::FUTEX_WAKE_BITSET`]. + pub fn wake_bitset(&self, val: u32, val3: u32) -> Result { + let res = unsafe { + syscall(SYS_futex, self.0.get(), libc::FUTEX_WAKE_BITSET, val, val3) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + + /// Wraps [`libc::FUTEX_LOCK_PI`]. + pub fn lock_pi(&self, timeout: Option) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_LOCK_PI, + timespec_ptr, + ) + }; + Errno::result(res).map(drop) + } + + /// Wraps [`libc::FUTEX_LOCK_PI2`]. + #[cfg(target_os = "linux")] + pub fn lock_pi2(&self, timeout: Option) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_LOCK_PI2, + timespec_ptr, + ) + }; + Errno::result(res).map(drop) + } + + /// Wraps [`libc::FUTEX_TRYLOCK_PI`]. + pub fn trylock_pi(&self) -> Result<()> { + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_TRYLOCK_PI, + ) + }; + Errno::result(res).map(drop) + } + + /// `libc::FUTEX_UNLOCK_PI` + pub fn unlock_pi(&self) -> Result<()> { + let res = unsafe { + syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_UNLOCK_PI) + }; + Errno::result(res).map(drop) + } + + /// Wraps [`libc::FUTEX_CMP_REQUEUE_PI`]. + pub fn cmp_requeue_pi( + &self, + val: u32, + val2: u32, + uaddr2: &Self, + val3: u32, + ) -> Result { + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_CMP_REQUEUE_PI, + val, + val2, + &uaddr2.0, + val3, + ) + }; + Errno::result(res).map(|x| u32::try_from(x).unwrap()) + } + + /// Wraps [`libc::FUTEX_WAIT_REQUEUE_PI`]. + pub fn wait_requeue_pi( + &self, + val: u32, + timeout: Option, + uaddr2: &Self, + ) -> Result<()> { + let timespec = timeout.map(timespec); + let timespec_ptr = unwrap_or_null(timespec.as_ref()); + + let res = unsafe { + syscall( + SYS_futex, + self.0.get(), + Self::MASK | libc::FUTEX_WAIT_REQUEUE_PI, + val, + timespec_ptr, + &uaddr2.0, + ) + }; + Errno::result(res).map(drop) + } +} + +unsafe impl Sync for Futex {} diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 383f08df04..419b65cc7f 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -225,3 +225,9 @@ feature! { #![feature = "time"] pub mod timer; } + +#[cfg(any(target_os = "android", target_os = "linux"))] +feature! { + #![feature = "futex"] + pub mod futex; +}