Skip to content

Commit

Permalink
sync: fix racy UnsafeCell access on a closed oneshot (#4226)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Nov 14, 2021
1 parent 79d7a62 commit 4b78ed4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
37 changes: 32 additions & 5 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -1031,11 +1031,38 @@ impl State {
}

fn set_complete(cell: &AtomicUsize) -> State {
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
// the `RX_TASK_SET` flag is set. However, `loom` does not support
// fences yet.
let val = cell.fetch_or(VALUE_SENT, AcqRel);
State(val)
// This method is a compare-and-swap loop rather than a fetch-or like
// other `set_$WHATEVER` methods on `State`. This is because we must
// check if the state has been closed before setting the `VALUE_SENT`
// bit.
//
// We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
// bit is already set, because `VALUE_SENT` will tell the receiver that
// it's okay to access the inner `UnsafeCell`. Immediately after calling
// `set_complete`, if the channel was closed, the sender will _also_
// access the `UnsafeCell` to take the value back out, so if a
// `poll_recv` or `try_recv` call is occurring concurrently, both
// threads may try to access the `UnsafeCell` if we were to set the
// `VALUE_SENT` bit on a closed channel.
let mut state = cell.load(Ordering::Relaxed);
loop {
if State(state).is_closed() {
break;
}
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
// the `RX_TASK_SET` flag is set. However, `loom` does not support
// fences yet.
match cell.compare_exchange_weak(
state,
state | VALUE_SENT,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(actual) => state = actual,
}
}
State(state)
}

fn is_rx_task_set(self) -> bool {
Expand Down
29 changes: 29 additions & 0 deletions tokio/src/sync/tests/loom_oneshot.rs
Expand Up @@ -55,6 +55,35 @@ fn changing_rx_task() {
});
}

#[test]
fn try_recv_close() {
// reproduces https://github.com/tokio-rs/tokio/issues/4225
loom::model(|| {
let (tx, mut rx) = oneshot::channel();
thread::spawn(move || {
let _ = tx.send(());
});

rx.close();
let _ = rx.try_recv();
})
}

#[test]
fn recv_closed() {
// reproduces https://github.com/tokio-rs/tokio/issues/4225
loom::model(|| {
let (tx, mut rx) = oneshot::channel();

thread::spawn(move || {
let _ = tx.send(1);
});

rx.close();
let _ = block_on(rx);
});
}

// TODO: Move this into `oneshot` proper.

use std::future::Future;
Expand Down

0 comments on commit 4b78ed4

Please sign in to comment.