Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pinned Channels #2811

Merged
merged 20 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/yew/src/html/component/lifecycle.rs
Expand Up @@ -40,7 +40,7 @@ pub(crate) enum ComponentRenderState {
},
#[cfg(feature = "ssr")]
Ssr {
sender: Option<crate::platform::sync::oneshot::Sender<Html>>,
sender: Option<crate::platform::pinned::oneshot::Sender<Html>>,
},
}

Expand Down
2 changes: 1 addition & 1 deletion packages/yew/src/html/component/scope.rs
Expand Up @@ -265,7 +265,7 @@ mod feat_ssr {
ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner,
};
use crate::platform::io::BufWriter;
use crate::platform::sync::oneshot;
use crate::platform::pinned::oneshot;
use crate::scheduler;
use crate::virtual_dom::Collectable;

Expand Down
1 change: 1 addition & 0 deletions packages/yew/src/platform/mod.rs
Expand Up @@ -45,6 +45,7 @@ use std::future::Future;
#[cfg(feature = "ssr")]
pub(crate) mod io;

pub mod pinned;
pub mod sync;
pub mod time;

Expand Down
6 changes: 6 additions & 0 deletions packages/yew/src/platform/pinned/mod.rs
@@ -0,0 +1,6 @@
//! Task synchronisation primitives for pinned tasks.
//!
//! This module provides task synchronisation for `!Send` futures.

pub mod mpsc;
pub mod oneshot;
347 changes: 347 additions & 0 deletions packages/yew/src/platform/pinned/mpsc.rs
@@ -0,0 +1,347 @@
//! A multi-producer single-receiver channel.

use std::collections::VecDeque;
use std::marker::PhantomData;
use std::rc::Rc;
use std::task::{Poll, Waker};

use futures::sink::Sink;
use futures::stream::{FusedStream, Stream};
use thiserror::Error;

/// Error returned by [`try_next`](UnboundedReceiver::try_next).
#[derive(Error, Debug)]
#[error("queue is empty")]
pub struct TryRecvError {
_marker: PhantomData<()>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have these markers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to mark that there is a hidden information in this struct.

Some other struct have this as we need to expand and add error reason in the future. This is here to keep consistency.

}

/// Error returned by [`send_now`](UnboundedSender::send_now).
#[derive(Error, Debug)]
#[error("failed to send")]
pub struct SendError<T> {
/// The send value.
pub inner: T,
}

/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink).
#[derive(Error, Debug)]
#[error("failed to send")]
pub struct TrySendError {
_marker: PhantomData<()>,
}

#[derive(Debug)]
struct Inner<T> {
rx_waker: Option<Waker>,
closed: bool,
sender_ctr: usize,
items: VecDeque<T>,
}

impl<T> Inner<T> {
/// Creates a unchecked mutable reference from an immutable reference.
///
/// SAFETY: You can only use this when:
///
/// 1. The mutable reference is released at the end of a function call.
/// 2. No parent function has acquired the mutable reference.
/// 3. The caller is not an async function / the mutable reference is released before an await
/// statement.
#[inline]
unsafe fn get_mut_unchecked(&self) -> *mut Self {
self as *const Self as *mut Self
}

fn close(&mut self) {
self.closed = true;

if let Some(m) = self.rx_waker.take() {
m.wake();
}
}
}

/// The receiver of an unbounded mpsc channel.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
inner: Rc<Inner<T>>,
}

impl<T> UnboundedReceiver<T> {
/// Try to read the next value from the channel.
///
/// This function will return:
/// - `Ok(Some(T))` if a value is ready.
/// - `Ok(None)` if the channel has become closed.
/// - `Err(TryRecvError)` if the channel is not closed and the channel is empty.
pub fn try_next(&self) -> std::result::Result<Option<T>, TryRecvError> {
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved

match (inner.items.pop_front(), inner.closed) {
(Some(m), _) => Ok(Some(m)),
(None, false) => Ok(None),
(None, true) => Err(TryRecvError {
_marker: PhantomData,
}),
}
}
}

impl<T> Stream for UnboundedReceiver<T> {
type Item = T;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved

match (inner.items.pop_front(), inner.closed) {
(Some(m), _) => Poll::Ready(Some(m)),
(None, false) => {
inner.rx_waker = Some(cx.waker().clone());
Poll::Pending
}
(None, true) => Poll::Ready(None),
}
}
}

impl<T> FusedStream for UnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved

inner.items.is_empty() && inner.closed
}
}

impl<T> Drop for UnboundedReceiver<T> {
fn drop(&mut self) {
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved

inner.close();
}
}

/// The sender of an unbounded mpsc channel.
#[derive(Debug)]
pub struct UnboundedSender<T> {
inner: Rc<Inner<T>>,
}

impl<T> UnboundedSender<T> {
/// Sends a value to the unbounded receiver.
pub fn send_now(&self, item: T) -> Result<(), SendError<T>> {
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved

if inner.closed {
return Err(SendError { inner: item });
}

inner.items.push_back(item);

if let Some(m) = inner.rx_waker.take() {
m.wake();
}

Ok(())
}

/// Closes the channel.
futursolo marked this conversation as resolved.
Show resolved Hide resolved
pub fn close_now(&self) {
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };

inner.close();
}
}

impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
{
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
inner.sender_ctr += 1;
}

Self {
inner: self.inner.clone(),
}
futursolo marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<T> Drop for UnboundedSender<T> {
fn drop(&mut self) {
let sender_ctr = {
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved
inner.sender_ctr -= 1;

inner.sender_ctr
};

if sender_ctr == 0 {
self.close_now();
}
}
}

impl<T> Sink<T> for &'_ UnboundedSender<T> {
type Error = TrySendError;

fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.send_now(item).map_err(|_| TrySendError {
_marker: PhantomData,
})
}

fn poll_ready(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
futursolo marked this conversation as resolved.
Show resolved Hide resolved

match inner.closed {
false => Poll::Ready(Ok(())),
true => Poll::Ready(Err(TrySendError {
_marker: PhantomData,
})),
}
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.close_now();

Poll::Ready(Ok(()))
}
}

/// Creates an unbounded channel.
///
/// # Note
///
/// This channel has an infinite buffer and can run out of memory if the channel is not actively
/// drained.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let inner = Rc::new(Inner {
rx_waker: None,
closed: false,

sender_ctr: 1,
items: VecDeque::new(),
});

(
UnboundedSender {
inner: inner.clone(),
},
UnboundedReceiver { inner },
)
}

#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "tokio")]
Comment on lines +300 to +301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we run these tests on both targets?

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures::sink::SinkExt;
use futures::stream::StreamExt;
use tokio::task::LocalSet;
use tokio::test;

use super::*;
use crate::platform::spawn_local;
use crate::platform::time::sleep;

#[test]
async fn mpsc_works() {
let local_set = LocalSet::new();

local_set
.run_until(async {
let (tx, mut rx) = unbounded::<usize>();

spawn_local(async move {
for i in 0..10 {
(&tx).send(i).await.expect("failed to send.");
sleep(Duration::from_millis(1)).await;
}
});

for i in 0..10 {
let received = rx.next().await.expect("failed to receive");

assert_eq!(i, received);
}

assert_eq!(rx.next().await, None);
})
.await;
}

#[test]
async fn mpsc_drops_receiver() {
let (tx, rx) = unbounded::<usize>();
drop(rx);

(&tx).send(0).await.expect_err("should fail to send.");
}

#[test]
async fn mpsc_multi_sender() {
let local_set = LocalSet::new();

local_set
.run_until(async {
let (tx, mut rx) = unbounded::<usize>();

spawn_local(async move {
let tx2 = tx.clone();

for i in 0..10 {
if i % 2 == 0 {
(&tx).send(i).await.expect("failed to send.");
} else {
(&tx2).send(i).await.expect("failed to send.");
}

sleep(Duration::from_millis(1)).await;
}

drop(tx2);

for i in 10..20 {
(&tx).send(i).await.expect("failed to send.");

sleep(Duration::from_millis(1)).await;
}
});

for i in 0..20 {
let received = rx.next().await.expect("failed to receive");

assert_eq!(i, received);
}

assert_eq!(rx.next().await, None);
})
.await;
}

#[test]
async fn mpsc_drops_sender() {
let (tx, mut rx) = unbounded::<usize>();
drop(tx);

assert_eq!(rx.next().await, None);
}
}