Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Heap allocate the values array #5294

Merged
merged 3 commits into from Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 35 additions & 12 deletions tokio/src/sync/mpsc/block.rs
@@ -1,6 +1,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
Expand Down Expand Up @@ -28,7 +29,7 @@ pub(crate) struct Block<T> {
/// Array containing values pushed into the block. Values are stored in a
/// continuous array in order to improve cache line behavior when reading.
/// The values must be manually dropped.
values: Values<T>,
values: Box<Values<T>>,
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) enum Read<T> {
Expand Down Expand Up @@ -72,8 +73,8 @@ pub(crate) fn offset(slot_index: usize) -> usize {
}

impl<T> Block<T> {
pub(crate) fn new(start_index: usize) -> Block<T> {
Block {
pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
Box::new(Block {
// The absolute index in the channel of the first slot in the block.
start_index,

Expand All @@ -85,8 +86,18 @@ impl<T> Block<T> {
observed_tail_position: UnsafeCell::new(0),

// Value storage
values: unsafe { Values::uninitialized() },
}
values: unsafe {
// Allocate the values array on the heap.
let heap_alloc =
std::alloc::alloc(Layout::new::<Values<T>>()) as *mut MaybeUninit<Values<T>>;

// Initialize the values array.
Values::initialize(&mut *heap_alloc);

// Convert the pointer to a `Box`.
Box::from_raw(heap_alloc as *mut Values<T>)
},
})
}

/// Returns `true` if the block matches the given index.
Expand Down Expand Up @@ -291,7 +302,7 @@ impl<T> Block<T> {
// Create the new block. It is assumed that the block will become the
// next one after `&self`. If this turns out to not be the case,
// `start_index` is updated accordingly.
let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
let new_block = Block::new(self.start_index + BLOCK_CAP);

let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

Expand Down Expand Up @@ -360,19 +371,15 @@ fn is_tx_closed(bits: usize) -> bool {
}

impl<T> Values<T> {
unsafe fn uninitialized() -> Values<T> {
let mut vals = MaybeUninit::uninit();

unsafe fn initialize(_value: &mut MaybeUninit<Values<T>>) {
// When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
let p = _value.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}

Values(vals.assume_init())
}
}

Expand All @@ -383,3 +390,19 @@ impl<T> ops::Index<usize> for Values<T> {
self.0.index(index)
}
}
#[cfg(test)]
#[test]
fn assert_no_stack_overflow() {
// https://github.com/tokio-rs/tokio/issues/5293

struct Foo {
_a: [u8; 2_000_000],
}

assert_eq!(
Layout::new::<MaybeUninit<Block<Foo>>>(),
Layout::new::<Block<Foo>>()
);

let _block = Block::<Foo>::new(0);
}
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/list.rs
Expand Up @@ -44,7 +44,7 @@ pub(crate) enum TryPopResult<T> {

pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
// Create the initial block shared between the tx and rx halves.
let initial_block = Box::new(Block::new(0));
let initial_block = Block::new(0);
let initial_block_ptr = Box::into_raw(initial_block);

let tx = Tx {
Expand Down