Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: prepare Tokio v1.13.1 #4235

Merged
merged 3 commits into from Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.13.0", features = ["full"] }
tokio = { version = "1.13.1", features = ["full"] }
```
Then, on your main.rs:

Expand Down
9 changes: 9 additions & 0 deletions tokio/CHANGELOG.md
@@ -1,3 +1,12 @@
# 1.13.1 (November 15, 2021)

### Fixed

- sync: fix a data race between `oneshot::Sender::send` and awaiting a
`oneshot::Receiver` when the oneshot has been closed ([#4226])

[#4226]: https://github.com/tokio-rs/tokio/pull/4226

# 1.13.0 (October 29, 2021)

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions tokio/Cargo.toml
Expand Up @@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
version = "1.13.0"
version = "1.13.1"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
documentation = "https://docs.rs/tokio/1.13.0/tokio/"
documentation = "https://docs.rs/tokio/1.13.1/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
Expand Down
2 changes: 1 addition & 1 deletion tokio/README.md
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.13.0", features = ["full"] }
tokio = { version = "1.13.1", features = ["full"] }
```
Then, on your main.rs:

Expand Down
97 changes: 92 additions & 5 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -358,9 +358,19 @@ struct Inner<T> {
value: UnsafeCell<Option<T>>,

/// The task to notify when the receiver drops without consuming the value.
///
/// ## Safety
///
/// The `TX_TASK_SET` bit in the `state` field is set if this field is
/// initialized. If that bit is unset, this field may be uninitialized.
tx_task: Task,

/// The task to notify when the value is sent.
///
/// ## Safety
///
/// The `RX_TASK_SET` bit in the `state` field is set if this field is
/// initialized. If that bit is unset, this field may be uninitialized.
rx_task: Task,
}

Expand Down Expand Up @@ -490,11 +500,24 @@ impl<T> Sender<T> {
let inner = self.inner.take().unwrap();

inner.value.with_mut(|ptr| unsafe {
// SAFETY: The receiver will not access the `UnsafeCell` unless the
// channel has been marked as "complete" (the `VALUE_SENT` state bit
// is set).
// That bit is only set by the sender later on in this method, and
// calling this method consumes `self`. Therefore, if it was possible to
// call this method, we know that the `VALUE_SENT` bit is unset, and
// the receiver is not currently accessing the `UnsafeCell`.
*ptr = Some(t);
});

if !inner.complete() {
unsafe {
// SAFETY: The receiver will not access the `UnsafeCell` unless
// the channel has been marked as "complete". Calling
// `complete()` will return true if this bit is set, and false
// if it is not set. Thus, if `complete()` returned false, it is
// safe for us to access the value, because we know that the
// receiver will not.
return Err(inner.consume_value().unwrap());
}
}
Expand Down Expand Up @@ -840,6 +863,11 @@ impl<T> Receiver<T> {
let state = State::load(&inner.state, Acquire);

if state.is_complete() {
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
match unsafe { inner.consume_value() } {
Some(value) => Ok(value),
None => Err(TryRecvError::Closed),
Expand Down Expand Up @@ -930,6 +958,11 @@ impl<T> Inner<T> {
State::set_rx_task(&self.state);

coop.made_progress();
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
return match unsafe { self.consume_value() } {
Some(value) => Ready(Ok(value)),
None => Ready(Err(RecvError(()))),
Expand Down Expand Up @@ -976,6 +1009,14 @@ impl<T> Inner<T> {
}

/// Consumes the value. This function does not check `state`.
///
/// # Safety
///
/// Calling this method concurrently on multiple threads will result in a
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
/// sender *or* the receiver will call this method at a given point in time.
/// If `VALUE_SENT` is not set, then only the sender may call this method;
/// if it is set, then only the receiver may call this method.
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}
Expand Down Expand Up @@ -1016,9 +1057,28 @@ impl<T: fmt::Debug> fmt::Debug for Inner<T> {
}
}

/// Indicates that a waker for the receiving task has been set.
///
/// # Safety
///
/// If this bit is not set, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 0b00001;
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
///
/// # Safety
///
/// This bit controls which side of the channel is permitted to access the
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
/// the sender.
const VALUE_SENT: usize = 0b00010;
const CLOSED: usize = 0b00100;

/// Indicates that a waker for the sending task has been set.
///
/// # Safety
///
/// If this bit is not set, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 0b01000;

impl State {
Expand All @@ -1031,11 +1091,38 @@ impl State {
}

fn set_complete(cell: &AtomicUsize) -> State {
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
// the `RX_TASK_SET` flag is set. However, `loom` does not support
// fences yet.
let val = cell.fetch_or(VALUE_SENT, AcqRel);
State(val)
// This method is a compare-and-swap loop rather than a fetch-or like
// other `set_$WHATEVER` methods on `State`. This is because we must
// check if the state has been closed before setting the `VALUE_SENT`
// bit.
//
// We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
// bit is already set, because `VALUE_SENT` will tell the receiver that
// it's okay to access the inner `UnsafeCell`. Immediately after calling
// `set_complete`, if the channel was closed, the sender will _also_
// access the `UnsafeCell` to take the value back out, so if a
// `poll_recv` or `try_recv` call is occurring concurrently, both
// threads may try to access the `UnsafeCell` if we were to set the
// `VALUE_SENT` bit on a closed channel.
let mut state = cell.load(Ordering::Relaxed);
loop {
if State(state).is_closed() {
break;
}
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
// the `RX_TASK_SET` flag is set. However, `loom` does not support
// fences yet.
match cell.compare_exchange_weak(
state,
state | VALUE_SENT,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(actual) => state = actual,
}
}
State(state)
}

fn is_rx_task_set(self) -> bool {
Expand Down
29 changes: 29 additions & 0 deletions tokio/src/sync/tests/loom_oneshot.rs
Expand Up @@ -55,6 +55,35 @@ fn changing_rx_task() {
});
}

#[test]
fn try_recv_close() {
// reproduces https://github.com/tokio-rs/tokio/issues/4225
loom::model(|| {
let (tx, mut rx) = oneshot::channel();
thread::spawn(move || {
let _ = tx.send(());
});

rx.close();
let _ = rx.try_recv();
})
}

#[test]
fn recv_closed() {
// reproduces https://github.com/tokio-rs/tokio/issues/4225
loom::model(|| {
let (tx, mut rx) = oneshot::channel();

thread::spawn(move || {
let _ = tx.send(1);
});

rx.close();
let _ = block_on(rx);
});
}

// TODO: Move this into `oneshot` proper.

use std::future::Future;
Expand Down