Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fn force_push to ArrayQueue #789

Merged
merged 1 commit into from Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 92 additions & 35 deletions crossbeam-queue/src/array_queue.rs
Expand Up @@ -27,9 +27,11 @@ struct Slot<T> {
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
/// a bit faster than [`SegQueue`].
///
/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
Expand Down Expand Up @@ -120,21 +122,10 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(1);
///
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.push(20), Err(20));
/// ```
pub fn push(&self, value: T) -> Result<(), T> {
fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
where
F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
{
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);

Expand All @@ -143,23 +134,23 @@ impl<T> ArrayQueue<T> {
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);

let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);

// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
Expand All @@ -182,14 +173,7 @@ impl<T> ArrayQueue<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
return Err(value);
}

value = f(value, tail, new_tail, slot)?;
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
Expand All @@ -200,6 +184,79 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(1);
///
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.push(20), Err(20));
/// ```
pub fn push(&self, value: T) -> Result<(), T> {
self.push_or_else(value, |v, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
Err(v)
} else {
Ok(v)
}
})
}

/// Pushes an element into the queue, replacing the oldest element if necessary.
///
/// If the queue is full, the oldest element is replaced and returned,
/// otherwise `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.force_push(10), None);
/// assert_eq!(q.force_push(20), None);
/// assert_eq!(q.force_push(30), Some(10));
/// assert_eq!(q.pop(), Some(20));
/// ```
pub fn force_push(&self, value: T) -> Option<T> {
self.push_or_else(value, |v, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);

// Try moving the head.
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Move the tail.
self.tail.store(new_tail, Ordering::SeqCst);
taiki-e marked this conversation as resolved.
Show resolved Hide resolved

// Swap the previous value.
let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };

// Update the stamp.
slot.stamp.store(tail + 1, Ordering::Release);

Err(old)
} else {
Ok(v)
}
})
.err()
}

/// Attempts to pop an element from the queue.
///
/// If the queue is empty, `None` is returned.
Expand Down
93 changes: 92 additions & 1 deletion crossbeam-queue/tests/array_queue.rs
Expand Up @@ -144,6 +144,45 @@ fn spsc() {
.unwrap();
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn spsc_ring_buffer() {
const COUNT: usize = 100_000;

let t = AtomicUsize::new(1);
let q = ArrayQueue::<usize>::new(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

scope(|scope| {
scope.spawn(|_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,

_ => {
while let Some(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
});

scope.spawn(|_| {
for i in 0..COUNT {
if let Some(n) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}

t.fetch_sub(1, Ordering::SeqCst);
});
})
.unwrap();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), 1);
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc() {
Expand Down Expand Up @@ -181,6 +220,50 @@ fn mpmc() {
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc_ring_buffer() {
const COUNT: usize = 25_000;
const THREADS: usize = 4;

let t = AtomicUsize::new(THREADS);
let q = ArrayQueue::<usize>::new(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

scope(|scope| {
for _ in 0..THREADS {
scope.spawn(|_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,

_ => {
while let Some(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
});
}

for _ in 0..THREADS {
scope.spawn(|_| {
for i in 0..COUNT {
if let Some(n) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}

t.fetch_sub(1, Ordering::SeqCst);
});
}
})
.unwrap();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
Expand Down Expand Up @@ -244,13 +327,21 @@ fn linearizable() {
let q = ArrayQueue::new(THREADS);

scope(|scope| {
for _ in 0..THREADS {
for _ in 0..THREADS / 2 {
scope.spawn(|_| {
for _ in 0..COUNT {
while q.push(0).is_err() {}
q.pop().unwrap();
}
});

scope.spawn(|_| {
for _ in 0..COUNT {
if q.force_push(0).is_none() {
q.pop().unwrap();
}
}
});
}
})
.unwrap();
Expand Down