sync: Heap allocate the values array #5294

Merged
merged 3 commits on Dec 15, 2022
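In brief, per the diff below: `Block::<T>::new` used to construct the entire block, including the `BLOCK_CAP`-slot values array, on the stack and return it by value for the caller to box, which could overflow the stack for large message types (see the regression test for issue #5293 at the end of `block.rs`). The block's fields are now split into a `BlockHeader<T>` plus a `Values<T>` array, and `new` allocates the block directly on the heap with `std::alloc::alloc`, initializes it in place through raw pointers, and returns a `Box<Block<T>>`.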
Changes from all commits
143 changes: 103 additions & 40 deletions tokio/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

+use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
@@ -10,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
+/// The header fields.
+header: BlockHeader<T>,
+
+/// Array containing values pushed into the block. Values are stored in a
+/// contiguous array in order to improve cache line behavior when reading.
+/// The values must be manually dropped.
+values: Values<T>,
+}
+
+/// Extra fields for a `Block<T>`.
+struct BlockHeader<T> {
/// The start index of this block.
///
/// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
@@ -24,18 +36,14 @@ pub(crate) struct Block<T> {
/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
observed_tail_position: UnsafeCell<usize>,

-/// Array containing values pushed into the block. Values are stored in a
-/// contiguous array in order to improve cache line behavior when reading.
-/// The values must be manually dropped.
-values: Values<T>,
}

pub(crate) enum Read<T> {
Value(T),
Closed,
}

+#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
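(The new `#[repr(transparent)]` attribute guarantees that `Values<T>` has exactly the layout of its single array field; that is what makes the pointer cast in `Values::initialize` further down, from a `Values<T>` pointer to a `*mut UnsafeCell<MaybeUninit<T>>`, sound.)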

use super::BLOCK_CAP;
@@ -71,28 +79,56 @@ pub(crate) fn offset(slot_index: usize) -> usize {
SLOT_MASK & slot_index
}

+generate_addr_of_methods! {
+impl<T> Block<T> {
+unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
+&self.header
+}
+
+unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
+&self.values
+}
+}
+}
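(`generate_addr_of_methods!` is a tokio-internal helper macro. As a rough sketch, and not the macro's literal expansion, each method body is turned into a raw-pointer field projection along these lines; `addr_of_header_sketch` is a hypothetical name:)

// Hypothetical hand-expansion of `addr_of_header`, for illustration only.
unsafe fn addr_of_header_sketch<T>(me: NonNull<Block<T>>) -> NonNull<BlockHeader<T>> {
// `addr_of_mut!` computes the field address without creating a
// `&mut Block<T>`, which would be UB while the block is uninitialized.
NonNull::new_unchecked(std::ptr::addr_of_mut!((*me.as_ptr()).header))
}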

impl<T> Block<T> {
-pub(crate) fn new(start_index: usize) -> Block<T> {
-Block {
-// The absolute index in the channel of the first slot in the block.
-start_index,
+pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
+unsafe {
+// Allocate the block on the heap.
+// SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
+let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
+let block = match NonNull::new(block) {
+Some(block) => block,
+None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
+};
+
+// Write the header to the block.
+Block::addr_of_header(block).as_ptr().write(BlockHeader {
+// The absolute index in the channel of the first slot in the block.
+start_index,

// Pointer to the next block in the linked list.
next: AtomicPtr::new(ptr::null_mut()),

ready_slots: AtomicUsize::new(0),

observed_tail_position: UnsafeCell::new(0),
+});

-// Value storage
-values: unsafe { Values::uninitialized() },
+// Initialize the values array.
+Values::initialize(Block::addr_of_values(block));
+
+// Convert the pointer to a `Box`.
+// Safety: The raw pointer was allocated using the global allocator, and with
+// the layout for a `Block<T>`, so it's valid to convert it to a box.
+Box::from_raw(block.as_ptr())
}
}
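(Why the manual allocation: `Box::new(Block { .. })` would construct the whole block, values array included, in the caller's stack frame before moving it to the heap. A minimal standalone sketch of the same pattern, with illustrative names not taken from tokio:)

use std::alloc::{alloc, handle_alloc_error, Layout};
use std::mem::MaybeUninit;
use std::ptr::NonNull;

struct Big {
header: u64,
// Large payload; `MaybeUninit` makes it valid to leave unwritten.
payload: [MaybeUninit<u8>; 16 * 1024 * 1024],
}

fn new_big() -> Box<Big> {
unsafe {
let layout = Layout::new::<Big>();
// SAFETY: `Big` has non-zero size.
let ptr = match NonNull::new(alloc(layout) as *mut Big) {
Some(p) => p,
None => handle_alloc_error(layout),
};
// Write the header in place; `addr_of_mut!` takes the field's
// address without creating a reference to uninitialized memory.
std::ptr::addr_of_mut!((*ptr.as_ptr()).header).write(0);
// SAFETY: allocated by the global allocator with `Big`'s layout.
Box::from_raw(ptr.as_ptr())
}
}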

/// Returns `true` if the block matches the given index.
pub(crate) fn is_at_index(&self, index: usize) -> bool {
debug_assert!(offset(index) == 0);
-self.start_index == index
+self.header.start_index == index
}

/// Returns the number of blocks between `self` and the block at the
@@ -101,7 +137,7 @@ impl<T> Block<T> {
/// `start_index` must represent a block *after* `self`.
pub(crate) fn distance(&self, other_index: usize) -> usize {
debug_assert!(offset(other_index) == 0);
-other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
}

/// Reads the value at the given offset.
@@ -116,7 +152,7 @@ impl<T> Block<T> {
pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
let offset = offset(slot_index);

-let ready_bits = self.ready_slots.load(Acquire);
+let ready_bits = self.header.ready_slots.load(Acquire);

if !is_ready(ready_bits, offset) {
if is_tx_closed(ready_bits) {
@@ -156,7 +192,7 @@ impl<T> Block<T> {

/// Signal to the receiver that the sender half of the list is closed.
pub(crate) unsafe fn tx_close(&self) {
-self.ready_slots.fetch_or(TX_CLOSED, Release);
+self.header.ready_slots.fetch_or(TX_CLOSED, Release);
}

/// Resets the block to a blank state. This enables reusing blocks in the
@@ -169,9 +205,9 @@
/// * All slots are empty.
/// * The caller holds a unique pointer to the block.
pub(crate) unsafe fn reclaim(&mut self) {
-self.start_index = 0;
-self.next = AtomicPtr::new(ptr::null_mut());
-self.ready_slots = AtomicUsize::new(0);
+self.header.start_index = 0;
+self.header.next = AtomicPtr::new(ptr::null_mut());
+self.header.ready_slots = AtomicUsize::new(0);
}

/// Releases the block to the rx half for freeing.
@@ -187,19 +223,20 @@ impl<T> Block<T> {
pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
// Track the observed tail_position. Any sender targeting a greater
// tail_position is guaranteed to not access this block.
-self.observed_tail_position
+self.header
+.observed_tail_position
.with_mut(|ptr| *ptr = tail_position);

// Set the released bit, signalling to the receiver that it is safe to
// free the block's memory as soon as all slots **prior** to
// `observed_tail_position` have been filled.
-self.ready_slots.fetch_or(RELEASED, Release);
+self.header.ready_slots.fetch_or(RELEASED, Release);
}

/// Mark a slot as ready
fn set_ready(&self, slot: usize) {
let mask = 1 << slot;
-self.ready_slots.fetch_or(mask, Release);
+self.header.ready_slots.fetch_or(mask, Release);
}

/// Returns `true` when all slots have their `ready` bits set.
@@ -214,25 +251,31 @@ impl<T> Block<T> {
/// single atomic cell. However, this could have negative impact on cache
/// behavior as there would be many more mutations to a single slot.
pub(crate) fn is_final(&self) -> bool {
-self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
}

/// Returns the `observed_tail_position` value, if set
pub(crate) fn observed_tail_position(&self) -> Option<usize> {
-if 0 == RELEASED & self.ready_slots.load(Acquire) {
+if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
None
} else {
-Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+Some(
+self.header
+.observed_tail_position
+.with(|ptr| unsafe { *ptr }),
+)
}
}

/// Loads the next block
pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
-let ret = NonNull::new(self.next.load(ordering));
+let ret = NonNull::new(self.header.next.load(ordering));

debug_assert!(unsafe {
-ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
-.unwrap_or(true)
+ret.map(|block| {
+block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
+})
+.unwrap_or(true)
});

ret
@@ -260,9 +303,10 @@
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
-block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);

let next_ptr = self
+.header
.next
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);
Expand Down Expand Up @@ -291,7 +335,7 @@ impl<T> Block<T> {
// Create the new block. It is assumed that the block will become the
// next one after `&self`. If this turns out to not be the case,
// `start_index` is updated accordingly.
-let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+let new_block = Block::new(self.header.start_index + BLOCK_CAP);

let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

@@ -308,7 +352,8 @@ impl<T> Block<T> {
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(
-self.next
+self.header
+.next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);
@@ -360,19 +405,20 @@ fn is_tx_closed(bits: usize) -> bool {
}

impl<T> Values<T> {
-unsafe fn uninitialized() -> Values<T> {
-let mut vals = MaybeUninit::uninit();
-
+/// Initialize a `Values` struct from a pointer.
+///
+/// # Safety
+///
+/// The raw pointer must be valid for writing a `Values<T>`.
+unsafe fn initialize(_value: NonNull<Values<T>>) {
// When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
-let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}

-Values(vals.assume_init())
}
}
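(Outside of loom, `initialize` is deliberately a no-op: an array of `UnsafeCell<MaybeUninit<T>>` is valid without any bytes being written, so the freshly allocated memory already is a valid `Values<T>`. Under loom, `UnsafeCell` is loom's instrumented type and must actually be constructed, hence the write loop inside `if_loom!`.)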

@@ -383,3 +429,20 @@ impl<T> ops::Index<usize> for Values<T> {
self.0.index(index)
}
}

+#[cfg(all(test, not(loom)))]
+#[test]
+fn assert_no_stack_overflow() {
+// https://github.com/tokio-rs/tokio/issues/5293
+
+struct Foo {
+_a: [u8; 2_000_000],
+}
+
+assert_eq!(
+Layout::new::<MaybeUninit<Block<Foo>>>(),
+Layout::new::<Block<Foo>>()
+);
+
+let _block = Block::<Foo>::new(0);
+}
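(This regression test guards against the stack overflow reported in issue #5293: with `BLOCK_CAP` slots, 32 on 64-bit targets, of a 2 MB element, a stack-constructed block would need roughly 64 MB of stack. The `Layout` assertion additionally sanity-checks that `MaybeUninit<Block<Foo>>` and `Block<Foo>` agree in size and alignment, matching the layout used by the manual allocation in `new`.)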
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/list.rs
@@ -44,7 +44,7 @@ pub(crate) enum TryPopResult<T> {

pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
// Create the initial block shared between the tx and rx halves.
-let initial_block = Box::new(Block::new(0));
+let initial_block = Block::new(0);
let initial_block_ptr = Box::into_raw(initial_block);
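(Call-site note: `Block::new` now returns `Box<Block<T>>` directly, so the explicit `Box::new` wrapper is gone; the `Box::into_raw` call is unchanged.)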

let tx = Tx {