Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio02-style mpsc sender #3105

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ jobs:
rm -rf tokio/tests

- name: miri
run: cargo miri test --features rt,rt-multi-thread,sync task
run: |
cargo miri test --features rt,rt-multi-thread,sync task
cargo miri test -p tokio-util 'sync::tests::mpsc'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good, but we should have a miri test that calls is_ready and clone while the state is in either Ready or Acquire.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately it seems that there is no way to poll futures under MIRI.
Of course, I plan to write some less trivial tests but there is no way to verify them in MIRI in the near future.

working-directory: tokio
san:
name: san
Expand Down
6 changes: 6 additions & 0 deletions tokio-util/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! Synchronization primitives

mod mpsc;
pub use mpsc::Sender;

mod cancellation_token;
pub use cancellation_token::{CancellationToken, WaitForCancellationFuture};

mod intrusive_double_linked_list;

#[cfg(test)]
mod tests;
160 changes: 160 additions & 0 deletions tokio-util/src/sync/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
//! Tokio-02 style MPSC channel.
use std::{
future::Future,
marker::PhantomPinned,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::{error::SendError, Permit};

/// Fat sender with `poll_ready` support.
#[derive(Debug)]
// TODO: it doesn't seem we really need 'static here...
pub struct Sender<T: 'static> {
// field order: `state` can contain reference to `inner`,
// so it must be dropped first.
/// State of this sender
state: State<T>,
/// Underlying channel
inner: tokio::sync::mpsc::Sender<T>,
/// Pinned marker
pinned: PhantomPinned,
}

impl<T: 'static> Sender<T> {
/// It is OK to get mutable reference to state, because it is not
/// self-referencing.
fn pin_project_state(self: Pin<&mut Self>) -> &mut State<T> {
unsafe { &mut Pin::into_inner_unchecked(self).state }
}

/// Sender must be pinned because state can contain references to it
fn pin_project_inner(self: Pin<&mut Self>) -> Pin<&mut tokio::sync::mpsc::Sender<T>> {
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
unsafe { self.map_unchecked_mut(|t| &mut t.inner) }
}
}

impl<T: 'static> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
state: State::Empty,
inner: self.inner.clone(),
pinned: PhantomPinned,
}
}
}

type AcquireFutOutput<T> = Result<Permit<'static, T>, SendError<()>>;

enum State<T: 'static> {
/// We do not have permit, but we didn't start acquiring it.
Empty,
/// We have a permit to send.
// ALERT: this is self-reference to the Sender.
Ready(Permit<'static, T>),
/// We are in process of acquiring a permit
// ALERT: contained future contains self-reference to the sender.
Acquire(Pin<Box<dyn Future<Output = AcquireFutOutput<T>>>>),
}

impl<T: 'static> std::fmt::Debug for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
State::Empty => f.debug_tuple("Empty").finish(),
State::Ready(_) => f.debug_tuple("Ready").field(&"_").finish(),
State::Acquire(_) => f.debug_tuple("Acquire").field(&"_").finish(),
}
}
}

impl<T: 'static> Sender<T> {
/// Wraps a [tokio sender](tokio::sync::mpsc::Sender).
pub fn new(inner: tokio::sync::mpsc::Sender<T>) -> Self {
Sender {
inner,
state: State::Empty,
pinned: PhantomPinned,
}
}

/// Returns sender readiness state
pub fn is_ready(&self) -> bool {
matches!(self.state, State::Ready(_))
}

/// Sends a message.
///
/// This method panics if the `Sender` is not ready.
pub fn send(self: Pin<&mut Self>, value: T) {
let permit = match std::mem::replace(self.pin_project_state(), State::Empty) {
State::Ready(permit) => permit,
_ => panic!("called send() on non-ready Sender"),
};
permit.send(value);
}

/// Disarm permit. This releases the reserved slot in the bounded channel.
///
/// This function can only be called when the `Sender` is ready.
pub fn disarm(mut self: Pin<&mut Self>) {
assert!(matches!(self.as_mut().pin_project_state(), State::Ready(_)));
*self.pin_project_state() = State::Empty;
}

/// Tries to acquire a permit.
///
/// This function can not be called when the `Sender` is ready.
pub fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError<()>>> {
let mut fut = match std::mem::replace(self.as_mut().pin_project_state(), State::Empty) {
State::Ready(_) => panic!("poll_ready() must not be called on ready sender"),
State::Acquire(f) => f,
State::Empty => {
// Extend lifetime here.
// Future will not outlive sender, neither does permit.
// `Pin::into_inner_unchecked` is OK because it does not move sender.
let long_lived_sender = unsafe {
std::mem::transmute::<
&mut tokio::sync::mpsc::Sender<T>,
&'static mut tokio::sync::mpsc::Sender<T>,
>(Pin::into_inner_unchecked(
self.as_mut().pin_project_inner(),
))
};
Box::pin(long_lived_sender.reserve())
}
};
let poll = fut.as_mut().poll(cx);
match poll {
Poll::Pending => {
*self.pin_project_state() = State::Acquire(fut);
Poll::Pending
}
Poll::Ready(Ok(permit)) => {
*self.pin_project_state() = State::Ready(permit);
Poll::Ready(Ok(()))
}
Poll::Ready(Err(err)) => {
// leave state in `Empty` state
Poll::Ready(Err(err))
}
}
}
}

#[cfg(test)]
fn _verify_not_unpin(x: Sender<String>) {
trait Foo {
fn is_ready(&self) -> bool;
}

impl<T: Unpin> Foo for T {
fn is_ready(&self) -> bool {
false
}
}

assert!(x.is_ready());
}
2 changes: 1 addition & 1 deletion tokio-util/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@

mod mpsc;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that cancellation token tests are just ignored currently.

16 changes: 16 additions & 0 deletions tokio-util/src/sync/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// TODO: loom
use crate::sync::Sender;
use futures::future::poll_fn;
use tokio::sync::mpsc::channel;

#[test]
fn basic() {
futures::executor::block_on(async move {
let (tx, mut rx) = channel::<String>(1);
let tx = Sender::new(tx);
tokio::pin!(tx);
poll_fn(|cx| tx.as_mut().poll_ready(cx)).await.unwrap();
tx.send("hello".to_string());
assert_eq!(rx.try_recv(), Ok("hello".to_string()))
});
}