diff --git a/.github/workflows/main-checks.yml b/.github/workflows/main-checks.yml index 02c339e4b0d..66fc8b43d71 100644 --- a/.github/workflows/main-checks.yml +++ b/.github/workflows/main-checks.yml @@ -184,7 +184,7 @@ jobs: env: # workaround for lack of ternary operator # see https://github.com/orgs/community/discussions/25725 - RUSTFLAGS: ${{ matrix.toolchain == 'nightly && '--cfg nightly_yew' || '' }} + RUSTFLAGS: ${{ matrix.toolchain == 'nightly' && '--cfg nightly_yew' || '' }} with: command: test args: --all-targets --workspace --exclude yew @@ -194,7 +194,7 @@ jobs: env: # workaround for lack of ternary operator # see https://github.com/orgs/community/discussions/25725 - RUSTFLAGS: ${{ matrix.toolchain == 'nightly && '--cfg nightly_yew' || '' }} + RUSTFLAGS: ${{ matrix.toolchain == 'nightly' && '--cfg nightly_yew' || '' }} with: command: test args: -p yew --all-features diff --git a/packages/yew/src/html/component/lifecycle.rs b/packages/yew/src/html/component/lifecycle.rs index 6e687cdeda3..40fb28763ee 100644 --- a/packages/yew/src/html/component/lifecycle.rs +++ b/packages/yew/src/html/component/lifecycle.rs @@ -40,7 +40,7 @@ pub(crate) enum ComponentRenderState { }, #[cfg(feature = "ssr")] Ssr { - sender: Option>, + sender: Option>, }, } diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index e513701ed3e..42fb6e367c9 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -265,7 +265,7 @@ mod feat_ssr { ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner, }; use crate::platform::io::BufWriter; - use crate::platform::sync::oneshot; + use crate::platform::pinned::oneshot; use crate::scheduler; use crate::virtual_dom::Collectable; diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index bbab2211b03..2fea187d2ee 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -45,6 +45,7 @@ use std::future::Future; #[cfg(feature = "ssr")] pub(crate) mod io; +pub mod pinned; pub mod sync; pub mod time; diff --git a/packages/yew/src/platform/pinned/mod.rs b/packages/yew/src/platform/pinned/mod.rs new file mode 100644 index 00000000000..ee0eb9df21b --- /dev/null +++ b/packages/yew/src/platform/pinned/mod.rs @@ -0,0 +1,6 @@ +//! Task synchronisation primitives for pinned tasks. +//! +//! This module provides task synchronisation for `!Send` futures. + +pub mod mpsc; +pub mod oneshot; diff --git a/packages/yew/src/platform/pinned/mpsc.rs b/packages/yew/src/platform/pinned/mpsc.rs new file mode 100644 index 00000000000..0aa6b95823c --- /dev/null +++ b/packages/yew/src/platform/pinned/mpsc.rs @@ -0,0 +1,397 @@ +//! A multi-producer single-receiver channel. + +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::rc::Rc; +use std::task::{Poll, Waker}; + +use futures::sink::Sink; +use futures::stream::{FusedStream, Stream}; +use thiserror::Error; + +/// Error returned by [`try_next`](UnboundedReceiver::try_next). +#[derive(Error, Debug)] +#[error("queue is empty")] +pub struct TryRecvError { + _marker: PhantomData<()>, +} + +/// Error returned by [`send_now`](UnboundedSender::send_now). +#[derive(Error, Debug)] +#[error("failed to send")] +pub struct SendError { + /// The send value. + pub inner: T, +} + +/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink). +#[derive(Error, Debug)] +#[error("failed to send")] +pub struct TrySendError { + _marker: PhantomData<()>, +} + +#[derive(Debug)] +struct Inner { + rx_waker: Option, + closed: bool, + sender_ctr: usize, + items: VecDeque, +} + +impl Inner { + fn close(&mut self) { + self.closed = true; + + if let Some(ref m) = self.rx_waker { + m.wake_by_ref(); + } + } +} + +/// The receiver of an unbounded mpsc channel. +#[derive(Debug)] +pub struct UnboundedReceiver { + inner: Rc>>, +} + +impl UnboundedReceiver { + /// Try to read the next value from the channel. + /// + /// This function will return: + /// - `Ok(Some(T))` if a value is ready. + /// - `Ok(None)` if the channel has become closed. + /// - `Err(TryRecvError)` if the channel is not closed and the channel is empty. + pub fn try_next(&self) -> std::result::Result, TryRecvError> { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + match (inner.items.pop_front(), inner.closed) { + (Some(m), _) => Ok(Some(m)), + (None, false) => Ok(None), + (None, true) => Err(TryRecvError { + _marker: PhantomData, + }), + } + } +} + +impl Stream for UnboundedReceiver { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + match (inner.items.pop_front(), inner.closed) { + (Some(m), _) => Poll::Ready(Some(m)), + (None, false) => { + inner.rx_waker = Some(cx.waker().clone()); + Poll::Pending + } + (None, true) => Poll::Ready(None), + } + } +} + +impl FusedStream for UnboundedReceiver { + fn is_terminated(&self) -> bool { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &*self.inner.get() }; + inner.items.is_empty() && inner.closed + } +} + +impl Drop for UnboundedReceiver { + fn drop(&mut self) { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + inner.close(); + } +} + +/// The sender of an unbounded mpsc channel. +#[derive(Debug)] +pub struct UnboundedSender { + inner: Rc>>, +} + +impl UnboundedSender { + /// Sends a value to the unbounded receiver. + pub fn send_now(&self, item: T) -> Result<(), SendError> { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any function that have already acquired a mutable + // reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + if inner.closed { + return Err(SendError { inner: item }); + } + + inner.items.push_back(item); + + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + + Ok(()) + } + + /// Closes the channel. + /// + /// Every sender (dropped or not) is considered closed when this method is called. + pub fn close_now(&self) { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any function that have already acquired a mutable + // reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + inner.close(); + } +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + let self_ = Self { + inner: self.inner.clone(), + }; + + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + inner.sender_ctr += 1; + + self_ + } +} + +impl Drop for UnboundedSender { + fn drop(&mut self) { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + let sender_ctr = { + inner.sender_ctr -= 1; + inner.sender_ctr + }; + + if sender_ctr == 0 { + inner.close(); + } + } +} + +impl Sink for &'_ UnboundedSender { + type Error = TrySendError; + + fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.send_now(item).map_err(|_| TrySendError { + _marker: PhantomData, + }) + } + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + let inner = unsafe { &*self.inner.get() }; + match inner.closed { + false => Poll::Ready(Ok(())), + true => Poll::Ready(Err(TrySendError { + _marker: PhantomData, + })), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.close_now(); + + Poll::Ready(Ok(())) + } +} + +/// Creates an unbounded channel. +/// +/// # Note +/// +/// This channel has an infinite buffer and can run out of memory if the channel is not actively +/// drained. +pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { + let inner = Rc::new(UnsafeCell::new(Inner { + rx_waker: None, + closed: false, + + sender_ctr: 1, + items: VecDeque::new(), + })); + + ( + UnboundedSender { + inner: inner.clone(), + }, + UnboundedReceiver { inner }, + ) +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "tokio")] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::sink::SinkExt; + use futures::stream::StreamExt; + use tokio::task::LocalSet; + use tokio::test; + + use super::*; + use crate::platform::spawn_local; + use crate::platform::time::sleep; + + #[test] + async fn mpsc_works() { + let local_set = LocalSet::new(); + + local_set + .run_until(async { + let (tx, mut rx) = unbounded::(); + + spawn_local(async move { + for i in 0..10 { + (&tx).send(i).await.expect("failed to send."); + sleep(Duration::from_millis(1)).await; + } + }); + + for i in 0..10 { + let received = rx.next().await.expect("failed to receive"); + + assert_eq!(i, received); + } + + assert_eq!(rx.next().await, None); + }) + .await; + } + + #[test] + async fn mpsc_drops_receiver() { + let (tx, rx) = unbounded::(); + drop(rx); + + (&tx).send(0).await.expect_err("should fail to send."); + } + + #[test] + async fn mpsc_multi_sender() { + let local_set = LocalSet::new(); + + local_set + .run_until(async { + let (tx, mut rx) = unbounded::(); + + spawn_local(async move { + let tx2 = tx.clone(); + + for i in 0..10 { + if i % 2 == 0 { + (&tx).send(i).await.expect("failed to send."); + } else { + (&tx2).send(i).await.expect("failed to send."); + } + + sleep(Duration::from_millis(1)).await; + } + + drop(tx2); + + for i in 10..20 { + (&tx).send(i).await.expect("failed to send."); + + sleep(Duration::from_millis(1)).await; + } + }); + + for i in 0..20 { + let received = rx.next().await.expect("failed to receive"); + + assert_eq!(i, received); + } + + assert_eq!(rx.next().await, None); + }) + .await; + } + + #[test] + async fn mpsc_drops_sender() { + let (tx, mut rx) = unbounded::(); + drop(tx); + + assert_eq!(rx.next().await, None); + } +} diff --git a/packages/yew/src/platform/pinned/oneshot.rs b/packages/yew/src/platform/pinned/oneshot.rs new file mode 100644 index 00000000000..db35fc4d115 --- /dev/null +++ b/packages/yew/src/platform/pinned/oneshot.rs @@ -0,0 +1,220 @@ +//! A one-time send - receive channel. + +use std::cell::UnsafeCell; +use std::future::Future; +use std::marker::PhantomData; +use std::rc::Rc; +use std::task::{Poll, Waker}; + +use thiserror::Error; + +/// Error returned by awaiting the [`Receiver`]. +#[derive(Debug, Error)] +#[error("channel has been closed.")] +pub struct RecvError { + _marker: PhantomData<()>, +} + +#[derive(Debug)] +struct Inner { + rx_waker: Option, + closed: bool, + item: Option, +} + +/// The receiver of a oneshot channel. +#[derive(Debug)] +pub struct Receiver { + inner: Rc>>, +} + +impl Future for Receiver { + type Output = Result; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + // Implementation Note: + // + // It might be neater to use a match pattern here. + // However, this will slow down the polling process by 10%. + + if let Some(m) = inner.item.take() { + return Poll::Ready(Ok(m)); + } + + if inner.closed { + return Poll::Ready(Err(RecvError { + _marker: PhantomData, + })); + } + + inner.rx_waker = Some(cx.waker().clone()); + Poll::Pending + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + inner.closed = true; + } +} + +/// The sender of a oneshot channel. +#[derive(Debug)] +pub struct Sender { + inner: Rc>>, +} + +impl Sender { + /// Send an item to the other side of the channel, consumes the sender. + pub fn send(self, item: T) -> Result<(), T> { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + if inner.closed { + return Err(item); + } + + inner.item = Some(item); + + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + + Ok(()) + } +} + +impl Drop for Sender { + fn drop(&mut self) { + // SAFETY: + // + // We can acquire a mutable reference without checking as: + // + // - This type is !Sync and !Send. + // - This function is not used by any other functions and hence uniquely owns the + // mutable reference. + // - The mutable reference is dropped at the end of this function. + let inner = unsafe { &mut *self.inner.get() }; + + inner.closed = true; + + if inner.item.is_none() { + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + } + } +} + +/// Creates a oneshot channel. +pub fn channel() -> (Sender, Receiver) { + let inner = Rc::new(UnsafeCell::new(Inner { + rx_waker: None, + closed: false, + item: None, + })); + + ( + Sender { + inner: inner.clone(), + }, + Receiver { inner }, + ) +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "tokio")] +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use tokio::sync::Barrier; + use tokio::task::LocalSet; + use tokio::test; + + use super::*; + use crate::platform::spawn_local; + use crate::platform::time::sleep; + + #[test] + async fn oneshot_works() { + let (tx, rx) = channel(); + + tx.send(0).expect("failed to send."); + + assert_eq!(rx.await.expect("failed to receive."), 0); + } + + #[test] + async fn oneshot_drops_sender() { + let local_set = LocalSet::new(); + + local_set + .run_until(async { + let (tx, rx) = channel::(); + + spawn_local(async move { + sleep(Duration::from_millis(1)).await; + + drop(tx); + }); + rx.await.expect_err("successful to receive."); + }) + .await; + } + + #[test] + async fn oneshot_drops_receiver() { + let local_set = LocalSet::new(); + + local_set + .run_until(async { + let (tx, rx) = channel::(); + + let bar = Arc::new(Barrier::new(2)); + + { + let bar = bar.clone(); + spawn_local(async move { + sleep(Duration::from_millis(1)).await; + + drop(rx); + + bar.wait().await; + }); + } + + bar.wait().await; + + tx.send(0).expect_err("successful to send."); + }) + .await; + } +}