diff --git a/crossbeam-epoch/Cargo.toml b/crossbeam-epoch/Cargo.toml index bdb095ae1..a1c48ae28 100644 --- a/crossbeam-epoch/Cargo.toml +++ b/crossbeam-epoch/Cargo.toml @@ -27,12 +27,14 @@ std = ["alloc", "crossbeam-utils/std", "lazy_static"] # NOTE: Disabling both `std` *and* `alloc` features is not supported yet. alloc = [] +# These features are no longer used. +# TODO: remove in the next major version. # Enable to use of unstable functionality. # This is disabled by default and requires recent nightly compiler. # # NOTE: This feature is outside of the normal semver guarantees and minor or # patch versions of crossbeam may make breaking changes to them at any time. -nightly = ["crossbeam-utils/nightly", "const_fn"] +nightly = ["crossbeam-utils/nightly"] # Enable the use of loom for concurrency testing. # @@ -40,9 +42,11 @@ nightly = ["crossbeam-utils/nightly", "const_fn"] # patch versions of crossbeam may make breaking changes to them at any time. loom = ["loom-crate", "crossbeam-utils/loom"] +[build-dependencies] +autocfg = "1" + [dependencies] cfg-if = "1" -const_fn = { version = "0.4.4", optional = true } memoffset = "0.6" # Enable the use of loom for concurrency testing. @@ -67,3 +71,4 @@ default-features = false [dev-dependencies] rand = "0.8" +rustversion = "1" diff --git a/crossbeam-epoch/build.rs b/crossbeam-epoch/build.rs index 587e0580b..bba54d999 100644 --- a/crossbeam-epoch/build.rs +++ b/crossbeam-epoch/build.rs @@ -29,6 +29,18 @@ fn main() { } }; + let cfg = match autocfg::AutoCfg::new() { + Ok(cfg) => cfg, + Err(e) => { + println!( + "cargo:warning={}: unable to determine rustc version: {}", + env!("CARGO_PKG_NAME"), + e + ); + return; + } + }; + // Note that this is `no_*`, not `has_*`. This allows treating // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't // run. This is needed for compatibility with non-cargo build systems that @@ -37,5 +49,10 @@ fn main() { println!("cargo:rustc-cfg=crossbeam_no_atomic_cas"); } + if cfg.probe_rustc_version(1, 61) { + // TODO: invert cfg once Rust 1.61 became stable. + println!("cargo:rustc-cfg=crossbeam_const_fn_trait_bound"); + } + println!("cargo:rerun-if-changed=no_atomic.rs"); } diff --git a/crossbeam-epoch/src/atomic.rs b/crossbeam-epoch/src/atomic.rs index f727387e6..b0b6a68e7 100644 --- a/crossbeam-epoch/src/atomic.rs +++ b/crossbeam-epoch/src/atomic.rs @@ -342,8 +342,16 @@ impl Atomic { /// /// let a = Atomic::::null(); /// ``` - /// - #[cfg_attr(all(feature = "nightly", not(crossbeam_loom)), const_fn::const_fn)] + #[cfg(all(crossbeam_const_fn_trait_bound, not(crossbeam_loom)))] + pub const fn null() -> Atomic { + Self { + data: AtomicUsize::new(0), + _marker: PhantomData, + } + } + + /// Returns a new null atomic pointer. + #[cfg(not(all(crossbeam_const_fn_trait_bound, not(crossbeam_loom))))] pub fn null() -> Atomic { Self { data: AtomicUsize::new(0), @@ -1594,7 +1602,7 @@ mod tests { Shared::::null().with_tag(7); } - #[cfg(feature = "nightly")] + #[rustversion::since(1.61)] #[test] fn const_atomic_null() { use super::Atomic; diff --git a/crossbeam-epoch/src/lib.rs b/crossbeam-epoch/src/lib.rs index 2791260d5..7150ec6f7 100644 --- a/crossbeam-epoch/src/lib.rs +++ b/crossbeam-epoch/src/lib.rs @@ -62,7 +62,6 @@ unreachable_pub )] #![cfg_attr(not(feature = "std"), no_std)] -#![cfg_attr(feature = "nightly", feature(const_fn_trait_bound))] #[cfg(crossbeam_loom)] extern crate loom_crate as loom; diff --git a/crossbeam-queue/src/array_queue.rs b/crossbeam-queue/src/array_queue.rs index 5f3061b70..c34f589e7 100644 --- a/crossbeam-queue/src/array_queue.rs +++ b/crossbeam-queue/src/array_queue.rs @@ -27,9 +27,11 @@ struct Slot { /// /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an -/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit -/// faster than [`SegQueue`]. +/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for +/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue +/// a bit faster than [`SegQueue`]. /// +/// [`force_push`]: ArrayQueue::force_push /// [`SegQueue`]: super::SegQueue /// /// # Examples @@ -120,21 +122,10 @@ impl ArrayQueue { } } - /// Attempts to push an element into the queue. - /// - /// If the queue is full, the element is returned back as an error. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(1); - /// - /// assert_eq!(q.push(10), Ok(())); - /// assert_eq!(q.push(20), Err(20)); - /// ``` - pub fn push(&self, value: T) -> Result<(), T> { + fn push_or_else(&self, mut value: T, f: F) -> Result<(), T> + where + F: Fn(T, usize, usize, &Slot) -> Result, + { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -143,6 +134,16 @@ impl ArrayQueue { let index = tail & (self.one_lap - 1); let lap = tail & !(self.one_lap - 1); + let new_tail = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + // Inspect the corresponding slot. debug_assert!(index < self.buffer.len()); let slot = unsafe { self.buffer.get_unchecked(index) }; @@ -150,16 +151,6 @@ impl ArrayQueue { // If the tail and the stamp match, we may attempt to push. if tail == stamp { - let new_tail = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, index: index + 1 }`. - tail + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - // Try moving the tail. match self.tail.compare_exchange_weak( tail, @@ -182,14 +173,7 @@ impl ArrayQueue { } } else if stamp.wrapping_add(self.one_lap) == tail + 1 { atomic::fence(Ordering::SeqCst); - let head = self.head.load(Ordering::Relaxed); - - // If the head lags one lap behind the tail as well... - if head.wrapping_add(self.one_lap) == tail { - // ...then the queue is full. - return Err(value); - } - + value = f(value, tail, new_tail, slot)?; backoff.spin(); tail = self.tail.load(Ordering::Relaxed); } else { @@ -200,6 +184,79 @@ impl ArrayQueue { } } + /// Attempts to push an element into the queue. + /// + /// If the queue is full, the element is returned back as an error. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_queue::ArrayQueue; + /// + /// let q = ArrayQueue::new(1); + /// + /// assert_eq!(q.push(10), Ok(())); + /// assert_eq!(q.push(20), Err(20)); + /// ``` + pub fn push(&self, value: T) -> Result<(), T> { + self.push_or_else(value, |v, tail, _, _| { + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the queue is full. + Err(v) + } else { + Ok(v) + } + }) + } + + /// Pushes an element into the queue, replacing the oldest element if necessary. + /// + /// If the queue is full, the oldest element is replaced and returned, + /// otherwise `None` is returned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_queue::ArrayQueue; + /// + /// let q = ArrayQueue::new(2); + /// + /// assert_eq!(q.force_push(10), None); + /// assert_eq!(q.force_push(20), None); + /// assert_eq!(q.force_push(30), Some(10)); + /// assert_eq!(q.pop(), Some(20)); + /// ``` + pub fn force_push(&self, value: T) -> Option { + self.push_or_else(value, |v, tail, new_tail, slot| { + let head = tail.wrapping_sub(self.one_lap); + let new_head = new_tail.wrapping_sub(self.one_lap); + + // Try moving the head. + if self + .head + .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // Move the tail. + self.tail.store(new_tail, Ordering::SeqCst); + + // Swap the previous value. + let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() }; + + // Update the stamp. + slot.stamp.store(tail + 1, Ordering::Release); + + Err(old) + } else { + Ok(v) + } + }) + .err() + } + /// Attempts to pop an element from the queue. /// /// If the queue is empty, `None` is returned. diff --git a/crossbeam-queue/tests/array_queue.rs b/crossbeam-queue/tests/array_queue.rs index a23e08201..9a64eac03 100644 --- a/crossbeam-queue/tests/array_queue.rs +++ b/crossbeam-queue/tests/array_queue.rs @@ -144,6 +144,45 @@ fn spsc() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow +#[test] +fn spsc_ring_buffer() { + const COUNT: usize = 100_000; + + let t = AtomicUsize::new(1); + let q = ArrayQueue::::new(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); + + scope(|scope| { + scope.spawn(|_| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Some(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if let Some(n) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }); + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), 1); + } +} + #[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn mpmc() { @@ -181,6 +220,50 @@ fn mpmc() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow +#[test] +fn mpmc_ring_buffer() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let t = AtomicUsize::new(THREADS); + let q = ArrayQueue::::new(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Some(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }); + } + + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + if let Some(n) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }); + } + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + #[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { @@ -244,13 +327,21 @@ fn linearizable() { let q = ArrayQueue::new(THREADS); scope(|scope| { - for _ in 0..THREADS { + for _ in 0..THREADS / 2 { scope.spawn(|_| { for _ in 0..COUNT { while q.push(0).is_err() {} q.pop().unwrap(); } }); + + scope.spawn(|_| { + for _ in 0..COUNT { + if q.force_push(0).is_none() { + q.pop().unwrap(); + } + } + }); } }) .unwrap();