-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add tokio02-style mpsc sender #3105
Closed
Closed
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
e29745b
Add tokio02-style mpsc sender
MikailBag 87ab07e
fix clippy complex type lint
MikailBag 08fac61
Add to miri
MikailBag 1bf5598
add clone impl
MikailBag f3b06ae
fix miri
MikailBag 19df42a
fix ci again
MikailBag 6c85b54
explicitly set pkg name
MikailBag 528ace5
do not use tokio rt
MikailBag ac896ab
Apply suggestions from code review
MikailBag deff81e
Test that sender is !Unpin
MikailBag cbbfaf0
no need to invent block_on
MikailBag 1757368
rm useless pin
MikailBag 8077de6
disarm: fn -> bool
MikailBag 6c976f6
fix send sync
MikailBag ffaa112
Add one more test
MikailBag eae5db6
miri is async hater
MikailBag b4359ee
fix features
MikailBag 8ca1ea6
Feature-gate tests
MikailBag 6e8d488
Merge branch 'master' into fat-mpsc
MikailBag 20394f6
no need to be static
MikailBag 613024d
One more test
MikailBag f3a2975
Unsafety++, generics++
MikailBag 9a7c0b4
respect minrust
MikailBag d89a4ea
clippy lint
MikailBag d3088c1
another lint
MikailBag cc72b31
no need to transmute
MikailBag 0eb2c49
Drop output too
MikailBag 3f59124
rollback CI change
MikailBag 718a844
rm unneeded
MikailBag 49c8b60
rm unneeded lifetime
MikailBag File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,12 @@ | ||
//! Synchronization primitives | ||
|
||
mod mpsc; | ||
pub use mpsc::Sender; | ||
|
||
mod cancellation_token; | ||
pub use cancellation_token::{CancellationToken, WaitForCancellationFuture}; | ||
|
||
mod intrusive_double_linked_list; | ||
|
||
#[cfg(test)] | ||
mod tests; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
//! Tokio-02 style MPSC channel. | ||
use std::{ | ||
future::Future, | ||
marker::PhantomPinned, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
use tokio::sync::mpsc::{error::SendError, Permit}; | ||
|
||
/// Fat sender with `poll_ready` support. | ||
#[derive(Debug)] | ||
// TODO: it doesn't seem we really need 'static here... | ||
pub struct Sender<T: 'static> { | ||
// field order: `state` can contain reference to `inner`, | ||
// so it must be dropped first. | ||
/// State of this sender | ||
state: State<T>, | ||
/// Underlying channel | ||
inner: tokio::sync::mpsc::Sender<T>, | ||
/// Pinned marker | ||
pinned: PhantomPinned, | ||
} | ||
|
||
impl<T: 'static> Sender<T> { | ||
/// It is OK to get mutable reference to state, because it is not | ||
/// self-referencing. | ||
fn pin_project_state(self: Pin<&mut Self>) -> &mut State<T> { | ||
unsafe { &mut Pin::into_inner_unchecked(self).state } | ||
} | ||
|
||
/// Sender must be pinned because state can contain references to it | ||
fn pin_project_inner(self: Pin<&mut Self>) -> Pin<&mut tokio::sync::mpsc::Sender<T>> { | ||
MikailBag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
unsafe { self.map_unchecked_mut(|t| &mut t.inner) } | ||
} | ||
} | ||
|
||
impl<T: 'static> Clone for Sender<T> { | ||
fn clone(&self) -> Self { | ||
Sender { | ||
state: State::Empty, | ||
inner: self.inner.clone(), | ||
pinned: PhantomPinned, | ||
} | ||
} | ||
} | ||
|
||
type AcquireFutOutput<T> = Result<Permit<'static, T>, SendError<()>>; | ||
|
||
enum State<T: 'static> { | ||
/// We do not have permit, but we didn't start acquiring it. | ||
Empty, | ||
/// We have a permit to send. | ||
// ALERT: this is self-reference to the Sender. | ||
Ready(Permit<'static, T>), | ||
/// We are in process of acquiring a permit | ||
// ALERT: contained future contains self-reference to the sender. | ||
Acquire(Pin<Box<dyn Future<Output = AcquireFutOutput<T>>>>), | ||
} | ||
|
||
impl<T: 'static> std::fmt::Debug for State<T> { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
match self { | ||
State::Empty => f.debug_tuple("Empty").finish(), | ||
State::Ready(_) => f.debug_tuple("Ready").field(&"_").finish(), | ||
State::Acquire(_) => f.debug_tuple("Acquire").field(&"_").finish(), | ||
} | ||
} | ||
} | ||
|
||
impl<T: 'static> Sender<T> { | ||
/// Wraps a [tokio sender](tokio::sync::mpsc::Sender). | ||
pub fn new(inner: tokio::sync::mpsc::Sender<T>) -> Self { | ||
Sender { | ||
inner, | ||
state: State::Empty, | ||
pinned: PhantomPinned, | ||
} | ||
} | ||
|
||
/// Returns sender readiness state | ||
pub fn is_ready(&self) -> bool { | ||
matches!(self.state, State::Ready(_)) | ||
} | ||
|
||
/// Sends a message. | ||
/// This function can only be called when `is_ready() == false`, | ||
/// otherwise it panics. | ||
MikailBag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn send(self: Pin<&mut Self>, value: T) { | ||
let permit = match std::mem::replace(self.pin_project_state(), State::Empty) { | ||
State::Ready(permit) => permit, | ||
_ => panic!("called send() on non-ready Sender"), | ||
}; | ||
permit.send(value); | ||
} | ||
|
||
/// Disarms permit, allowing other senders to use freed capacity slot. | ||
/// This function can only be called when `is_ready() == true`, | ||
/// otherwise it panics. | ||
MikailBag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn disarm(mut self: Pin<&mut Self>) { | ||
MikailBag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assert!(matches!(self.as_mut().pin_project_state(), State::Ready(_))); | ||
*self.pin_project_state() = State::Empty; | ||
} | ||
|
||
/// Tries to acquire a permit. | ||
/// This function can only be called when `is_ready() == true`, | ||
/// otherwise it panics. | ||
MikailBag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn poll_ready( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<(), SendError<()>>> { | ||
let mut fut = match std::mem::replace(self.as_mut().pin_project_state(), State::Empty) { | ||
State::Ready(_) => panic!("poll_ready() must not be called on ready sender"), | ||
State::Acquire(f) => f, | ||
State::Empty => { | ||
// Extend lifetime here. | ||
// Future will not outlive sender, neither does permit. | ||
// `Pin::into_inner_unchecked` is OK because it does not move sender. | ||
let long_lived_sender = unsafe { | ||
std::mem::transmute::< | ||
&mut tokio::sync::mpsc::Sender<T>, | ||
&'static mut tokio::sync::mpsc::Sender<T>, | ||
>(Pin::into_inner_unchecked( | ||
self.as_mut().pin_project_inner(), | ||
)) | ||
}; | ||
Box::pin(long_lived_sender.reserve()) | ||
} | ||
}; | ||
let poll = fut.as_mut().poll(cx); | ||
match poll { | ||
Poll::Pending => { | ||
*self.pin_project_state() = State::Acquire(fut); | ||
Poll::Pending | ||
} | ||
Poll::Ready(Ok(permit)) => { | ||
*self.pin_project_state() = State::Ready(permit); | ||
Poll::Ready(Ok(())) | ||
} | ||
Poll::Ready(Err(err)) => { | ||
// leave state in `Empty` state | ||
Poll::Ready(Err(err)) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
|
||
mod mpsc; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that cancellation token tests are just ignored currently. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
// TODO: loom | ||
use crate::sync::Sender; | ||
use futures::future::poll_fn; | ||
use tokio::sync::mpsc::channel; | ||
|
||
fn block_on<F: std::future::Future>(fut: F) -> F::Output { | ||
tokio::pin!(fut); | ||
let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); | ||
loop { | ||
if let std::task::Poll::Ready(r) = fut.as_mut().poll(&mut cx) { | ||
return r; | ||
} | ||
} | ||
} | ||
MikailBag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
#[test] | ||
fn basic() { | ||
block_on(async move { | ||
let (tx, mut rx) = channel::<String>(1); | ||
let tx = Sender::new(tx); | ||
tokio::pin!(tx); | ||
poll_fn(|cx| tx.as_mut().poll_ready(cx)).await.unwrap(); | ||
tx.send("hello".to_string()); | ||
assert_eq!(rx.try_recv(), Ok("hello".to_string())) | ||
}); | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good, but we should have a miri test that calls
is_ready
andclone
while the state is in eitherReady
orAcquire
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately it seems that there is no way to poll futures under MIRI.
Of course, I plan to write some less trivial tests but there is no way to verify them in MIRI in the near future.