Skip to content

Commit

Permalink
oneshot: document UnsafeCell invariants (#4229)
Browse files Browse the repository at this point in the history
Depends on #4226

## Motivation

Currently, the safety invariants and synchronization strategy used in
`tokio::sync::oneshot` are not particularly obvious, especially to a new
reader. It would be nice to better document this code to make these
invariants clearer.

## Solution

This branch adds `SAFETY:` comments to the `oneshot` channel
implementation. In particular, I've focused on documenting the
invariants around when the inner `UnsafeCell` that stores the value can
be accessed by the sender and receiver sides of the channel.

I still want to take a closer look at when the waker cells can be set,
and I'd like to add more documentation there in a follow-up branch.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Nov 15, 2021
1 parent 4b78ed4 commit 26f0938
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -358,9 +358,19 @@ struct Inner<T> {
value: UnsafeCell<Option<T>>,

/// The task to notify when the receiver drops without consuming the value.
///
/// ## Safety
///
/// The `TX_TASK_SET` bit in the `state` field is set if this field is
/// initialized. If that bit is unset, this field may be uninitialized.
tx_task: Task,

/// The task to notify when the value is sent.
///
/// ## Safety
///
/// The `RX_TASK_SET` bit in the `state` field is set if this field is
/// initialized. If that bit is unset, this field may be uninitialized.
rx_task: Task,
}

Expand Down Expand Up @@ -490,11 +500,24 @@ impl<T> Sender<T> {
let inner = self.inner.take().unwrap();

inner.value.with_mut(|ptr| unsafe {
// SAFETY: The receiver will not access the `UnsafeCell` unless the
// channel has been marked as "complete" (the `VALUE_SENT` state bit
// is set).
// That bit is only set by the sender later on in this method, and
// calling this method consumes `self`. Therefore, if it was possible to
// call this method, we know that the `VALUE_SENT` bit is unset, and
// the receiver is not currently accessing the `UnsafeCell`.
*ptr = Some(t);
});

if !inner.complete() {
unsafe {
// SAFETY: The receiver will not access the `UnsafeCell` unless
// the channel has been marked as "complete". Calling
// `complete()` will return true if this bit is set, and false
// if it is not set. Thus, if `complete()` returned false, it is
// safe for us to access the value, because we know that the
// receiver will not.
return Err(inner.consume_value().unwrap());
}
}
Expand Down Expand Up @@ -840,6 +863,11 @@ impl<T> Receiver<T> {
let state = State::load(&inner.state, Acquire);

if state.is_complete() {
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
match unsafe { inner.consume_value() } {
Some(value) => Ok(value),
None => Err(TryRecvError::Closed),
Expand Down Expand Up @@ -930,6 +958,11 @@ impl<T> Inner<T> {
State::set_rx_task(&self.state);

coop.made_progress();
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
return match unsafe { self.consume_value() } {
Some(value) => Ready(Ok(value)),
None => Ready(Err(RecvError(()))),
Expand Down Expand Up @@ -976,6 +1009,14 @@ impl<T> Inner<T> {
}

/// Consumes the value. This function does not check `state`.
///
/// # Safety
///
/// Calling this method concurrently on multiple threads will result in a
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
/// sender *or* the receiver will call this method at a given point in time.
/// If `VALUE_SENT` is not set, then only the sender may call this method;
/// if it is set, then only the receiver may call this method.
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}
Expand Down Expand Up @@ -1016,9 +1057,28 @@ impl<T: fmt::Debug> fmt::Debug for Inner<T> {
}
}

/// Indicates that a waker for the receiving task has been set.
///
/// # Safety
///
/// If this bit is not set, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 0b00001;
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
///
/// # Safety
///
/// This bit controls which side of the channel is permitted to access the
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
/// the sender.
const VALUE_SENT: usize = 0b00010;
const CLOSED: usize = 0b00100;

/// Indicates that a waker for the sending task has been set.
///
/// # Safety
///
/// If this bit is not set, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 0b01000;

impl State {
Expand Down

0 comments on commit 26f0938

Please sign in to comment.