Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unsoundness issues (mem::zeroed / ManuallyDrop -> MaybeUninit) #458

Merged
merged 16 commits into from Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions crossbeam-channel/Cargo.toml
Expand Up @@ -15,6 +15,9 @@ description = "Multi-producer multi-consumer channels for message passing"
keywords = ["channel", "mpmc", "select", "golang", "message"]
categories = ["algorithms", "concurrency", "data-structures"]

[dependencies]
maybe-uninit = "2.0.0"

[dependencies.crossbeam-utils]
version = "0.7"
path = "../crossbeam-utils"
Expand Down
15 changes: 11 additions & 4 deletions crossbeam-channel/src/flavors/array.rs
Expand Up @@ -22,6 +22,8 @@ use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use maybe_uninit::MaybeUninit;

use context::Context;
use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use select::{Operation, SelectHandle, Selected, Token};
Expand All @@ -33,7 +35,7 @@ struct Slot<T> {
stamp: AtomicUsize,

/// The message in this slot.
msg: UnsafeCell<T>,
msg: UnsafeCell<MaybeUninit<T>>,
}

/// The token type for the array flavor.
Expand Down Expand Up @@ -233,7 +235,7 @@ impl<T> Channel<T> {
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

// Write the message into the slot and update the stamp.
slot.msg.get().write(msg);
slot.msg.get().write(MaybeUninit::new(msg));
slot.stamp.store(token.array.stamp, Ordering::Release);

// Wake a sleeping receiver.
Expand Down Expand Up @@ -323,7 +325,7 @@ impl<T> Channel<T> {
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

// Read the message from the slot and update the stamp.
let msg = slot.msg.get().read();
let msg = slot.msg.get().read().assume_init();
slot.stamp.store(token.array.stamp, Ordering::Release);

// Wake a sleeping sender.
Expand Down Expand Up @@ -542,7 +544,12 @@ impl<T> Drop for Channel<T> {
};

unsafe {
self.buffer.add(index).drop_in_place();
let p = {
let slot = &mut *self.buffer.add(index);
let msg = &mut *slot.msg.get();
msg.as_mut_ptr()
};
p.drop_in_place();
}
}

Expand Down
21 changes: 14 additions & 7 deletions crossbeam-channel/src/flavors/list.rs
Expand Up @@ -2,13 +2,14 @@

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use maybe_uninit::MaybeUninit;

use context::Context;
use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use select::{Operation, SelectHandle, Selected, Token};
Expand Down Expand Up @@ -42,7 +43,7 @@ const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
/// The message.
msg: UnsafeCell<ManuallyDrop<T>>,
msg: UnsafeCell<MaybeUninit<T>>,

/// The state of the slot.
state: AtomicUsize,
Expand Down Expand Up @@ -72,7 +73,13 @@ struct Block<T> {
impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Block<T> {
unsafe { mem::zeroed() }
// SAFETY: This is safe because:
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
// [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
// holds a MaybeUninit.
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
unsafe { MaybeUninit::zeroed().assume_init() }
}

/// Waits until the next pointer is set.
Expand Down Expand Up @@ -280,7 +287,7 @@ impl<T> Channel<T> {
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.msg.get().write(ManuallyDrop::new(msg));
slot.msg.get().write(MaybeUninit::new(msg));
slot.state.fetch_or(WRITE, Ordering::Release);

// Wake a sleeping receiver.
Expand Down Expand Up @@ -385,8 +392,7 @@ impl<T> Channel<T> {
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let m = slot.msg.get().read();
let msg = ManuallyDrop::into_inner(m);
let msg = slot.msg.get().read().assume_init();

// Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot.
Expand Down Expand Up @@ -572,7 +578,8 @@ impl<T> Drop for Channel<T> {
if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
ManuallyDrop::drop(&mut *(*slot).msg.get());
let p = &mut *slot.msg.get();
p.as_mut_ptr().drop_in_place();
} else {
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Relaxed);
Expand Down
1 change: 1 addition & 0 deletions crossbeam-channel/src/lib.rs
Expand Up @@ -348,6 +348,7 @@
#![warn(missing_debug_implementations)]

extern crate crossbeam_utils;
extern crate maybe_uninit;

mod channel;
mod context;
Expand Down
3 changes: 3 additions & 0 deletions crossbeam-deque/Cargo.toml
Expand Up @@ -15,6 +15,9 @@ description = "Concurrent work-stealing deque"
keywords = ["chase-lev", "lock-free", "scheduler", "scheduling"]
categories = ["algorithms", "concurrency", "data-structures"]

[dependencies]
maybe-uninit = "2.0.0"

[dependencies.crossbeam-epoch]
version = "0.8"
path = "../crossbeam-epoch"
Expand Down
41 changes: 23 additions & 18 deletions crossbeam-deque/src/lib.rs
Expand Up @@ -92,19 +92,23 @@
extern crate crossbeam_epoch as epoch;
extern crate crossbeam_utils as utils;

extern crate maybe_uninit;

use std::cell::{Cell, UnsafeCell};
use std::cmp;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::mem;
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;

use epoch::{Atomic, Owned};
use utils::{Backoff, CachePadded};

use maybe_uninit::MaybeUninit;

// Minimum buffer capacity.
const MIN_CAP: usize = 64;
// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
Expand Down Expand Up @@ -218,7 +222,7 @@ impl<T> Drop for Inner<T> {
// Go through the buffer from front to back and drop all tasks in the queue.
let mut i = f;
while i != b {
ptr::drop_in_place(buffer.deref().at(i));
buffer.deref().at(i).drop_in_place();
i = i.wrapping_add(1);
}

Expand Down Expand Up @@ -1140,7 +1144,7 @@ const HAS_NEXT: usize = 1;
/// A slot in a block.
struct Slot<T> {
/// The task.
task: UnsafeCell<ManuallyDrop<T>>,
task: UnsafeCell<MaybeUninit<T>>,

/// The state of the slot.
state: AtomicUsize,
Expand Down Expand Up @@ -1170,7 +1174,13 @@ struct Block<T> {
impl<T> Block<T> {
/// Creates an empty block that starts at `start_index`.
fn new() -> Block<T> {
unsafe { mem::zeroed() }
// SAFETY: This is safe because:
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
// [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
// holds a MaybeUninit.
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
unsafe { MaybeUninit::zeroed().assume_init() }
}

/// Waits until the next pointer is set.
Expand Down Expand Up @@ -1329,7 +1339,7 @@ impl<T> Injector<T> {

// Write the task into the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.task.get().write(ManuallyDrop::new(task));
slot.task.get().write(MaybeUninit::new(task));
slot.state.fetch_or(WRITE, Ordering::Release);

return;
Expand Down Expand Up @@ -1422,8 +1432,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
let task = slot.task.get().read().assume_init();

// Destroy the block if we've reached the end, or if another thread wanted to destroy
// but couldn't because we were busy reading from the slot.
Expand Down Expand Up @@ -1548,8 +1557,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
let task = slot.task.get().read().assume_init();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1561,8 +1569,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
let task = slot.task.get().read().assume_init();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1704,8 +1711,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
let task = slot.task.get().read().assume_init();

match dest.flavor {
Flavor::Fifo => {
Expand All @@ -1714,8 +1720,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
let task = slot.task.get().read().assume_init();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1728,8 +1733,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let m = slot.task.get().read();
let task = ManuallyDrop::into_inner(m);
let task = slot.task.get().read().assume_init();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1804,7 +1808,8 @@ impl<T> Drop for Injector<T> {
if offset < BLOCK_CAP {
// Drop the task in the slot.
let slot = (*block).slots.get_unchecked(offset);
ManuallyDrop::drop(&mut *(*slot).task.get());
let p = &mut *slot.task.get();
p.as_mut_ptr().drop_in_place();
} else {
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Relaxed);
Expand Down
1 change: 1 addition & 0 deletions crossbeam-epoch/Cargo.toml
Expand Up @@ -24,6 +24,7 @@ sanitize = [] # Makes it more likely to trigger any potential data races.

[dependencies]
cfg-if = "0.1.2"
maybe-uninit = "2.0.0"
memoffset = "0.5"

[dependencies.crossbeam-utils]
Expand Down
20 changes: 8 additions & 12 deletions crossbeam-epoch/src/deferred.rs
Expand Up @@ -4,6 +4,8 @@ use core::marker::PhantomData;
use core::mem;
use core::ptr;

use maybe_uninit::MaybeUninit;

/// Number of words a piece of `Data` can hold.
///
/// Three words should be enough for the majority of cases. For example, you can fit inside it the
Expand Down Expand Up @@ -36,11 +38,8 @@ impl Deferred {

unsafe {
if size <= mem::size_of::<Data>() && align <= mem::align_of::<Data>() {
// TODO(taiki-e): when the minimum supported Rust version is bumped to 1.36+,
// replace this with `mem::MaybeUninit`.
#[allow(deprecated)]
let mut data: Data = mem::uninitialized();
ptr::write(&mut data as *mut Data as *mut F, f);
let mut data = MaybeUninit::<Data>::uninit();
ptr::write(data.as_mut_ptr() as *mut F, f);

unsafe fn call<F: FnOnce()>(raw: *mut u8) {
let f: F = ptr::read(raw as *mut F);
Expand All @@ -49,16 +48,13 @@ impl Deferred {

Deferred {
call: call::<F>,
data,
data: data.assume_init(),
_marker: PhantomData,
}
} else {
let b: Box<F> = Box::new(f);
// TODO(taiki-e): when the minimum supported Rust version is bumped to 1.36+,
// replace this with `mem::MaybeUninit`.
#[allow(deprecated)]
let mut data: Data = mem::uninitialized();
ptr::write(&mut data as *mut Data as *mut Box<F>, b);
let mut data = MaybeUninit::<Data>::uninit();
ptr::write(data.as_mut_ptr() as *mut Box<F>, b);

unsafe fn call<F: FnOnce()>(raw: *mut u8) {
let b: Box<F> = ptr::read(raw as *mut Box<F>);
Expand All @@ -67,7 +63,7 @@ impl Deferred {

Deferred {
call: call::<F>,
data,
data: data.assume_init(),
_marker: PhantomData,
}
}
Expand Down
2 changes: 2 additions & 0 deletions crossbeam-epoch/src/lib.rs
Expand Up @@ -64,6 +64,8 @@ extern crate cfg_if;
#[cfg(feature = "std")]
extern crate core;

extern crate maybe_uninit;

cfg_if! {
if #[cfg(feature = "alloc")] {
extern crate alloc;
Expand Down