Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: fix racy UnsafeCell access on a closed oneshot #4226

Merged
merged 8 commits into from Nov 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 32 additions & 5 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -1031,11 +1031,38 @@ impl State {
}

fn set_complete(cell: &AtomicUsize) -> State {
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
// the `RX_TASK_SET` flag is set. However, `loom` does not support
// fences yet.
let val = cell.fetch_or(VALUE_SENT, AcqRel);
State(val)
// This method is a compare-and-swap loop rather than a fetch-or like
// other `set_$WHATEVER` methods on `State`. This is because we must
// check if the state has been closed before setting the `VALUE_SENT`
// bit.
//
// We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
// bit is already set, because `VALUE_SENT` will tell the receiver that
// it's okay to access the inner `UnsafeCell`. Immediately after calling
// `set_complete`, if the channel was closed, the sender will _also_
// access the `UnsafeCell` to take the value back out, so if a
// `poll_recv` or `try_recv` call is occurring concurrently, both
// threads may try to access the `UnsafeCell` if we were to set the
// `VALUE_SENT` bit on a closed channel.
let mut state = cell.load(Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems safer.

Suggested change
let mut state = cell.load(Ordering::Relaxed);
let mut state = cell.load(Ordering::Acquire);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary; the compare_exchange has an Acquire failure ordering, so when we actually go to set the value, we will always perform an Acquire operation, and if the initial Relaxed load is stale, the CAS will fail, and we'll loop again with a value that was accessed with an Acquire ordering. So, making the initial load Relaxed really just serves to avoid an unnecessary Acquire in the case that we're the only thread writing to the state; if it's contended and the Relaxed load is stale, we will always perform an Acquire on subsequent iterations.

Other CAS loops elsewhere in Tokio follow a similar pattern:

// Quick initial debug check to see if the timer is already fired. Since
// firing the timer can only happen with the driver lock held, we know
// we shouldn't be able to "miss" a transition to a fired state, even
// with relaxed ordering.
let mut cur_state = self.state.load(Ordering::Relaxed);
loop {
debug_assert!(cur_state < STATE_MIN_VALUE);
if cur_state > not_after {
break Err(cur_state);
}
match self.state.compare_exchange(
cur_state,
STATE_PENDING_FIRE,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
break Ok(());
}
Err(actual_state) => {
cur_state = actual_state;
}
}

However, I'm happy to change this if you prefer.

loop {
if State(state).is_closed() {
break;
}
Comment on lines +1048 to +1051
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also considered writing this as

Suggested change
loop {
if State(state).is_closed() {
break;
}
while !State(state).is_closed() {

but i had a vague memory of @carllerche saying that he doesn't like the use of while loops for compare-and-swap loops... :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the way it's written here with a loop is perfectly readable, so let's just keep it.

// TODO: This could be `Release`, followed by an `Acquire` fence *if*
// the `RX_TASK_SET` flag is set. However, `loom` does not support
// fences yet.
Copy link
Contributor

@cynecx cynecx Nov 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think loom has partial support for fences iiuc (tokio-rs/loom#220), so technically this comment isn't correct anymore :D.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think there might be a couple other places in Tokio where we also can implement some things nicer now that loom supports fences. I didn't want to actually change that aspect of the implementation in the bugfix branch, though. Might be worth doing in a subsequent one!

match cell.compare_exchange_weak(
state,
state | VALUE_SENT,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(actual) => state = actual,
}
}
State(state)
}

fn is_rx_task_set(self) -> bool {
Expand Down
29 changes: 29 additions & 0 deletions tokio/src/sync/tests/loom_oneshot.rs
Expand Up @@ -55,6 +55,35 @@ fn changing_rx_task() {
});
}

#[test]
fn try_recv_close() {
// reproduces https://github.com/tokio-rs/tokio/issues/4225
loom::model(|| {
let (tx, mut rx) = oneshot::channel();
thread::spawn(move || {
let _ = tx.send(());
});

rx.close();
let _ = rx.try_recv();
})
}

#[test]
fn recv_closed() {
// reproduces https://github.com/tokio-rs/tokio/issues/4225
loom::model(|| {
let (tx, mut rx) = oneshot::channel();

thread::spawn(move || {
let _ = tx.send(1);
});

rx.close();
let _ = block_on(rx);
});
}

// TODO: Move this into `oneshot` proper.

use std::future::Future;
Expand Down