sync: decrease stack usage in mpsc channel (#5294)
notgull committed Dec 15, 2022
1 parent 3976622 commit 81b50e9
Showing 2 changed files with 104 additions and 41 deletions.
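Context for the diff below: `Block<T>` embeds an array of `BLOCK_CAP` slots, so for large message types the struct itself is huge. The old `Block::new` built that struct by value and then boxed it, which could overflow the stack (the regression test added at the end of `block.rs` references issue #5293 and uses a 2 MB element type). The new code allocates the block directly on the heap and initializes it in place. Below is a minimal standalone sketch of that pattern, with hypothetical `Big`/`new_big` names rather than the actual tokio types:

use std::alloc::{alloc, handle_alloc_error, Layout};
use std::ptr::{addr_of_mut, NonNull};

/// Hypothetical stand-in for a struct too large to build on the stack first.
struct Big {
    header: usize,
    payload: [u8; 8 * 1024 * 1024],
}

fn new_big(header: usize) -> Box<Big> {
    let layout = Layout::new::<Big>();
    unsafe {
        // SAFETY: `Big` has a non-zero size.
        let ptr = match NonNull::new(alloc(layout) as *mut Big) {
            Some(ptr) => ptr,
            None => handle_alloc_error(layout),
        };

        // Initialize the fields in place; no `Big` value is ever created on
        // the stack, so there is no large temporary to overflow it.
        addr_of_mut!((*ptr.as_ptr()).header).write(header);
        addr_of_mut!((*ptr.as_ptr()).payload).write_bytes(0, 1);

        // SAFETY: the pointer came from the global allocator with the layout
        // of `Big`, and every field is now initialized.
        Box::from_raw(ptr.as_ptr())
    }
}

fn main() {
    let big = new_big(7);
    assert_eq!(big.header, 7);
}

The commit applies the same idea to `Block<T>` by splitting it into a small `BlockHeader<T>` plus the large `Values<T>` array, writing the header through a raw pointer, and only touching the value slots when loom needs them initialized.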
143 changes: 103 additions & 40 deletions tokio/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
@@ -10,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
/// The header fields.
header: BlockHeader<T>,

/// Array containing values pushed into the block. Values are stored in a
/// continuous array in order to improve cache line behavior when reading.
/// The values must be manually dropped.
values: Values<T>,
}

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
/// The start index of this block.
///
/// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
@@ -24,18 +36,14 @@ pub(crate) struct Block<T> {
/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
observed_tail_position: UnsafeCell<usize>,

/// Array containing values pushed into the block. Values are stored in a
/// continuous array in order to improve cache line behavior when reading.
/// The values must be manually dropped.
values: Values<T>,
}

pub(crate) enum Read<T> {
Value(T),
Closed,
}

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;
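The new `#[repr(transparent)]` on `Values<T>` guarantees that the wrapper has exactly the same layout as the inner `[UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]` array, which the pointer cast in `Values::initialize` further down appears to rely on. A small sketch of that guarantee, using hypothetical types rather than tokio's:

use std::mem::{align_of, size_of};

// A `#[repr(transparent)]` wrapper has exactly the layout of the single
// field it wraps, so a pointer to one may be treated as a pointer to the
// other.
#[repr(transparent)]
struct Wrapper([u32; 4]);

fn main() {
    assert_eq!(size_of::<Wrapper>(), size_of::<[u32; 4]>());
    assert_eq!(align_of::<Wrapper>(), align_of::<[u32; 4]>());

    let w = Wrapper([1, 2, 3, 4]);
    // Sound here because of the layout guarantee above.
    let arr: &[u32; 4] = unsafe { &*(&w as *const Wrapper as *const [u32; 4]) };
    assert_eq!(arr[2], 3);
}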
@@ -71,28 +79,56 @@ pub(crate) fn offset(slot_index: usize) -> usize {
SLOT_MASK & slot_index
}

generate_addr_of_methods! {
impl<T> Block<T> {
unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
&self.header
}

unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
&self.values
}
}
}
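`generate_addr_of_methods!` is a tokio-internal helper macro; conceptually, each generated method resembles the hand-written sketch below (hypothetical `Node`/`Header` types). The point is to derive a field pointer from a `NonNull<Self>` via `std::ptr::addr_of_mut!` without ever creating a reference to the block, which would be unsound while the allocation is still partly uninitialized:

use std::ptr::{addr_of_mut, NonNull};

struct Header { start_index: usize }
struct Node { header: Header, values: [u8; 1024] }

impl Node {
    // Compute a pointer to the `header` field from a pointer to the whole
    // struct without materializing a `&Node`.
    unsafe fn addr_of_header(me: NonNull<Node>) -> NonNull<Header> {
        NonNull::new_unchecked(addr_of_mut!((*me.as_ptr()).header))
    }
}

fn main() {
    // The technique also works when the pointee is only partially
    // initialized; a fully initialized value is used here for simplicity.
    let mut node = Node { header: Header { start_index: 42 }, values: [0; 1024] };
    let header = unsafe { Node::addr_of_header(NonNull::from(&mut node)) };
    assert_eq!(unsafe { header.as_ref().start_index }, 42);
}

With these helpers, `Block::new` below can write the header and initialize the values through raw pointers only.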

impl<T> Block<T> {
pub(crate) fn new(start_index: usize) -> Block<T> {
Block {
// The absolute index in the channel of the first slot in the block.
start_index,
pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
unsafe {
// Allocate the block on the heap.
// SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
let block = match NonNull::new(block) {
Some(block) => block,
None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
};

// Write the header to the block.
Block::addr_of_header(block).as_ptr().write(BlockHeader {
// The absolute index in the channel of the first slot in the block.
start_index,

// Pointer to the next block in the linked list.
next: AtomicPtr::new(ptr::null_mut()),
// Pointer to the next block in the linked list.
next: AtomicPtr::new(ptr::null_mut()),

ready_slots: AtomicUsize::new(0),
ready_slots: AtomicUsize::new(0),

observed_tail_position: UnsafeCell::new(0),
observed_tail_position: UnsafeCell::new(0),
});

// Value storage
values: unsafe { Values::uninitialized() },
// Initialize the values array.
Values::initialize(Block::addr_of_values(block));

// Convert the pointer to a `Box`.
// Safety: The raw pointer was allocated using the global allocator, and with
// the layout for a `Block<T>`, so it's valid to convert it to box.
Box::from_raw(block.as_ptr())
}
}

/// Returns `true` if the block matches the given index.
pub(crate) fn is_at_index(&self, index: usize) -> bool {
debug_assert!(offset(index) == 0);
self.start_index == index
self.header.start_index == index
}

/// Returns the number of blocks between `self` and the block at the
@@ -101,7 +137,7 @@ impl<T> Block<T> {
/// `start_index` must represent a block *after* `self`.
pub(crate) fn distance(&self, other_index: usize) -> usize {
debug_assert!(offset(other_index) == 0);
other_index.wrapping_sub(self.start_index) / BLOCK_CAP
other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
}

/// Reads the value at the given offset.
@@ -116,7 +152,7 @@ impl<T> Block<T> {
pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
let offset = offset(slot_index);

let ready_bits = self.ready_slots.load(Acquire);
let ready_bits = self.header.ready_slots.load(Acquire);

if !is_ready(ready_bits, offset) {
if is_tx_closed(ready_bits) {
@@ -156,7 +192,7 @@ impl<T> Block<T> {

/// Signal to the receiver that the sender half of the list is closed.
pub(crate) unsafe fn tx_close(&self) {
self.ready_slots.fetch_or(TX_CLOSED, Release);
self.header.ready_slots.fetch_or(TX_CLOSED, Release);
}

/// Resets the block to a blank state. This enables reusing blocks in the
@@ -169,9 +205,9 @@ impl<T> Block<T> {
/// * All slots are empty.
/// * The caller holds a unique pointer to the block.
pub(crate) unsafe fn reclaim(&mut self) {
self.start_index = 0;
self.next = AtomicPtr::new(ptr::null_mut());
self.ready_slots = AtomicUsize::new(0);
self.header.start_index = 0;
self.header.next = AtomicPtr::new(ptr::null_mut());
self.header.ready_slots = AtomicUsize::new(0);
}

/// Releases the block to the rx half for freeing.
@@ -187,19 +223,20 @@ impl<T> Block<T> {
pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
// Track the observed tail_position. Any sender targeting a greater
// tail_position is guaranteed to not access this block.
self.observed_tail_position
self.header
.observed_tail_position
.with_mut(|ptr| *ptr = tail_position);

// Set the released bit, signalling to the receiver that it is safe to
// free the block's memory as soon as all slots **prior** to
// `observed_tail_position` have been filled.
self.ready_slots.fetch_or(RELEASED, Release);
self.header.ready_slots.fetch_or(RELEASED, Release);
}

/// Mark a slot as ready
fn set_ready(&self, slot: usize) {
let mask = 1 << slot;
self.ready_slots.fetch_or(mask, Release);
self.header.ready_slots.fetch_or(mask, Release);
}

/// Returns `true` when all slots have their `ready` bits set.
@@ -214,25 +251,31 @@ impl<T> Block<T> {
/// single atomic cell. However, this could have negative impact on cache
/// behavior as there would be many more mutations to a single slot.
pub(crate) fn is_final(&self) -> bool {
self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
}

/// Returns the `observed_tail_position` value, if set
pub(crate) fn observed_tail_position(&self) -> Option<usize> {
if 0 == RELEASED & self.ready_slots.load(Acquire) {
if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
None
} else {
Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
Some(
self.header
.observed_tail_position
.with(|ptr| unsafe { *ptr }),
)
}
}

/// Loads the next block
pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
let ret = NonNull::new(self.next.load(ordering));
let ret = NonNull::new(self.header.next.load(ordering));

debug_assert!(unsafe {
ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
.unwrap_or(true)
ret.map(|block| {
block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
})
.unwrap_or(true)
});

ret
@@ -260,9 +303,10 @@ impl<T> Block<T> {
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);

let next_ptr = self
.header
.next
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);
Expand Down Expand Up @@ -291,7 +335,7 @@ impl<T> Block<T> {
// Create the new block. It is assumed that the block will become the
// next one after `&self`. If this turns out to not be the case,
// `start_index` is updated accordingly.
let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
let new_block = Block::new(self.header.start_index + BLOCK_CAP);

let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

@@ -308,7 +352,8 @@ impl<T> Block<T> {
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(
self.next
self.header
.next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);
@@ -360,19 +405,20 @@ fn is_tx_closed(bits: usize) -> bool {
}

impl<T> Values<T> {
unsafe fn uninitialized() -> Values<T> {
let mut vals = MaybeUninit::uninit();

/// Initialize a `Values` struct from a pointer.
///
/// # Safety
///
/// The raw pointer must be valid for writing a `Values<T>`.
unsafe fn initialize(_value: NonNull<Values<T>>) {
// When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}

Values(vals.assume_init())
}
}

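Note that outside of loom, `Values::initialize` is deliberately a no-op: each slot is a `MaybeUninit<T>`, so freshly allocated, untouched memory is already a valid state, and a value only comes into existence when a sender writes the slot. A small standalone illustration of that contract (hypothetical example, not tokio code):

use std::mem::MaybeUninit;

fn main() {
    // Uninitialized memory is a valid `MaybeUninit<String>`; no `String`
    // exists yet, and nothing is dropped if one is never written.
    let mut slot: MaybeUninit<String> = MaybeUninit::uninit();

    // Writing the slot is what actually creates the value.
    slot.write(String::from("hello"));

    // SAFETY: the slot was initialized by the `write` above.
    let value = unsafe { slot.assume_init() };
    assert_eq!(value, "hello");
}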
@@ -383,3 +429,20 @@ impl<T> ops::Index<usize> for Values<T> {
self.0.index(index)
}
}

#[cfg(all(test, not(loom)))]
#[test]
fn assert_no_stack_overflow() {
// https://github.com/tokio-rs/tokio/issues/5293

struct Foo {
_a: [u8; 2_000_000],
}

assert_eq!(
Layout::new::<MaybeUninit<Block<Foo>>>(),
Layout::new::<Block<Foo>>()
);

let _block = Block::<Foo>::new(0);
}
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/list.rs
@@ -44,7 +44,7 @@ pub(crate) enum TryPopResult<T> {

pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
// Create the initial block shared between the tx and rx halves.
let initial_block = Box::new(Block::new(0));
let initial_block = Block::new(0);
let initial_block_ptr = Box::into_raw(initial_block);

let tx = Tx {