From 8541e13a900f1457042238a93bdfc01998e42f9d Mon Sep 17 00:00:00 2001
From: jtnunley
Date: Tue, 13 Dec 2022 09:38:47 -0800
Subject: [PATCH 1/3] Heap allocate the values array

---
 tokio/src/sync/mpsc/block.rs | 47 +++++++++++++++++++++++++++---------
 tokio/src/sync/mpsc/list.rs  |  2 +-
 2 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
index 58f4a9f6cc3..ae97f05fb80 100644
--- a/tokio/src/sync/mpsc/block.rs
+++ b/tokio/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
 use crate::loom::cell::UnsafeCell;
 use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
 
+use std::alloc::Layout;
 use std::mem::MaybeUninit;
 use std::ops;
 use std::ptr::{self, NonNull};
@@ -28,7 +29,7 @@ pub(crate) struct Block<T> {
     /// Array containing values pushed into the block. Values are stored in a
     /// continuous array in order to improve cache line behavior when reading.
     /// The values must be manually dropped.
-    values: Values<T>,
+    values: Box<Values<T>>,
 }
 
 pub(crate) enum Read<T> {
@@ -72,8 +73,8 @@ pub(crate) fn offset(slot_index: usize) -> usize {
 }
 
 impl<T> Block<T> {
-    pub(crate) fn new(start_index: usize) -> Block<T> {
-        Block {
+    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
+        Box::new(Block {
             // The absolute index in the channel of the first slot in the block.
             start_index,
 
@@ -85,8 +86,18 @@ impl<T> Block<T> {
             observed_tail_position: UnsafeCell::new(0),
 
             // Value storage
-            values: unsafe { Values::uninitialized() },
-        }
+            values: unsafe {
+                // Allocate the values array on the heap.
+                let heap_alloc =
+                    std::alloc::alloc(Layout::new::<Values<T>>()) as *mut MaybeUninit<Values<T>>;
+
+                // Initialize the values array.
+                Values::initialize(&mut *heap_alloc);
+
+                // Convert the pointer to a `Box`.
+                Box::from_raw(heap_alloc as *mut Values<T>)
+            },
+        })
     }
 
     /// Returns `true` if the block matches the given index.
@@ -291,7 +302,7 @@ impl<T> Block<T> {
         // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
         // `start_index` is updated accordingly.
-        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
+        let new_block = Block::new(self.start_index + BLOCK_CAP);
 
         let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
 
@@ -360,19 +371,15 @@ fn is_tx_closed(bits: usize) -> bool {
 }
 
 impl<T> Values<T> {
-    unsafe fn uninitialized() -> Values<T> {
-        let mut vals = MaybeUninit::uninit();
-
+    unsafe fn initialize(_value: &mut MaybeUninit<Values<T>>) {
         // When fuzzing, `UnsafeCell` needs to be initialized.
         if_loom! {
-            let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+            let p = _value.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
             for i in 0..BLOCK_CAP {
                 p.add(i)
                     .write(UnsafeCell::new(MaybeUninit::uninit()));
             }
         }
-
-        Values(vals.assume_init())
     }
 }
@@ -383,3 +390,19 @@ impl<T> ops::Index<usize> for Values<T> {
         self.0.index(index)
     }
 }
+#[cfg(test)]
+#[test]
+fn assert_no_stack_overflow() {
+    // https://github.com/tokio-rs/tokio/issues/5293
+
+    struct Foo {
+        _a: [u8; 2_000_000],
+    }
+
+    assert_eq!(
+        Layout::new::<MaybeUninit<Block<Foo>>>(),
+        Layout::new::<Block<Foo>>()
+    );
+
+    let _block = Block::<Foo>::new(0);
+}
diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs
index e4eeb454118..10b29575bdb 100644
--- a/tokio/src/sync/mpsc/list.rs
+++ b/tokio/src/sync/mpsc/list.rs
@@ -44,7 +44,7 @@ pub(crate) enum TryPopResult<T> {
 
 pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
     // Create the initial block shared between the tx and rx halves.
-    let initial_block = Box::new(Block::new(0));
+    let initial_block = Block::new(0);
     let initial_block_ptr = Box::into_raw(initial_block);
 
     let tx = Tx {
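
Context for this first patch: `Block::new` previously built the entire block, including the `BLOCK_CAP`-element values array, by value, so `Box::new(Block::new(0))` constructed it on the stack before moving it to the heap, overflowing the stack for large message types (tokio-rs/tokio#5293). The sketch below shows the same allocate-then-initialize-in-place pattern in isolation; it is a minimal illustration, not tokio code, and the names `BigValues` and `boxed_values` are invented for the example.

use std::alloc::{alloc, handle_alloc_error, Layout};

// Large enough that building it on the stack first would risk an overflow.
struct BigValues {
    data: [u8; 2_000_000],
}

fn boxed_values() -> Box<BigValues> {
    unsafe {
        let layout = Layout::new::<BigValues>();
        // Allocate uninitialized memory directly on the heap.
        let ptr = alloc(layout) as *mut BigValues;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        // Initialize in place; `addr_of_mut!` projects to the field
        // without creating a reference to uninitialized memory.
        std::ptr::addr_of_mut!((*ptr).data).write_bytes(0, 1);
        // Sound: the pointer came from the global allocator with the
        // layout of `BigValues`, and the value is now fully initialized.
        Box::from_raw(ptr)
    }
}

fn main() {
    let v = boxed_values();
    assert_eq!(v.data[1_999_999], 0);
}
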
From ff9d23d87b422214f6e298809871896d4dd3bc7a Mon Sep 17 00:00:00 2001
From: jtnunley
Date: Tue, 13 Dec 2022 10:42:14 -0800
Subject: [PATCH 2/3] Use addr_of macros

---
 tokio/src/sync/mpsc/block.rs | 121 ++++++++++++++++++++++-------------
 1 file changed, 76 insertions(+), 45 deletions(-)

diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
index ae97f05fb80..f0740ea6726 100644
--- a/tokio/src/sync/mpsc/block.rs
+++ b/tokio/src/sync/mpsc/block.rs
@@ -11,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
 ///
 /// Each block in the list can hold up to `BLOCK_CAP` messages.
 pub(crate) struct Block<T> {
+    /// The header fields.
+    header: BlockHeader<T>,
+
+    /// Array containing values pushed into the block. Values are stored in a
+    /// continuous array in order to improve cache line behavior when reading.
+    /// The values must be manually dropped.
+    values: Values<T>,
+}
+
+/// Extra fields for a `Block<T>`.
+struct BlockHeader<T> {
     /// The start index of this block.
     ///
     /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
@@ -25,11 +36,6 @@ pub(crate) struct Block<T> {
     /// The observed `tail_position` value *after* the block has been passed by
     /// `block_tail`.
     observed_tail_position: UnsafeCell<usize>,
-
-    /// Array containing values pushed into the block. Values are stored in a
-    /// continuous array in order to improve cache line behavior when reading.
-    /// The values must be manually dropped.
-    values: Box<Values<T>>,
 }
 
 pub(crate) enum Read<T> {
@@ -72,38 +78,53 @@ pub(crate) fn offset(slot_index: usize) -> usize {
     SLOT_MASK & slot_index
 }
 
+generate_addr_of_methods! {
+    impl<T> Block<T> {
+        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
+            &self.header
+        }
+
+        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
+            &self.values
+        }
+    }
+}
+
 impl<T> Block<T> {
     pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
-        Box::new(Block {
-            // The absolute index in the channel of the first slot in the block.
-            start_index,
+        unsafe {
+            // Allocate the block on the heap.
+            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
+            let block = match NonNull::new(block) {
+                Some(block) => block,
+                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
+            };
 
-            // Pointer to the next block in the linked list.
-            next: AtomicPtr::new(ptr::null_mut()),
+            // Write the header to the block.
+            Block::addr_of_header(block).as_ptr().write(BlockHeader {
+                // The absolute index in the channel of the first slot in the block.
+                start_index,
 
-            ready_slots: AtomicUsize::new(0),
+                // Pointer to the next block in the linked list.
+                next: AtomicPtr::new(ptr::null_mut()),
 
-            observed_tail_position: UnsafeCell::new(0),
+                ready_slots: AtomicUsize::new(0),
 
-            // Value storage
-            values: unsafe {
-                // Allocate the values array on the heap.
-                let heap_alloc =
-                    std::alloc::alloc(Layout::new::<Values<T>>()) as *mut MaybeUninit<Values<T>>;
+                observed_tail_position: UnsafeCell::new(0),
+            });
 
-                // Initialize the values array.
-                Values::initialize(&mut *heap_alloc);
+            // Initialize the values array.
+            Values::initialize(Block::addr_of_values(block));
 
-                // Convert the pointer to a `Box`.
-                Box::from_raw(heap_alloc as *mut Values<T>)
-            },
-        })
+            // Convert the pointer to a `Box`.
+            Box::from_raw(block.as_ptr())
+        }
     }
 
     /// Returns `true` if the block matches the given index.
     pub(crate) fn is_at_index(&self, index: usize) -> bool {
         debug_assert!(offset(index) == 0);
-        self.start_index == index
+        self.header.start_index == index
     }
 
     /// Returns the number of blocks between `self` and the block at the
@@ -112,7 +133,7 @@ impl<T> Block<T> {
     /// `start_index` must represent a block *after* `self`.
     pub(crate) fn distance(&self, other_index: usize) -> usize {
         debug_assert!(offset(other_index) == 0);
-        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
+        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
     }
 
     /// Reads the value at the given offset.
@@ -127,7 +148,7 @@ impl<T> Block<T> {
     pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
         let offset = offset(slot_index);
 
-        let ready_bits = self.ready_slots.load(Acquire);
+        let ready_bits = self.header.ready_slots.load(Acquire);
 
         if !is_ready(ready_bits, offset) {
             if is_tx_closed(ready_bits) {
@@ -167,7 +188,7 @@ impl<T> Block<T> {
 
     /// Signal to the receiver that the sender half of the list is closed.
     pub(crate) unsafe fn tx_close(&self) {
-        self.ready_slots.fetch_or(TX_CLOSED, Release);
+        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
     }
 
     /// Resets the block to a blank state. This enables reusing blocks in the
@@ -180,9 +201,9 @@ impl<T> Block<T> {
     /// * All slots are empty.
     /// * The caller holds a unique pointer to the block.
     pub(crate) unsafe fn reclaim(&mut self) {
-        self.start_index = 0;
-        self.next = AtomicPtr::new(ptr::null_mut());
-        self.ready_slots = AtomicUsize::new(0);
+        self.header.start_index = 0;
+        self.header.next = AtomicPtr::new(ptr::null_mut());
+        self.header.ready_slots = AtomicUsize::new(0);
     }
 
     /// Releases the block to the rx half for freeing.
@@ -198,19 +219,20 @@ impl<T> Block<T> {
     pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
         // Track the observed tail_position. Any sender targeting a greater
         // tail_position is guaranteed to not access this block.
-        self.observed_tail_position
+        self.header
+            .observed_tail_position
             .with_mut(|ptr| *ptr = tail_position);
 
         // Set the released bit, signalling to the receiver that it is safe to
         // free the block's memory as soon as all slots **prior** to
         // `observed_tail_position` have been filled.
-        self.ready_slots.fetch_or(RELEASED, Release);
+        self.header.ready_slots.fetch_or(RELEASED, Release);
     }
 
     /// Mark a slot as ready
     fn set_ready(&self, slot: usize) {
         let mask = 1 << slot;
-        self.ready_slots.fetch_or(mask, Release);
+        self.header.ready_slots.fetch_or(mask, Release);
     }
 
     /// Returns `true` when all slots have their `ready` bits set.
@@ -225,25 +247,31 @@ impl<T> Block<T> {
     /// single atomic cell. However, this could have negative impact on cache
     /// behavior as there would be many more mutations to a single slot.
     pub(crate) fn is_final(&self) -> bool {
-        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
+        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
     }
 
     /// Returns the `observed_tail_position` value, if set
     pub(crate) fn observed_tail_position(&self) -> Option<usize> {
-        if 0 == RELEASED & self.ready_slots.load(Acquire) {
+        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
             None
         } else {
-            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
+            Some(
+                self.header
+                    .observed_tail_position
+                    .with(|ptr| unsafe { *ptr }),
+            )
         }
     }
 
     /// Loads the next block
     pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
-        let ret = NonNull::new(self.next.load(ordering));
+        let ret = NonNull::new(self.header.next.load(ordering));
 
         debug_assert!(unsafe {
-            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
-                .unwrap_or(true)
+            ret.map(|block| {
+                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
+            })
+            .unwrap_or(true)
         });
 
         ret
@@ -271,9 +299,10 @@ impl<T> Block<T> {
         success: Ordering,
         failure: Ordering,
     ) -> Result<(), NonNull<Block<T>>> {
-        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
+        block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);
 
         let next_ptr = self
+            .header
             .next
             .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
             .unwrap_or_else(|x| x);
@@ -302,7 +331,7 @@ impl<T> Block<T> {
         // Create the new block. It is assumed that the block will become the
         // next one after `&self`. If this turns out to not be the case,
         // `start_index` is updated accordingly.
-        let new_block = Block::new(self.start_index + BLOCK_CAP);
+        let new_block = Block::new(self.header.start_index + BLOCK_CAP);
 
         let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };
 
@@ -319,7 +348,8 @@ impl<T> Block<T> {
         // `Release` ensures that the newly allocated block is available to
         // other threads acquiring the next pointer.
         let next = NonNull::new(
-            self.next
+            self.header
+                .next
                 .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                 .unwrap_or_else(|x| x),
         );
@@ -371,10 +401,10 @@ fn is_tx_closed(bits: usize) -> bool {
 }
 
 impl<T> Values<T> {
-    unsafe fn initialize(_value: &mut MaybeUninit<Values<T>>) {
+    unsafe fn initialize(_value: NonNull<Values<T>>) {
         // When fuzzing, `UnsafeCell` needs to be initialized.
         if_loom! {
-            let p = _value.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
+            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
             for i in 0..BLOCK_CAP {
                 p.add(i)
                     .write(UnsafeCell::new(MaybeUninit::uninit()));
             }
         }
@@ -390,7 +420,8 @@ impl<T> ops::Index<usize> for Values<T> {
         self.0.index(index)
     }
 }
-#[cfg(test)]
+
+#[cfg(all(test, not(loom)))]
 #[test]
 fn assert_no_stack_overflow() {
     // https://github.com/tokio-rs/tokio/issues/5293
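
Context for this second patch: it merges the values array back into `Block` itself and initializes the whole block in place on the heap. `generate_addr_of_methods!` is a tokio-internal helper macro that expands to raw-pointer field projections, so the header can be written through a `NonNull<Block<T>>` without ever creating a `&Block<T>` to memory whose `values` field is still uninitialized. Below is a hand-written sketch of that kind of projection, using invented `Node`/`Header` types rather than the real ones:

use std::ptr::NonNull;

struct Header {
    id: usize,
}

struct Node {
    header: Header,
    payload: [u8; 64],
}

impl Node {
    /// Project a `NonNull<Node>` to its `header` field without ever
    /// materializing a `&Node`, so `payload` may still be uninitialized.
    unsafe fn addr_of_header(me: NonNull<Node>) -> NonNull<Header> {
        // `addr_of_mut!` computes the field address without creating
        // an intermediate reference.
        NonNull::new_unchecked(std::ptr::addr_of_mut!((*me.as_ptr()).header))
    }
}

fn main() {
    let mut node = Node {
        header: Header { id: 7 },
        payload: [0; 64],
    };
    let ptr = NonNull::from(&mut node);
    unsafe {
        assert_eq!(Node::addr_of_header(ptr).as_ref().id, 7);
    }
    assert_eq!(node.payload[0], 0);
}
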
From 80ddbb6c1995a4f4cdbcbeb8e9be20556312dc7e Mon Sep 17 00:00:00 2001
From: jtnunley
Date: Tue, 13 Dec 2022 18:21:15 -0800
Subject: [PATCH 3/3] Ameliorate some soundness concerns

---
 tokio/src/sync/mpsc/block.rs | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
index f0740ea6726..39c3e1be2d9 100644
--- a/tokio/src/sync/mpsc/block.rs
+++ b/tokio/src/sync/mpsc/block.rs
@@ -43,6 +43,7 @@ pub(crate) enum Read<T> {
     Value(T),
     Closed,
 }
 
+#[repr(transparent)]
 struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
 
 use super::BLOCK_CAP;
@@ -94,6 +95,7 @@ impl<T> Block<T> {
     pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
         unsafe {
             // Allocate the block on the heap.
+            // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
             let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
             let block = match NonNull::new(block) {
                 Some(block) => block,
                 None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
             };
@@ -117,6 +119,8 @@ impl<T> Block<T> {
             Values::initialize(Block::addr_of_values(block));
 
             // Convert the pointer to a `Box`.
+            // Safety: The raw pointer was allocated using the global allocator, and with
+            // the layout for a `Block<T>`, so it's valid to convert it to a box.
             Box::from_raw(block.as_ptr())
         }
     }
@@ -401,6 +405,11 @@ fn is_tx_closed(bits: usize) -> bool {
 }
 
 impl<T> Values<T> {
+    /// Initialize a `Values` struct from a pointer.
+    ///
+    /// # Safety
+    ///
+    /// The raw pointer must be valid for writing a `Values<T>`.
     unsafe fn initialize(_value: NonNull<Values<T>>) {
         // When fuzzing, `UnsafeCell` needs to be initialized.
         if_loom! {
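
The soundness notes added in this last patch rest on two documented guarantees: `MaybeUninit<T>` has the same size and alignment as `T` (and `#[repr(transparent)]` similarly pins `Values<T>` to the layout of its inner array), and `Box::from_raw` is only sound for a pointer obtained from the global allocator with the value's exact layout. A compact illustration of both points, using an invented `Pair` type rather than `Block<T>`:

use std::alloc::{alloc, handle_alloc_error, Layout};
use std::mem::MaybeUninit;
use std::ptr::addr_of_mut;

struct Pair {
    a: u32,
    b: u64,
}

fn main() {
    // `MaybeUninit<T>` is guaranteed to have `T`'s size and alignment,
    // which is what lets allocation and initialization be split apart.
    assert_eq!(Layout::new::<MaybeUninit<Pair>>(), Layout::new::<Pair>());

    let boxed: Box<Pair> = unsafe {
        let layout = Layout::new::<Pair>();
        let ptr = alloc(layout) as *mut Pair;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        // Write every field before treating the value as live.
        addr_of_mut!((*ptr).a).write(1);
        addr_of_mut!((*ptr).b).write(2);
        // Sound: allocated by the global allocator with `Pair`'s layout,
        // and fully initialized above.
        Box::from_raw(ptr)
    };
    assert_eq!(boxed.a, 1);
    assert_eq!(boxed.b, 2);
}
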