Skip to content

Commit

Permalink
always use crossbeam::AtomicCell when capacity is one
Browse files Browse the repository at this point in the history
  • Loading branch information
brunocodutra committed Feb 24, 2022
1 parent ff49587 commit fdd4517
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 30 deletions.
10 changes: 5 additions & 5 deletions benches/throughput.rs
Expand Up @@ -3,7 +3,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion, Thro
use futures::future::{try_join, try_join_all};
use futures::prelude::*;
use ring_channel::{ring_channel, RingReceiver, RingSender, TryRecvError};
use std::{fmt::Debug, hint::spin_loop, iter, mem::size_of, num::NonZeroUsize, time::Duration};
use std::{collections::BTreeSet, fmt::Debug, hint::spin_loop, iter, mem::size_of, time::Duration};
use tokio::runtime::Runtime;
use tokio::task::{self, JoinHandle};

Expand All @@ -19,7 +19,7 @@ async fn run<R, const N: usize>(msgs: usize, cap: usize, p: usize, c: usize) ->
where
R: Routine<[char; N]>,
{
let (tx, rx) = ring_channel(NonZeroUsize::new(cap).unwrap());
let (tx, rx) = ring_channel(cap.try_into().unwrap());
let producer = try_join_all(iter::repeat(tx).take(p).map(|tx| R::produce(tx, msgs / p)));
let consumer = try_join_all(iter::repeat(rx).take(c).map(|rx| R::consume(rx, msgs / c)));

Expand All @@ -34,12 +34,12 @@ fn bench<R, const N: usize>(group: &mut BenchmarkGroup<WallTime>, p: usize, c: u
where
R: Debug + Default + Routine<[char; N]>,
{
let prefix = format!("{:?}/{}B/{}x{}", R::default(), size_of::<[char; N]>(), p, c);
let prefix = format!("{}x{}/{:?}/{}B", p, c, R::default(), size_of::<[char; N]>());

let messages_per_iter = 256 * p;
let messages_per_iter = 128 * p;
group.throughput(Throughput::Elements(messages_per_iter as u64));

for &cap in &[1, p + c] {
for cap in BTreeSet::from([1, 2, p + c]) {
group.bench_function(format!("{}/{}", prefix, cap), |b| {
b.to_async(Runtime::new().unwrap())
.iter_custom(|iters| run::<R, N>(iters as usize * messages_per_iter, cap, p, c));
Expand Down
70 changes: 45 additions & 25 deletions src/buffer.rs
@@ -1,55 +1,64 @@
use alloc::boxed::Box;
use crossbeam_queue::ArrayQueue;
use crossbeam_utils::atomic::AtomicCell;
use derivative::Derivative;

type AtomicOption<T> = crossbeam_utils::atomic::AtomicCell<Option<T>>;

#[derive(Derivative)]
#[derivative(Debug)]
#[allow(clippy::large_enum_variant)]
pub(super) enum RingBuffer<T> {
Atomic(#[derivative(Debug = "ignore")] AtomicCell<Option<T>>),
Boxed(#[derivative(Debug = "ignore")] AtomicCell<Option<Box<T>>>),
Queue(#[derivative(Debug = "ignore")] ArrayQueue<T>),
Cell(#[derivative(Debug = "ignore")] AtomicOption<T>),
}

impl<T> RingBuffer<T> {
pub(super) fn new(capacity: usize) -> Self {
if capacity > 1 || !AtomicOption::<T>::is_lock_free() {
RingBuffer::Queue(ArrayQueue::new(capacity))
assert!(capacity > 0, "capacity must be non-zero");

if capacity == 1 && AtomicCell::<Option<T>>::is_lock_free() {
RingBuffer::Atomic(AtomicCell::new(None))
} else if capacity == 1 {
debug_assert!(AtomicCell::<Option<Box<T>>>::is_lock_free());
RingBuffer::Boxed(AtomicCell::new(None))
} else {
RingBuffer::Cell(AtomicOption::new(None))
RingBuffer::Queue(ArrayQueue::new(capacity))
}
}

#[cfg(test)]
pub(super) fn capacity(&self) -> usize {
use RingBuffer::*;
match self {
Queue(q) => q.capacity(),
Cell(_) => 1,
RingBuffer::Atomic(_) => 1,
RingBuffer::Boxed(_) => 1,
RingBuffer::Queue(q) => q.capacity(),
}
}

pub(super) fn push(&self, mut value: T) {
use RingBuffer::*;
match self {
Queue(q) => {
RingBuffer::Atomic(c) => {
c.store(Some(value));
}

RingBuffer::Boxed(b) => {
b.store(Some(Box::new(value)));
}

RingBuffer::Queue(q) => {
while let Err(v) = q.push(value) {
self.pop();
value = v;
}
}

Cell(c) => {
c.swap(Some(value));
}
}
}

pub(super) fn pop(&self) -> Option<T> {
use RingBuffer::*;
match self {
Queue(q) => q.pop(),
Cell(c) => c.swap(None),
RingBuffer::Atomic(c) => c.take(),
RingBuffer::Boxed(b) => Some(*b.take()?),
RingBuffer::Queue(q) => q.pop(),
}
}
}
Expand All @@ -65,11 +74,17 @@ mod tests {
use test_strategy::proptest;
use tokio::{runtime, task::spawn_blocking};

#[should_panic]
#[proptest]
fn new_panics_if_capacity_is_zero() {
RingBuffer::<()>::new(0);
}

#[proptest]
fn new_uses_atomic_cell_when_possible() {
fn new_uses_atomic_cell_when_capacity_is_one() {
assert_eq!(
discriminant(&RingBuffer::<[char; 1]>::new(1)),
discriminant(&RingBuffer::Cell(Default::default()))
discriminant(&RingBuffer::Atomic(Default::default()))
);

assert_eq!(
Expand All @@ -79,24 +94,29 @@ mod tests {

assert_eq!(
discriminant(&RingBuffer::<[char; 4]>::new(1)),
discriminant(&RingBuffer::Queue(ArrayQueue::new(1)))
discriminant(&RingBuffer::Boxed(Default::default()))
);

assert_eq!(
discriminant(&RingBuffer::<[char; 4]>::new(2)),
discriminant(&RingBuffer::Queue(ArrayQueue::new(2)))
);

assert_eq!(
discriminant(&RingBuffer::<RingSender<()>>::new(1)),
discriminant(&RingBuffer::Cell(Default::default()))
discriminant(&RingBuffer::Atomic(Default::default()))
);

assert_eq!(
discriminant(&RingBuffer::<RingReceiver<()>>::new(1)),
discriminant(&RingBuffer::Cell(Default::default()))
discriminant(&RingBuffer::Atomic(Default::default()))
);
}

#[proptest]
fn capacity_returns_the_maximum_buffer_size(#[strategy(1..=10usize)] capacity: usize) {
let buffer = RingBuffer::<()>::new(capacity);
assert_eq!(buffer.capacity(), capacity);
assert_eq!(RingBuffer::<[char; 1]>::new(capacity).capacity(), capacity);
assert_eq!(RingBuffer::<[char; 4]>::new(capacity).capacity(), capacity);
}

#[proptest]
Expand Down

0 comments on commit fdd4517

Please sign in to comment.