From cef73d2560613f5f4b8fe6cfc11ac73daf55001e Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 9 Sep 2022 09:53:01 -0700 Subject: [PATCH 1/8] implement initial loom --- Cargo.toml | 3 ++ src/bounded.rs | 89 ++++++++++++++++++++++++++------------------ src/lib.rs | 4 +- src/single.rs | 27 +++++++++----- src/sync.rs | 86 ++++++++++++++++++++++++++++++++++++++++++ src/unbounded.rs | 97 +++++++++++++++++++++++++++++++----------------- tests/loom.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 317 insertions(+), 81 deletions(-) create mode 100644 src/sync.rs create mode 100644 tests/loom.rs diff --git a/Cargo.toml b/Cargo.toml index 73abc7f..1b578e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ bench = false [dependencies] crossbeam-utils = { version = "0.8.11", default-features = false } +[target.'cfg(loom)'.dependencies] +loom = "0.5" + [[bench]] name = "bench" harness = false diff --git a/src/bounded.rs b/src/bounded.rs index 2ed2809..c075713 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -1,10 +1,12 @@ use alloc::{boxed::Box, vec::Vec}; -use core::cell::UnsafeCell; use core::mem::MaybeUninit; -use core::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_utils::CachePadded; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::cell::UnsafeCell; +#[allow(unused_imports)] +use crate::sync::prelude::*; use crate::{busy_wait, PopError, PushError}; /// A slot in a queue. @@ -118,9 +120,9 @@ impl Bounded { ) { Ok(_) => { // Write the value into the slot and update the stamp. - unsafe { - slot.value.get().write(MaybeUninit::new(value)); - } + slot.value.with_mut(|slot| unsafe { + slot.write(MaybeUninit::new(value)); + }); slot.stamp.store(tail + 1, Ordering::Release); return Ok(()); } @@ -181,7 +183,9 @@ impl Bounded { ) { Ok(_) => { // Read the value from the slot and update the stamp. - let value = unsafe { slot.value.get().read().assume_init() }; + let value = slot + .value + .with_mut(|slot| unsafe { slot.read().assume_init() }); slot.stamp .store(head.wrapping_add(self.one_lap), Ordering::Release); return Ok(value); @@ -284,37 +288,48 @@ impl Bounded { impl Drop for Bounded { fn drop(&mut self) { // Get the index of the head. - let head = *self.head.get_mut(); - let tail = *self.tail.get_mut(); - - let hix = head & (self.mark_bit - 1); - let tix = tail & (self.mark_bit - 1); - - let len = if hix < tix { - tix - hix - } else if hix > tix { - self.buffer.len() - hix + tix - } else if (tail & !self.mark_bit) == head { - 0 - } else { - self.buffer.len() - }; - - // Loop over all slots that hold a value and drop them. - for i in 0..len { - // Compute the index of the next slot holding a value. - let index = if hix + i < self.buffer.len() { - hix + i - } else { - hix + i - self.buffer.len() - }; + let Self { + head, + tail, + buffer, + mark_bit, + .. + } = self; - // Drop the value in the slot. - let slot = &self.buffer[index]; - unsafe { - let value = &mut *slot.value.get(); - value.as_mut_ptr().drop_in_place(); - } - } + let mark_bit = *mark_bit; + + head.with_mut(|&mut head| { + tail.with_mut(|&mut tail| { + let hix = head & (mark_bit - 1); + let tix = tail & (mark_bit - 1); + + let len = if hix < tix { + tix - hix + } else if hix > tix { + buffer.len() - hix + tix + } else if (tail & !mark_bit) == head { + 0 + } else { + buffer.len() + }; + + // Loop over all slots that hold a value and drop them. + for i in 0..len { + // Compute the index of the next slot holding a value. + let index = if hix + i < buffer.len() { + hix + i + } else { + hix + i - buffer.len() + }; + + // Drop the value in the slot. + let slot = &buffer[index]; + slot.value.with_mut(|slot| unsafe { + let value = &mut *slot; + value.as_mut_ptr().drop_in_place(); + }); + } + }); + }); } } diff --git a/src/lib.rs b/src/lib.rs index 6531abe..8e7e2f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,7 @@ extern crate std; use alloc::boxed::Box; use core::fmt; -use core::sync::atomic::{self, AtomicUsize, Ordering}; +use sync::atomic::{self, AtomicUsize, Ordering}; #[cfg(feature = "std")] use std::error; @@ -59,6 +59,8 @@ mod bounded; mod single; mod unbounded; +mod sync; + /// A concurrent queue. /// /// # Examples diff --git a/src/single.rs b/src/single.rs index 06ab766..c7748b3 100644 --- a/src/single.rs +++ b/src/single.rs @@ -1,7 +1,9 @@ -use core::cell::UnsafeCell; use core::mem::MaybeUninit; -use core::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::cell::UnsafeCell; +#[allow(unused_imports)] +use crate::sync::prelude::*; use crate::{busy_wait, PopError, PushError}; const LOCKED: usize = 1 << 0; @@ -33,7 +35,9 @@ impl Single { if state == 0 { // Write the value and unlock. - unsafe { self.slot.get().write(MaybeUninit::new(value)) } + self.slot.with_mut(|slot| unsafe { + slot.write(MaybeUninit::new(value)); + }); self.state.fetch_and(!LOCKED, Ordering::Release); Ok(()) } else if state & CLOSED != 0 { @@ -60,7 +64,9 @@ impl Single { if prev == state { // Read the value and unlock. - let value = unsafe { self.slot.get().read().assume_init() }; + let value = self + .slot + .with_mut(|slot| unsafe { slot.read().assume_init() }); self.state.fetch_and(!LOCKED, Ordering::Release); return Ok(value); } @@ -118,11 +124,14 @@ impl Single { impl Drop for Single { fn drop(&mut self) { // Drop the value in the slot. - if *self.state.get_mut() & PUSHED != 0 { - unsafe { - let value = &mut *self.slot.get(); - value.as_mut_ptr().drop_in_place(); + let Self { state, slot } = self; + state.with_mut(|state| { + if *state & PUSHED != 0 { + slot.with_mut(|slot| unsafe { + let value = &mut *slot; + value.as_mut_ptr().drop_in_place(); + }); } - } + }); } } diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..bf28e10 --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,86 @@ +//! Synchronization facade to choose between `core` primitives and `loom` primitives. + +#[cfg(not(loom))] +mod sync_impl { + pub(crate) mod prelude { + pub(crate) use super::{AtomicExt, UnsafeCellExt}; + } + + pub(crate) use core::cell; + pub(crate) use core::sync::atomic; + + /// Emulate `loom::UnsafeCell`'s API. + pub(crate) trait UnsafeCellExt { + type Value; + + fn with(&self, f: F) -> R + where + F: FnOnce(*const Self::Value) -> R; + + fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut Self::Value) -> R; + } + + impl UnsafeCellExt for cell::UnsafeCell { + type Value = T; + + fn with(&self, f: F) -> R + where + F: FnOnce(*const Self::Value) -> R, + { + f(self.get()) + } + + fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut Self::Value) -> R, + { + f(self.get()) + } + } + + /// Emulate `loom::Atomic*`'s API. + pub(crate) trait AtomicExt { + type Value; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Value) -> R; + } + + impl AtomicExt for atomic::AtomicUsize { + type Value = usize; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Value) -> R, + { + f(self.get_mut()) + } + } + + impl AtomicExt for atomic::AtomicPtr { + type Value = *mut T; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Value) -> R, + { + f(self.get_mut()) + } + } +} + +#[cfg(loom)] +mod sync_impl { + pub(crate) mod prelude {} + pub(crate) use loom::cell; + + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + pub(crate) use loom::sync::atomic::*; + } +} + +pub(crate) use sync_impl::*; diff --git a/src/unbounded.rs b/src/unbounded.rs index 1815f5d..a7ed6fe 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -1,11 +1,13 @@ use alloc::boxed::Box; -use core::cell::UnsafeCell; use core::mem::MaybeUninit; use core::ptr; -use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use crossbeam_utils::CachePadded; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::sync::cell::UnsafeCell; +#[allow(unused_imports)] +use crate::sync::prelude::*; use crate::{busy_wait, PopError, PushError}; // Bits indicating the state of a slot: @@ -37,11 +39,25 @@ struct Slot { } impl Slot { + #[cfg(not(loom))] const UNINIT: Slot = Slot { value: UnsafeCell::new(MaybeUninit::uninit()), state: AtomicUsize::new(0), }; + #[cfg(not(loom))] + fn uninit_block() -> [Slot; BLOCK_CAP] { + [Self::UNINIT; BLOCK_CAP] + } + + #[cfg(loom)] + fn uninit_block() -> [Slot; BLOCK_CAP] { + // SAFETY: Behavior is well-defined when all items are zero: + // - MaybeUninit can have any bit pattern + // - AtomicUsize is transparent to usize, and it is intended to be zero here + unsafe { MaybeUninit::zeroed().assume_init() } + } + /// Waits until a value is written into the slot. fn wait_write(&self) { while self.state.load(Ordering::Acquire) & WRITE == 0 { @@ -66,7 +82,7 @@ impl Block { fn new() -> Block { Block { next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], + slots: Slot::uninit_block(), } } @@ -205,7 +221,9 @@ impl Unbounded { // Write the value into the slot. let slot = (*block).slots.get_unchecked(offset); - slot.value.get().write(MaybeUninit::new(value)); + slot.value.with_mut(|slot| { + slot.write(MaybeUninit::new(value)); + }); slot.state.fetch_or(WRITE, Ordering::Release); return Ok(()); }, @@ -287,7 +305,7 @@ impl Unbounded { // Read the value. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let value = slot.value.get().read().assume_init(); + let value = slot.value.with_mut(|slot| slot.read().assume_init()); // Destroy the block if we've reached the end, or if another thread wanted to // destroy but couldn't because we were busy reading from the slot. @@ -371,38 +389,49 @@ impl Unbounded { impl Drop for Unbounded { fn drop(&mut self) { - let mut head = *self.head.index.get_mut(); - let mut tail = *self.tail.index.get_mut(); - let mut block = *self.head.block.get_mut(); - - // Erase the lower bits. - head &= !((1 << SHIFT) - 1); - tail &= !((1 << SHIFT) - 1); + let Self { head, tail } = self; + let Position { index: head, block } = &mut **head; - unsafe { - // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. - while head != tail { - let offset = (head >> SHIFT) % LAP; + head.with_mut(|&mut mut head| { + tail.index.with_mut(|&mut mut tail| { + // Erase the lower bits. + head &= !((1 << SHIFT) - 1); + tail &= !((1 << SHIFT) - 1); - if offset < BLOCK_CAP { - // Drop the value in the slot. - let slot = (*block).slots.get_unchecked(offset); - let value = &mut *slot.value.get(); - value.as_mut_ptr().drop_in_place(); - } else { - // Deallocate the block and move to the next one. - let next = *(*block).next.get_mut(); - drop(Box::from_raw(block)); - block = next; - } + unsafe { + // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks. + while head != tail { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the value in the slot. + block.with_mut(|block| { + let slot = (**block).slots.get_unchecked(offset); + slot.value.with_mut(|slot| { + let value = &mut *slot; + value.as_mut_ptr().drop_in_place(); + }); + }); + } else { + // Deallocate the block and move to the next one. + block.with_mut(|block| { + let next_block = (**block).next.with_mut(|next| *next); + drop(Box::from_raw(*block)); + *block = next_block; + }); + } - head = head.wrapping_add(1 << SHIFT); - } + head = head.wrapping_add(1 << SHIFT); + } - // Deallocate the last remaining block. - if !block.is_null() { - drop(Box::from_raw(block)); - } - } + // Deallocate the last remaining block. + block.with_mut(|block| { + if !block.is_null() { + drop(Box::from_raw(*block)); + } + }); + } + }); + }); } } diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..a262c31 --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,92 @@ +#![cfg(loom)] + +use concurrent_queue::ConcurrentQueue; +use loom::sync::atomic::{AtomicUsize, Ordering}; +use loom::thread; +use std::sync::Arc; + +fn run_test, usize) + Send + Sync + 'static>(f: F) { + const LIMIT: usize = 500; + + loom::model(move || { + // Run for single, bounded and unbounded. + f(ConcurrentQueue::unbounded(), LIMIT); + f(ConcurrentQueue::bounded(1), 1); + f(ConcurrentQueue::bounded(LIMIT), LIMIT); + }); +} + +#[test] +fn spsc() { + run_test(|q, limit| { + let q = Arc::new(q); + + let q1 = q.clone(); + let t1 = thread::spawn(move || { + for i in 0..limit { + while q1.push(i).is_err() {} + } + }); + + let q2 = q.clone(); + let t2 = thread::spawn(move || { + for i in 0..limit { + loop { + if let Ok(x) = q.pop() { + assert_eq!(x, i); + break; + } + } + } + }); + + t1.join().unwrap(); + t2.join().unwrap(); + }); +} + +#[test] +fn mpmc() { + run_test(|q, limit| { + let pusher_threads = loom::MAX_THREADS / 2; + let popper_threads = loom::MAX_THREADS - pusher_threads; + + let q = Arc::new(q); + let v = (0..limit).map(|_| AtomicUsize::new(0)).collect::>(); + let v = Arc::new(v); + + let mut threads = Vec::new(); + + for _ in 0..popper_threads { + let q = q.clone(); + let v = v.clone(); + threads.push(thread::spawn(move || { + for _ in 0..limit { + let n = loop { + if let Ok(x) = q.pop() { + break x; + } + }; + v[n].fetch_add(1, Ordering::SeqCst); + } + })); + } + + for _ in 0..pusher_threads { + let q = q.clone(); + threads.push(thread::spawn(move || { + for i in 0..limit { + while q.push(i).is_err() {} + } + })); + } + + for t in threads { + t.join().unwrap(); + } + + for c in Arc::try_unwrap(v).unwrap() { + assert_eq!(c.load(Ordering::SeqCst), popper_threads); + } + }); +} From 16c6e41f4f60d0dffc38c56a113d5ed9cebe4806 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 11 Sep 2022 09:02:55 -0700 Subject: [PATCH 2/8] Add portable-atomics --- .github/workflows/ci.yml | 6 +++ Cargo.toml | 2 + src/lib.rs | 20 ++++++--- src/sync.rs | 43 +++++++++++-------- src/unbounded.rs | 19 +++++++-- tests/loom.rs | 92 ---------------------------------------- 6 files changed, 62 insertions(+), 120 deletions(-) delete mode 100644 tests/loom.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d5fb135..1007edb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,8 +29,13 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test --features portable-atomic - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack + - name: Run with Loom enabled + run: cargo check + env: + RUSTFLAGS: "{{ env.RUSTFLAGS }} --cfg loom" - run: rustup target add thumbv7m-none-eabi - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps @@ -46,6 +51,7 @@ jobs: - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - run: cargo build + - run: cargo build --features portable-atomic - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack - run: rustup target add thumbv7m-none-eabi diff --git a/Cargo.toml b/Cargo.toml index 1b578e8..010de9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,9 @@ exclude = ["/.*"] bench = false [dependencies] +cfg-if = "1" crossbeam-utils = { version = "0.8.11", default-features = false } +portable-atomic = { version = "0.3", default-features = false, optional = true } [target.'cfg(loom)'.dependencies] loom = "0.5" diff --git a/src/lib.rs b/src/lib.rs index 8e7e2f8..47f95ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -468,13 +468,19 @@ impl fmt::Display for PushError { /// Notify the CPU that we are currently busy-waiting. #[inline] fn busy_wait() { - #[cfg(feature = "std")] - std::thread::yield_now(); - // Use the deprecated `spin_loop_hint` here in order to - // avoid bumping the MSRV. - #[allow(deprecated)] - #[cfg(not(feature = "std"))] - core::sync::atomic::spin_loop_hint() + cfg_if::cfg_if! { + if #[cfg(loom)] { + loom::thread::yield_now(); + } else if #[cfg(feature = "std")] { + std::thread::yield_now(); + } else { + // Use the deprecated `spin_loop_hint` here in order to + // avoid bumping the MSRV. + #[allow(deprecated)] + #[cfg(not(feature = "std"))] + core::sync::atomic::spin_loop_hint() + } + } } /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. diff --git a/src/sync.rs b/src/sync.rs index bf28e10..ff19e2e 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,13 +1,35 @@ //! Synchronization facade to choose between `core` primitives and `loom` primitives. -#[cfg(not(loom))] +#[cfg(all(feature = "portable-atomic", not(loom)))] mod sync_impl { - pub(crate) mod prelude { - pub(crate) use super::{AtomicExt, UnsafeCellExt}; - } + pub(crate) use core::cell; + pub(crate) use portable_atomic as atomic; +} +#[cfg(all(not(feature = "portable-atomic"), not(loom)))] +mod sync_impl { pub(crate) use core::cell; pub(crate) use core::sync::atomic; +} + +#[cfg(loom)] +mod sync_impl { + pub(crate) use loom::cell; + + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + pub(crate) use loom::sync::atomic::*; + } +} + +pub(crate) use sync_impl::*; + +#[cfg(loom)] +pub(crate) mod prelude {} + +#[cfg(not(loom))] +pub(crate) mod prelude { + use super::{atomic, cell}; /// Emulate `loom::UnsafeCell`'s API. pub(crate) trait UnsafeCellExt { @@ -71,16 +93,3 @@ mod sync_impl { } } } - -#[cfg(loom)] -mod sync_impl { - pub(crate) mod prelude {} - pub(crate) use loom::cell; - - pub(crate) mod atomic { - pub(crate) use core::sync::atomic::compiler_fence; - pub(crate) use loom::sync::atomic::*; - } -} - -pub(crate) use sync_impl::*; diff --git a/src/unbounded.rs b/src/unbounded.rs index a7ed6fe..a84f856 100644 --- a/src/unbounded.rs +++ b/src/unbounded.rs @@ -52,10 +52,21 @@ impl Slot { #[cfg(loom)] fn uninit_block() -> [Slot; BLOCK_CAP] { - // SAFETY: Behavior is well-defined when all items are zero: - // - MaybeUninit can have any bit pattern - // - AtomicUsize is transparent to usize, and it is intended to be zero here - unsafe { MaybeUninit::zeroed().assume_init() } + // Repeat this expression 31 times. + // Update if we change BLOCK_CAP + macro_rules! repeat_31 { + ($e: expr) => { + [ + $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, + $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, $e, + ] + }; + } + + repeat_31!(Slot { + value: UnsafeCell::new(MaybeUninit::uninit()), + state: AtomicUsize::new(0), + }) } /// Waits until a value is written into the slot. diff --git a/tests/loom.rs b/tests/loom.rs deleted file mode 100644 index a262c31..0000000 --- a/tests/loom.rs +++ /dev/null @@ -1,92 +0,0 @@ -#![cfg(loom)] - -use concurrent_queue::ConcurrentQueue; -use loom::sync::atomic::{AtomicUsize, Ordering}; -use loom::thread; -use std::sync::Arc; - -fn run_test, usize) + Send + Sync + 'static>(f: F) { - const LIMIT: usize = 500; - - loom::model(move || { - // Run for single, bounded and unbounded. - f(ConcurrentQueue::unbounded(), LIMIT); - f(ConcurrentQueue::bounded(1), 1); - f(ConcurrentQueue::bounded(LIMIT), LIMIT); - }); -} - -#[test] -fn spsc() { - run_test(|q, limit| { - let q = Arc::new(q); - - let q1 = q.clone(); - let t1 = thread::spawn(move || { - for i in 0..limit { - while q1.push(i).is_err() {} - } - }); - - let q2 = q.clone(); - let t2 = thread::spawn(move || { - for i in 0..limit { - loop { - if let Ok(x) = q.pop() { - assert_eq!(x, i); - break; - } - } - } - }); - - t1.join().unwrap(); - t2.join().unwrap(); - }); -} - -#[test] -fn mpmc() { - run_test(|q, limit| { - let pusher_threads = loom::MAX_THREADS / 2; - let popper_threads = loom::MAX_THREADS - pusher_threads; - - let q = Arc::new(q); - let v = (0..limit).map(|_| AtomicUsize::new(0)).collect::>(); - let v = Arc::new(v); - - let mut threads = Vec::new(); - - for _ in 0..popper_threads { - let q = q.clone(); - let v = v.clone(); - threads.push(thread::spawn(move || { - for _ in 0..limit { - let n = loop { - if let Ok(x) = q.pop() { - break x; - } - }; - v[n].fetch_add(1, Ordering::SeqCst); - } - })); - } - - for _ in 0..pusher_threads { - let q = q.clone(); - threads.push(thread::spawn(move || { - for i in 0..limit { - while q.push(i).is_err() {} - } - })); - } - - for t in threads { - t.join().unwrap(); - } - - for c in Arc::try_unwrap(v).unwrap() { - assert_eq!(c.load(Ordering::SeqCst), popper_threads); - } - }); -} From d472e09ae9190710a3aa4f374cc4023ee003d97c Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 11 Sep 2022 09:17:07 -0700 Subject: [PATCH 3/8] Fix loom build --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1007edb..1c6ab8f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: - name: Run with Loom enabled run: cargo check env: - RUSTFLAGS: "{{ env.RUSTFLAGS }} --cfg loom" + RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom - run: rustup target add thumbv7m-none-eabi - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps From ac175a3f30a9ffc7195fbdc2887020b86c6b0a4c Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 12 Sep 2022 13:42:12 -0700 Subject: [PATCH 4/8] add loom test --- tests/loom.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 tests/loom.rs diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..ad1d632 --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,55 @@ +#![cfg(loom)] + +use concurrent_queue::ConcurrentQueue; +use loom::sync::Arc; +use loom::thread; + +/// Wrapper to run tests on all three queues. +fn run_test, usize) + Send + Sync + Clone + 'static>(f: F) { + const LIMIT: usize = 2; + + let fc = f.clone(); + loom::model(move || { + fc(ConcurrentQueue::bounded(1), 1); + }); + + let fc = f.clone(); + loom::model(move || { + fc(ConcurrentQueue::bounded(LIMIT), LIMIT); + }); + + return; + + loom::model(move || { + f(ConcurrentQueue::unbounded(), LIMIT); + }); +} + +#[test] +fn spsc() { + run_test(|q, limit| { + let q = Arc::new(q); + let q1 = q.clone(); + + // Spawn a thread that pushes items into the queue. + thread::spawn(move || { + for i in 0..limit { + q1.push(i).unwrap(); + } + }); + + // Spawn a thread that pops items from the queue. + thread::spawn(move || { + for i in 0..limit { + loop { + if let Ok(x) = q.pop() { + assert_eq!(x, i); + break; + } + + thread::yield_now(); + } + } + }); + }); +} \ No newline at end of file From d173aa0835f15789ad39557cb1ba727e279a4e54 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Sun, 18 Sep 2022 11:18:23 -0700 Subject: [PATCH 5/8] Fix loom tests --- .github/workflows/ci.yml | 1 + src/bounded.rs | 8 ++ src/lib.rs | 3 +- tests/loom.rs | 241 +++++++++++++++++++++++++++++++++++---- 4 files changed, 228 insertions(+), 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c6ab8f..01f05b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,7 @@ jobs: run: cargo check env: RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom + LOOM_MAX_PREEMPTIONS: 2 - run: rustup target add thumbv7m-none-eabi - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps diff --git a/src/bounded.rs b/src/bounded.rs index c075713..8e0b9f4 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -140,6 +140,10 @@ impl Bounded { return Err(PushError::Full(value)); } + // Loom complains if there isn't an explicit busy wait here. + #[cfg(loom)] + busy_wait(); + tail = self.tail.load(Ordering::Relaxed); } else { // Yield because we need to wait for the stamp to get updated. @@ -208,6 +212,10 @@ impl Bounded { } } + // Loom complains if there isn't a busy-wait here. + #[cfg(loom)] + busy_wait(); + head = self.head.load(Ordering::Relaxed); } else { // Yield because we need to wait for the stamp to get updated. diff --git a/src/lib.rs b/src/lib.rs index 47f95ca..adcdb7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -488,7 +488,8 @@ fn busy_wait() { fn full_fence() { if cfg!(all( any(target_arch = "x86", target_arch = "x86_64"), - not(miri) + not(miri), + not(loom) )) { // HACK(stjepang): On x86 architectures there are two different ways of executing // a `SeqCst` fence. diff --git a/tests/loom.rs b/tests/loom.rs index ad1d632..e60c37e 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -1,25 +1,214 @@ #![cfg(loom)] -use concurrent_queue::ConcurrentQueue; -use loom::sync::Arc; +use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use loom::sync::atomic::{AtomicUsize, Ordering}; +use loom::sync::{Arc, Condvar, Mutex}; use loom::thread; +use std::iter; + +/// A basic MPMC channel based on a ConcurrentQueue and loom primitives. +struct Channel { + /// The queue used to contain items. + queue: ConcurrentQueue, + + /// The number of senders. + senders: AtomicUsize, + + /// The number of receivers. + receivers: AtomicUsize, + + /// The event that is signaled when a new item is pushed. + push_event: Event, + + /// The event that is signaled when a new item is popped. + pop_event: Event, +} + +/// The sending side of a channel. +struct Sender { + /// The channel. + channel: Arc>, +} + +/// The receiving side of a channel. +struct Receiver { + /// The channel. + channel: Arc>, +} + +/// Create a new pair of senders/receivers based on a queue. +fn pair(queue: ConcurrentQueue) -> (Sender, Receiver) { + let channel = Arc::new(Channel { + queue, + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + push_event: Event::new(), + pop_event: Event::new(), + }); + + ( + Sender { + channel: channel.clone(), + }, + Receiver { channel }, + ) +} + +impl Clone for Sender { + fn clone(&self) -> Self { + self.channel.senders.fetch_add(1, Ordering::SeqCst); + Sender { + channel: self.channel.clone(), + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + if self.channel.senders.fetch_sub(1, Ordering::SeqCst) == 1 { + // Close the channel and notify the receivers. + self.channel.queue.close(); + self.channel.push_event.signal_all(); + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + self.channel.receivers.fetch_add(1, Ordering::SeqCst); + Receiver { + channel: self.channel.clone(), + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + if self.channel.receivers.fetch_sub(1, Ordering::SeqCst) == 1 { + // Close the channel and notify the senders. + self.channel.queue.close(); + self.channel.pop_event.signal_all(); + } + } +} + +impl Sender { + /// Send a value. + /// + /// Returns an error with the value if the channel is closed. + fn send(&self, mut value: T) -> Result<(), T> { + loop { + match self.channel.queue.push(value) { + Ok(()) => { + // Notify a single receiver. + self.channel.push_event.signal(); + return Ok(()); + } + Err(PushError::Closed(val)) => return Err(val), + Err(PushError::Full(val)) => { + // Wait for a receiver to pop an item. + value = val; + self.channel.pop_event.wait(); + } + } + } + } +} + +impl Receiver { + /// Receive a value. + /// + /// Returns an error if the channel is closed. + fn recv(&self) -> Result { + loop { + match self.channel.queue.pop() { + Ok(value) => { + // Notify a single sender. + self.channel.pop_event.signal(); + return Ok(value); + } + Err(PopError::Closed) => return Err(()), + Err(PopError::Empty) => { + // Wait for a sender to push an item. + self.channel.push_event.wait(); + } + } + } + } +} + +/// An event that can be waited on and then signaled. +struct Event { + /// The condition variable used to wait on the event. + condvar: Condvar, + + /// The mutex used to protect the event. + /// + /// Inside is the event's state. The first bit is used to indicate if the + /// notify_one method was called. The second bit is used to indicate if the + /// notify_all method was called. + mutex: Mutex, +} + +impl Event { + /// Create a new event. + fn new() -> Self { + Self { + condvar: Condvar::new(), + mutex: Mutex::new(0), + } + } + + /// Wait for the event to be signaled. + fn wait(&self) { + let mut state = self.mutex.lock().unwrap(); + + loop { + if *state & 0b11 != 0 { + // The event was signaled. + *state &= !0b01; + return; + } + + // Wait for the event to be signaled. + state = self.condvar.wait(state).unwrap(); + } + } + + /// Signal the event. + fn signal(&self) { + let mut state = self.mutex.lock().unwrap(); + *state |= 1; + drop(state); + + self.condvar.notify_one(); + } + + /// Signal the event, but notify all waiters. + fn signal_all(&self) { + let mut state = self.mutex.lock().unwrap(); + *state |= 3; + drop(state); + + self.condvar.notify_all(); + } +} /// Wrapper to run tests on all three queues. fn run_test, usize) + Send + Sync + Clone + 'static>(f: F) { - const LIMIT: usize = 2; + // The length of a loom test seems to increase exponentially the higher this number is. + const LIMIT: usize = 4; let fc = f.clone(); loom::model(move || { - fc(ConcurrentQueue::bounded(1), 1); + fc(ConcurrentQueue::bounded(1), LIMIT); }); let fc = f.clone(); loom::model(move || { - fc(ConcurrentQueue::bounded(LIMIT), LIMIT); + fc(ConcurrentQueue::bounded(LIMIT / 2), LIMIT); }); - return; - loom::model(move || { f(ConcurrentQueue::unbounded(), LIMIT); }); @@ -28,28 +217,32 @@ fn run_test, usize) + Send + Sync + Clone + 'static #[test] fn spsc() { run_test(|q, limit| { - let q = Arc::new(q); - let q1 = q.clone(); + // Create a new pair of senders/receivers. + let (tx, rx) = pair(q); - // Spawn a thread that pushes items into the queue. - thread::spawn(move || { + // Push each onto a thread and run them. + let handle = thread::spawn(move || { for i in 0..limit { - q1.push(i).unwrap(); + if tx.send(i).is_err() { + break; + } } }); - // Spawn a thread that pops items from the queue. - thread::spawn(move || { - for i in 0..limit { - loop { - if let Ok(x) = q.pop() { - assert_eq!(x, i); - break; - } + let mut recv_values = vec![]; - thread::yield_now(); - } + loop { + match rx.recv() { + Ok(value) => recv_values.push(value), + Err(()) => break, } - }); + } + + // Values may not be in order. + recv_values.sort_unstable(); + assert_eq!(recv_values, (0..limit).collect::>()); + + // Join the handle before we exit. + handle.join().unwrap(); }); -} \ No newline at end of file +} From 89b89651ffafa6146f079c2c93a8754ae86ce3b6 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 19 Sep 2022 07:51:03 -0700 Subject: [PATCH 6/8] Make loom optional. --- .github/workflows/ci.yml | 2 +- Cargo.toml | 4 +++- src/lib.rs | 30 +++++++++++------------------- src/sync.rs | 28 ++++++++++++++++++++++++++++ tests/loom.rs | 1 - 5 files changed, 43 insertions(+), 22 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01f05b8..8e0bb39 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,7 +33,7 @@ jobs: - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack - name: Run with Loom enabled - run: cargo check + run: cargo test --test loom --features loom env: RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom LOOM_MAX_PREEMPTIONS: 2 diff --git a/Cargo.toml b/Cargo.toml index 010de9f..89daa1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,8 +22,10 @@ cfg-if = "1" crossbeam-utils = { version = "0.8.11", default-features = false } portable-atomic = { version = "0.3", default-features = false, optional = true } +# Enables loom testing. This feature is permanently unstable and the API may +# change at any time. [target.'cfg(loom)'.dependencies] -loom = "0.5" +loom = { version = "0.5", optional = true } [[bench]] name = "bench" diff --git a/src/lib.rs b/src/lib.rs index adcdb7a..9868f20 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,14 +26,23 @@ //! //! # Features //! -//! `concurrent-queue` used an `std` default feature. With this feature enabled, this crate will +//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will //! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this //! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow //! this crate to be used on `no_std` platforms at the potential expense of more busy waiting. +//! +//! There is also a `portable-atomic` feature, which uses a polyfill from the +//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them. +//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on +//! single-threaded targets. Note that even with this feature enabled, `concurrent-queue` still +//! requires a global allocator to be available. See the documentation for the +//! [`std::alloc::GlobalAlloc`] trait for more information. //! //! [Bounded]: `ConcurrentQueue::bounded()` //! [Unbounded]: `ConcurrentQueue::unbounded()` //! [closed]: `ConcurrentQueue::close()` +//! [`portable-atomic`]: https://crates.io/crates/portable-atomic +//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![no_std] @@ -54,6 +63,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; use crate::bounded::Bounded; use crate::single::Single; use crate::unbounded::Unbounded; +use crate::sync::busy_wait; mod bounded; mod single; @@ -465,24 +475,6 @@ impl fmt::Display for PushError { } } -/// Notify the CPU that we are currently busy-waiting. -#[inline] -fn busy_wait() { - cfg_if::cfg_if! { - if #[cfg(loom)] { - loom::thread::yield_now(); - } else if #[cfg(feature = "std")] { - std::thread::yield_now(); - } else { - // Use the deprecated `spin_loop_hint` here in order to - // avoid bumping the MSRV. - #[allow(deprecated)] - #[cfg(not(feature = "std"))] - core::sync::atomic::spin_loop_hint() - } - } -} - /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. #[inline] fn full_fence() { diff --git a/src/sync.rs b/src/sync.rs index ff19e2e..1a6c3b0 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -4,12 +4,28 @@ mod sync_impl { pub(crate) use core::cell; pub(crate) use portable_atomic as atomic; + + #[cfg(not(feature = "std"))] + pub(crate) use atomic::hint::spin_loop; + + #[cfg(feature = "std")] + pub(crate) use std::thread::yield_now; } #[cfg(all(not(feature = "portable-atomic"), not(loom)))] mod sync_impl { pub(crate) use core::cell; pub(crate) use core::sync::atomic; + + #[cfg(not(feature = "std"))] + #[inline] + pub(crate) fn spin_loop() { + #[allow(deprecated)] + atomic::spin_loop_hint(); + } + + #[cfg(feature = "std")] + pub(crate) use std::thread::yield_now; } #[cfg(loom)] @@ -20,10 +36,22 @@ mod sync_impl { pub(crate) use core::sync::atomic::compiler_fence; pub(crate) use loom::sync::atomic::*; } + + pub(crate) use loom::thread::yield_now; } pub(crate) use sync_impl::*; +/// Notify the CPU that we are currently busy-waiting. +#[inline] +pub(crate) fn busy_wait() { + #[cfg(feature = "std")] + yield_now(); + + #[cfg(not(feature = "std"))] + spin_loop(); +} + #[cfg(loom)] pub(crate) mod prelude {} diff --git a/tests/loom.rs b/tests/loom.rs index e60c37e..e05bd4a 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -4,7 +4,6 @@ use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use loom::sync::atomic::{AtomicUsize, Ordering}; use loom::sync::{Arc, Condvar, Mutex}; use loom::thread; -use std::iter; /// A basic MPMC channel based on a ConcurrentQueue and loom primitives. struct Channel { From 3ecbac368e10aa90ac94b3b79d5f79067ab8849b Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 19 Sep 2022 07:51:19 -0700 Subject: [PATCH 7/8] fmt --- src/lib.rs | 8 ++++---- src/sync.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9868f20..5f64fde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,12 +30,12 @@ //! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this //! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow //! this crate to be used on `no_std` platforms at the potential expense of more busy waiting. -//! -//! There is also a `portable-atomic` feature, which uses a polyfill from the +//! +//! There is also a `portable-atomic` feature, which uses a polyfill from the //! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them. //! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on //! single-threaded targets. Note that even with this feature enabled, `concurrent-queue` still -//! requires a global allocator to be available. See the documentation for the +//! requires a global allocator to be available. See the documentation for the //! [`std::alloc::GlobalAlloc`] trait for more information. //! //! [Bounded]: `ConcurrentQueue::bounded()` @@ -62,8 +62,8 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; use crate::bounded::Bounded; use crate::single::Single; -use crate::unbounded::Unbounded; use crate::sync::busy_wait; +use crate::unbounded::Unbounded; mod bounded; mod single; diff --git a/src/sync.rs b/src/sync.rs index 1a6c3b0..ca283bb 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -16,7 +16,7 @@ mod sync_impl { mod sync_impl { pub(crate) use core::cell; pub(crate) use core::sync::atomic; - + #[cfg(not(feature = "std"))] #[inline] pub(crate) fn spin_loop() { From 5dbeb4519491f59bedd6306edc10ef1597a963cf Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 19 Sep 2022 08:21:47 -0700 Subject: [PATCH 8/8] remove cfg-if --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 89daa1b..7715cb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ exclude = ["/.*"] bench = false [dependencies] -cfg-if = "1" crossbeam-utils = { version = "0.8.11", default-features = false } portable-atomic = { version = "0.3", default-features = false, optional = true }