Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio02-style mpsc sender #3105

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions tokio-util/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! Synchronization primitives

mod mpsc;
pub use mpsc::{Sender};

mod cancellation_token;
pub use cancellation_token::{CancellationToken, WaitForCancellationFuture};

mod intrusive_double_linked_list;

#[cfg(test)]
mod tests;
133 changes: 133 additions & 0 deletions tokio-util/src/sync/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//! Tokio-02 style MPSC channel.
use std::{
future::Future,
marker::PhantomPinned,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::{error::SendError, Permit};

/// Fat sender with `poll_ready` support.
#[derive(Debug)]
// TODO: it doesn't seem we really need 'static here...
pub struct Sender<T: 'static> {
// field order: `state` can contain reference to `inner`,
// so it must be dropped first.
/// State of this sender
state: State<T>,
/// Underlying channel
inner: tokio::sync::mpsc::Sender<T>,
/// Pinned marker
pinned: PhantomPinned,
}

impl<T: 'static> Sender<T> {
/// It is OK to get mutable reference to state, because it is not
/// self-referencing.
fn pin_project_state(self: Pin<&mut Self>) -> &mut State<T> {
unsafe { &mut Pin::into_inner_unchecked(self).state }
}

/// Sender must be pinned because state can contain references to it
fn pin_project_inner(self: Pin<&mut Self>) -> Pin<&mut tokio::sync::mpsc::Sender<T>> {
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
unsafe { self.map_unchecked_mut(|t| &mut t.inner) }
}
}

enum State<T: 'static> {
/// We do not have permit, but we didn't start acquiring it.
Empty,
/// We have a permit to send.
// ALERT: this is self-reference to the Sender.
Ready(Permit<'static, T>),
/// We are in process of acquiring a permit
// ALERT: contained future contains self-reference to the sender.
Acquire(Pin<Box<dyn Future<Output = Result<Permit<'static, T>, SendError<()>>>>>),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This boxing can be avoided when type_alias_impl_trait finally lands.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, if this was implemented in tokio rather than in tokio-util, this could just use the internal semaphore implementation's Acquire future. But, I don't think that's going to ever become public API, so this would need to be in tokio proper.

}

impl<T: 'static> std::fmt::Debug for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
State::Empty => f.debug_tuple("Empty").finish(),
State::Ready(_) => f.debug_tuple("Ready").field(&"_").finish(),
State::Acquire(_) => f.debug_tuple("Acquire").field(&"_").finish(),
}
}
}

impl<T: 'static> Sender<T> {
/// Wraps a [tokio sender](tokio::sync::mpsc::Sender).
pub fn new(inner: tokio::sync::mpsc::Sender<T>) -> Self {
Sender {
inner,
state: State::Empty,
pinned: PhantomPinned,
}
}

/// Returns sender readiness state
pub fn is_ready(&self) -> bool {
matches!(self.state, State::Ready(_))
}

/// Sends a message.
/// This function can only be called when `is_ready() == false`,
/// otherwise it panics.
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
pub fn send(self: Pin<&mut Self>, value: T) {
let permit = match std::mem::replace(self.pin_project_state(), State::Empty) {
State::Ready(permit) => permit,
_ => panic!("called send() on non-ready Sender"),
};
permit.send(value);
}

/// Disarms permit, allowing other senders to use freed capacity slot.
/// This function can only be called when `is_ready() == true`,
/// otherwise it panics.
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
pub fn disarm(mut self: Pin<&mut Self>) {
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
assert!(matches!(self.as_mut().pin_project_state(), State::Ready(_)));
*self.pin_project_state() = State::Empty;
}

/// Tries to acquire a permit.
/// This function can only be called when `is_ready() == true`,
/// otherwise it panics.
MikailBag marked this conversation as resolved.
Show resolved Hide resolved
pub fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError<()>>> {
let mut fut = match std::mem::replace(self.as_mut().pin_project_state(), State::Empty) {
State::Ready(_) => panic!("poll_ready() must not be called on ready sender"),
State::Acquire(f) => f,
State::Empty => {
// Extend lifetime here.
// Future will not outlive sender, neither does permit.
// `Pin::into_inner_unchecked` is OK because it does not move sender.
let long_lived_sender = unsafe {
std::mem::transmute::<
&mut tokio::sync::mpsc::Sender<T>,
&'static mut tokio::sync::mpsc::Sender<T>,
>(Pin::into_inner_unchecked(
self.as_mut().pin_project_inner(),
))
};
Box::pin(long_lived_sender.reserve())
}
};
let poll = fut.as_mut().poll(cx);
match poll {
Poll::Pending => {
*self.pin_project_state() = State::Acquire(fut);
Poll::Pending
}
Poll::Ready(Ok(permit)) => {
*self.pin_project_state() = State::Ready(permit);
Poll::Ready(Ok(()))
}
Poll::Ready(Err(err)) => {
// leave state in `Empty` state
Poll::Ready(Err(err))
}
}
}
}
2 changes: 1 addition & 1 deletion tokio-util/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@

mod mpsc;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that cancellation token tests are just ignored currently.

14 changes: 14 additions & 0 deletions tokio-util/src/sync/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// TODO: loom
use crate::sync::Sender;
use futures::future::poll_fn;
use tokio::sync::mpsc::channel;

#[tokio::test]
async fn basic() {
let (tx, mut rx) = channel::<String>(1);
let tx = Sender::new(tx);
tokio::pin!(tx);
poll_fn(|cx| tx.as_mut().poll_ready(cx)).await.unwrap();
tx.send("hello".to_string());
assert_eq!(rx.try_recv(), Ok("hello".to_string()))
}