/
control.rs
97 lines (80 loc) · 2.72 KB
/
control.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use crate::buffer::*;
use alloc::boxed::Box;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::{ops::Deref, ptr::NonNull};
use crossbeam_utils::CachePadded;
use derivative::Derivative;
#[cfg(feature = "futures_api")]
use crate::waitlist::*;
#[cfg(feature = "futures_api")]
use core::task::Waker;
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub(super) struct ControlBlock<T> {
pub(super) senders: CachePadded<AtomicUsize>,
pub(super) receivers: CachePadded<AtomicUsize>,
pub(super) connected: AtomicBool,
pub(super) buffer: RingBuffer<T>,
#[cfg(feature = "futures_api")]
#[derivative(Debug = "ignore")]
pub(super) waitlist: Waitlist<Waker>,
}
impl<T> ControlBlock<T> {
fn new(capacity: usize) -> Self {
Self {
senders: CachePadded::new(AtomicUsize::new(1)),
receivers: CachePadded::new(AtomicUsize::new(1)),
connected: AtomicBool::new(true),
buffer: RingBuffer::new(capacity),
#[cfg(feature = "futures_api")]
waitlist: Waitlist::new(),
}
}
}
#[derive(Derivative, Eq, PartialEq)]
#[derivative(Debug(bound = ""), Clone(bound = ""))]
pub(super) struct ControlBlockRef<T>(NonNull<ControlBlock<T>>);
impl<T> Unpin for ControlBlockRef<T> {}
impl<T> ControlBlockRef<T> {
pub(super) fn new(capacity: usize) -> Self {
ControlBlockRef(unsafe {
NonNull::new_unchecked(Box::into_raw(Box::new(ControlBlock::new(capacity))))
})
}
}
impl<T> Deref for ControlBlockRef<T> {
type Target = ControlBlock<T>;
#[inline]
fn deref(&self) -> &Self::Target {
unsafe { self.0.as_ref() }
}
}
impl<T> Drop for ControlBlockRef<T> {
fn drop(&mut self) {
debug_assert!(!self.connected.load(Ordering::SeqCst));
debug_assert_eq!(self.senders.load(Ordering::SeqCst), 0);
debug_assert_eq!(self.receivers.load(Ordering::SeqCst), 0);
unsafe { Box::from_raw(&**self as *const ControlBlock<T> as *mut ControlBlock<T>) };
}
}
#[cfg(test)]
mod tests {
use super::*;
use test_strategy::proptest;
#[proptest]
fn control_block_starts_connected() {
let ctrl = ControlBlock::<()>::new(1);
assert!(ctrl.connected.load(Ordering::SeqCst));
}
#[proptest]
fn control_block_starts_with_reference_counters_equal_to_one() {
let ctrl = ControlBlock::<()>::new(1);
assert_eq!(ctrl.senders.load(Ordering::SeqCst), 1);
assert_eq!(ctrl.receivers.load(Ordering::SeqCst), 1);
}
#[proptest]
fn control_block_allocates_buffer_given_capacity(#[strategy(1..=10usize)] capacity: usize) {
let ctrl = ControlBlock::<()>::new(capacity);
assert_eq!(ctrl.buffer.capacity(), capacity);
}
}