diff --git a/crossbeam-queue/src/array_queue.rs b/crossbeam-queue/src/array_queue.rs index 5f3061b70..c34f589e7 100644 --- a/crossbeam-queue/src/array_queue.rs +++ b/crossbeam-queue/src/array_queue.rs @@ -27,9 +27,11 @@ struct Slot { /// /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an -/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit -/// faster than [`SegQueue`]. +/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for +/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue +/// a bit faster than [`SegQueue`]. /// +/// [`force_push`]: ArrayQueue::force_push /// [`SegQueue`]: super::SegQueue /// /// # Examples @@ -120,21 +122,10 @@ impl ArrayQueue { } } - /// Attempts to push an element into the queue. - /// - /// If the queue is full, the element is returned back as an error. - /// - /// # Examples - /// - /// ``` - /// use crossbeam_queue::ArrayQueue; - /// - /// let q = ArrayQueue::new(1); - /// - /// assert_eq!(q.push(10), Ok(())); - /// assert_eq!(q.push(20), Err(20)); - /// ``` - pub fn push(&self, value: T) -> Result<(), T> { + fn push_or_else(&self, mut value: T, f: F) -> Result<(), T> + where + F: Fn(T, usize, usize, &Slot) -> Result, + { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -143,6 +134,16 @@ impl ArrayQueue { let index = tail & (self.one_lap - 1); let lap = tail & !(self.one_lap - 1); + let new_tail = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + // Inspect the corresponding slot. debug_assert!(index < self.buffer.len()); let slot = unsafe { self.buffer.get_unchecked(index) }; @@ -150,16 +151,6 @@ impl ArrayQueue { // If the tail and the stamp match, we may attempt to push. if tail == stamp { - let new_tail = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, index: index + 1 }`. - tail + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - // Try moving the tail. match self.tail.compare_exchange_weak( tail, @@ -182,14 +173,7 @@ impl ArrayQueue { } } else if stamp.wrapping_add(self.one_lap) == tail + 1 { atomic::fence(Ordering::SeqCst); - let head = self.head.load(Ordering::Relaxed); - - // If the head lags one lap behind the tail as well... - if head.wrapping_add(self.one_lap) == tail { - // ...then the queue is full. - return Err(value); - } - + value = f(value, tail, new_tail, slot)?; backoff.spin(); tail = self.tail.load(Ordering::Relaxed); } else { @@ -200,6 +184,79 @@ impl ArrayQueue { } } + /// Attempts to push an element into the queue. + /// + /// If the queue is full, the element is returned back as an error. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_queue::ArrayQueue; + /// + /// let q = ArrayQueue::new(1); + /// + /// assert_eq!(q.push(10), Ok(())); + /// assert_eq!(q.push(20), Err(20)); + /// ``` + pub fn push(&self, value: T) -> Result<(), T> { + self.push_or_else(value, |v, tail, _, _| { + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the queue is full. + Err(v) + } else { + Ok(v) + } + }) + } + + /// Pushes an element into the queue, replacing the oldest element if necessary. + /// + /// If the queue is full, the oldest element is replaced and returned, + /// otherwise `None` is returned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_queue::ArrayQueue; + /// + /// let q = ArrayQueue::new(2); + /// + /// assert_eq!(q.force_push(10), None); + /// assert_eq!(q.force_push(20), None); + /// assert_eq!(q.force_push(30), Some(10)); + /// assert_eq!(q.pop(), Some(20)); + /// ``` + pub fn force_push(&self, value: T) -> Option { + self.push_or_else(value, |v, tail, new_tail, slot| { + let head = tail.wrapping_sub(self.one_lap); + let new_head = new_tail.wrapping_sub(self.one_lap); + + // Try moving the head. + if self + .head + .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // Move the tail. + self.tail.store(new_tail, Ordering::SeqCst); + + // Swap the previous value. + let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() }; + + // Update the stamp. + slot.stamp.store(tail + 1, Ordering::Release); + + Err(old) + } else { + Ok(v) + } + }) + .err() + } + /// Attempts to pop an element from the queue. /// /// If the queue is empty, `None` is returned. diff --git a/crossbeam-queue/tests/array_queue.rs b/crossbeam-queue/tests/array_queue.rs index a23e08201..9a64eac03 100644 --- a/crossbeam-queue/tests/array_queue.rs +++ b/crossbeam-queue/tests/array_queue.rs @@ -144,6 +144,45 @@ fn spsc() { .unwrap(); } +#[cfg_attr(miri, ignore)] // Miri is too slow +#[test] +fn spsc_ring_buffer() { + const COUNT: usize = 100_000; + + let t = AtomicUsize::new(1); + let q = ArrayQueue::::new(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); + + scope(|scope| { + scope.spawn(|_| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Some(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if let Some(n) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }); + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), 1); + } +} + #[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn mpmc() { @@ -181,6 +220,50 @@ fn mpmc() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow +#[test] +fn mpmc_ring_buffer() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let t = AtomicUsize::new(THREADS); + let q = ArrayQueue::::new(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Some(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }); + } + + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + if let Some(n) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }); + } + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + #[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn drops() { @@ -244,13 +327,21 @@ fn linearizable() { let q = ArrayQueue::new(THREADS); scope(|scope| { - for _ in 0..THREADS { + for _ in 0..THREADS / 2 { scope.spawn(|_| { for _ in 0..COUNT { while q.push(0).is_err() {} q.pop().unwrap(); } }); + + scope.spawn(|_| { + for _ in 0..COUNT { + if q.force_push(0).is_none() { + q.pop().unwrap(); + } + } + }); } }) .unwrap();