New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pinned Channels #2811
Pinned Channels #2811
Changes from 16 commits
5bf1ae8
fb2603e
892b759
29a46bd
20fc4a3
f84f9d8
a6c65c0
ec96101
83331d1
e92a2fa
3fcce34
fb5b501
4a64d9b
f93c7f7
24bbea2
a8ee1c0
6309a9a
0328976
177c6cb
1e73fb4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
//! Task synchronisation primitives for pinned tasks. | ||
//! | ||
//! This module provides task synchronisation for `!Send` futures. | ||
|
||
pub mod mpsc; | ||
pub mod oneshot; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,347 @@ | ||
//! A multi-producer single-receiver channel. | ||
|
||
use std::cell::UnsafeCell; | ||
use std::collections::VecDeque; | ||
use std::marker::PhantomData; | ||
use std::rc::Rc; | ||
use std::task::{Poll, Waker}; | ||
|
||
use futures::sink::Sink; | ||
use futures::stream::{FusedStream, Stream}; | ||
use thiserror::Error; | ||
|
||
/// Error returned by [`try_next`](UnboundedReceiver::try_next). | ||
#[derive(Error, Debug)] | ||
#[error("queue is empty")] | ||
pub struct TryRecvError { | ||
_marker: PhantomData<()>, | ||
} | ||
|
||
/// Error returned by [`send_now`](UnboundedSender::send_now). | ||
#[derive(Error, Debug)] | ||
#[error("failed to send")] | ||
pub struct SendError<T> { | ||
/// The send value. | ||
pub inner: T, | ||
} | ||
|
||
/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink). | ||
#[derive(Error, Debug)] | ||
#[error("failed to send")] | ||
pub struct TrySendError { | ||
_marker: PhantomData<()>, | ||
} | ||
|
||
#[derive(Debug)] | ||
struct Inner<T> { | ||
rx_waker: Option<Waker>, | ||
closed: bool, | ||
sender_ctr: usize, | ||
items: VecDeque<T>, | ||
} | ||
|
||
impl<T> Inner<T> { | ||
fn close(&mut self) { | ||
self.closed = true; | ||
|
||
if let Some(ref m) = self.rx_waker { | ||
m.wake_by_ref(); | ||
} | ||
} | ||
} | ||
|
||
/// The receiver of an unbounded mpsc channel. | ||
#[derive(Debug)] | ||
pub struct UnboundedReceiver<T> { | ||
inner: Rc<UnsafeCell<Inner<T>>>, | ||
} | ||
|
||
impl<T> UnboundedReceiver<T> { | ||
/// Try to read the next value from the channel. | ||
/// | ||
/// This function will return: | ||
/// - `Ok(Some(T))` if a value is ready. | ||
/// - `Ok(None)` if the channel has become closed. | ||
/// - `Err(TryRecvError)` if the channel is not closed and the channel is empty. | ||
pub fn try_next(&self) -> std::result::Result<Option<T>, TryRecvError> { | ||
// SAFETY: This function is not used by any other functions and hence uniquely owns the | ||
// mutable reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
match (inner.items.pop_front(), inner.closed) { | ||
(Some(m), _) => Ok(Some(m)), | ||
(None, false) => Ok(None), | ||
(None, true) => Err(TryRecvError { | ||
_marker: PhantomData, | ||
}), | ||
} | ||
} | ||
} | ||
|
||
impl<T> Stream for UnboundedReceiver<T> { | ||
type Item = T; | ||
|
||
fn poll_next( | ||
self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
// SAFETY: This function is not used by any other functions and hence uniquely owns the | ||
// mutable reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
match (inner.items.pop_front(), inner.closed) { | ||
(Some(m), _) => Poll::Ready(Some(m)), | ||
(None, false) => { | ||
inner.rx_waker = Some(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
(None, true) => Poll::Ready(None), | ||
} | ||
Comment on lines
+102
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot avoid acquiring the mutable reference to inner as we have to store the waker. (it’s an equal number of lines to map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was hoping to avoid unsafely obtaining the mutable reference. The lines of code doesn't really matter here |
||
} | ||
} | ||
|
||
impl<T> FusedStream for UnboundedReceiver<T> { | ||
fn is_terminated(&self) -> bool { | ||
// SAFETY: This function is not used by any other functions and hence uniquely owns the | ||
// reference. | ||
let inner = unsafe { &*self.inner.get() }; | ||
inner.items.is_empty() && inner.closed | ||
} | ||
} | ||
|
||
impl<T> Drop for UnboundedReceiver<T> { | ||
fn drop(&mut self) { | ||
// SAFETY: This function is not used by any other functions and hence uniquely owns the | ||
// mutable reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
WorldSEnder marked this conversation as resolved.
Show resolved
Hide resolved
|
||
inner.close(); | ||
Comment on lines
+140
to
+141
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about freeing the inner? Does UnsafeCell takes care of it? I couldn't find anything about drop behavior in UnsafeCell documentation, nor does it implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. UnsafeCell is a plain struct with an inner of type T. The drop behaviour is handled normally as any other type. |
||
} | ||
} | ||
|
||
/// The sender of an unbounded mpsc channel. | ||
#[derive(Debug)] | ||
pub struct UnboundedSender<T> { | ||
inner: Rc<UnsafeCell<Inner<T>>>, | ||
} | ||
|
||
impl<T> UnboundedSender<T> { | ||
/// Sends a value to the unbounded receiver. | ||
pub fn send_now(&self, item: T) -> Result<(), SendError<T>> { | ||
WorldSEnder marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// SAFETY: This function is not used by any function that have already acquired a mutable | ||
// reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
if inner.closed { | ||
return Err(SendError { inner: item }); | ||
} | ||
|
||
inner.items.push_back(item); | ||
|
||
if let Some(ref m) = inner.rx_waker { | ||
m.wake_by_ref(); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Closes the channel. | ||
futursolo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn close_now(&self) { | ||
WorldSEnder marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// SAFETY: This function is not used by any other functions that have acquired a mutable | ||
// reference and hence uniquely owns the mutable reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
inner.close(); | ||
} | ||
} | ||
|
||
impl<T> Clone for UnboundedSender<T> { | ||
fn clone(&self) -> Self { | ||
let self_ = Self { | ||
inner: self.inner.clone(), | ||
}; | ||
futursolo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// SAFETY: This function is not used by any other functions and hence uniquely owns the | ||
// mutable reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
inner.sender_ctr += 1; | ||
|
||
self_ | ||
} | ||
} | ||
|
||
impl<T> Drop for UnboundedSender<T> { | ||
fn drop(&mut self) { | ||
// SAFETY: This function is not used by any other functions and hence uniquely owns the | ||
// mutable reference. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
WorldSEnder marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let sender_ctr = { | ||
inner.sender_ctr -= 1; | ||
inner.sender_ctr | ||
}; | ||
|
||
if sender_ctr == 0 { | ||
inner.close(); | ||
} | ||
} | ||
} | ||
|
||
impl<T> Sink<T> for &'_ UnboundedSender<T> { | ||
type Error = TrySendError; | ||
|
||
fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { | ||
self.send_now(item).map_err(|_| TrySendError { | ||
_marker: PhantomData, | ||
}) | ||
} | ||
|
||
fn poll_ready( | ||
self: std::pin::Pin<&mut Self>, | ||
_cx: &mut std::task::Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
let inner = unsafe { &*self.inner.get() }; | ||
match inner.closed { | ||
false => Poll::Ready(Ok(())), | ||
true => Poll::Ready(Err(TrySendError { | ||
_marker: PhantomData, | ||
})), | ||
} | ||
} | ||
|
||
fn poll_flush( | ||
self: std::pin::Pin<&mut Self>, | ||
_cx: &mut std::task::Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
Poll::Ready(Ok(())) | ||
} | ||
|
||
fn poll_close( | ||
self: std::pin::Pin<&mut Self>, | ||
_cx: &mut std::task::Context<'_>, | ||
) -> Poll<Result<(), Self::Error>> { | ||
self.close_now(); | ||
|
||
Poll::Ready(Ok(())) | ||
} | ||
} | ||
|
||
/// Creates an unbounded channel. | ||
/// | ||
/// # Note | ||
/// | ||
/// This channel has an infinite buffer and can run out of memory if the channel is not actively | ||
/// drained. | ||
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { | ||
let inner = Rc::new(UnsafeCell::new(Inner { | ||
rx_waker: None, | ||
closed: false, | ||
|
||
sender_ctr: 1, | ||
items: VecDeque::new(), | ||
})); | ||
|
||
( | ||
UnboundedSender { | ||
inner: inner.clone(), | ||
}, | ||
UnboundedReceiver { inner }, | ||
) | ||
} | ||
|
||
#[cfg(not(target_arch = "wasm32"))] | ||
#[cfg(feature = "tokio")] | ||
Comment on lines
+300
to
+301
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we run these tests on both targets? |
||
#[cfg(test)] | ||
mod tests { | ||
use std::time::Duration; | ||
|
||
use futures::sink::SinkExt; | ||
use futures::stream::StreamExt; | ||
use tokio::task::LocalSet; | ||
use tokio::test; | ||
|
||
use super::*; | ||
use crate::platform::spawn_local; | ||
use crate::platform::time::sleep; | ||
|
||
#[test] | ||
async fn mpsc_works() { | ||
let local_set = LocalSet::new(); | ||
|
||
local_set | ||
.run_until(async { | ||
let (tx, mut rx) = unbounded::<usize>(); | ||
|
||
spawn_local(async move { | ||
for i in 0..10 { | ||
(&tx).send(i).await.expect("failed to send."); | ||
sleep(Duration::from_millis(1)).await; | ||
} | ||
}); | ||
|
||
for i in 0..10 { | ||
let received = rx.next().await.expect("failed to receive"); | ||
|
||
assert_eq!(i, received); | ||
} | ||
|
||
assert_eq!(rx.next().await, None); | ||
}) | ||
.await; | ||
} | ||
|
||
#[test] | ||
async fn mpsc_drops_receiver() { | ||
let (tx, rx) = unbounded::<usize>(); | ||
drop(rx); | ||
|
||
(&tx).send(0).await.expect_err("should fail to send."); | ||
} | ||
|
||
#[test] | ||
async fn mpsc_multi_sender() { | ||
let local_set = LocalSet::new(); | ||
|
||
local_set | ||
.run_until(async { | ||
let (tx, mut rx) = unbounded::<usize>(); | ||
|
||
spawn_local(async move { | ||
let tx2 = tx.clone(); | ||
|
||
for i in 0..10 { | ||
if i % 2 == 0 { | ||
(&tx).send(i).await.expect("failed to send."); | ||
} else { | ||
(&tx2).send(i).await.expect("failed to send."); | ||
} | ||
|
||
sleep(Duration::from_millis(1)).await; | ||
} | ||
|
||
drop(tx2); | ||
|
||
for i in 10..20 { | ||
(&tx).send(i).await.expect("failed to send."); | ||
|
||
sleep(Duration::from_millis(1)).await; | ||
} | ||
}); | ||
|
||
for i in 0..20 { | ||
let received = rx.next().await.expect("failed to receive"); | ||
|
||
assert_eq!(i, received); | ||
} | ||
|
||
assert_eq!(rx.next().await, None); | ||
}) | ||
.await; | ||
} | ||
|
||
#[test] | ||
async fn mpsc_drops_sender() { | ||
let (tx, mut rx) = unbounded::<usize>(); | ||
drop(tx); | ||
|
||
assert_eq!(rx.next().await, None); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why have these markers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to mark that there is a hidden information in this struct.
Some other struct have this as we need to expand and add error reason in the future. This is here to keep consistency.