From 5bf1ae83362335a0f25bd449f3c1bf63d412a371 Mon Sep 17 00:00:00 2001 From: Kaede Hoshikawa Date: Thu, 4 Aug 2022 21:52:32 +0900 Subject: [PATCH 01/25] Switch to pinned channels. --- packages/yew/Cargo.toml | 8 +- packages/yew/src/html/component/lifecycle.rs | 2 +- packages/yew/src/html/component/scope.rs | 6 +- packages/yew/src/platform/{io.rs => fmt.rs} | 94 ++++-- packages/yew/src/platform/mod.rs | 4 +- packages/yew/src/platform/pinned/mod.rs | 6 + packages/yew/src/platform/pinned/mpsc.rs | 334 +++++++++++++++++++ packages/yew/src/platform/pinned/oneshot.rs | 198 +++++++++++ packages/yew/src/platform/sync/mod.rs | 5 - packages/yew/src/platform/sync/mpsc.rs | 6 - packages/yew/src/server_renderer.rs | 37 +- packages/yew/src/virtual_dom/mod.rs | 6 +- packages/yew/src/virtual_dom/vcomp.rs | 8 +- packages/yew/src/virtual_dom/vlist.rs | 10 +- packages/yew/src/virtual_dom/vnode.rs | 6 +- packages/yew/src/virtual_dom/vsuspense.rs | 4 +- packages/yew/src/virtual_dom/vtag.rs | 6 +- packages/yew/src/virtual_dom/vtext.rs | 4 +- tools/Cargo.lock | 1 - 19 files changed, 660 insertions(+), 85 deletions(-) rename packages/yew/src/platform/{io.rs => fmt.rs} (68%) create mode 100644 packages/yew/src/platform/pinned/mod.rs create mode 100644 packages/yew/src/platform/pinned/mpsc.rs create mode 100644 packages/yew/src/platform/pinned/oneshot.rs delete mode 100644 packages/yew/src/platform/sync/mod.rs delete mode 100644 packages/yew/src/platform/sync/mpsc.rs diff --git a/packages/yew/Cargo.toml b/packages/yew/Cargo.toml index 6742ddbbac0..4a68f11dc46 100644 --- a/packages/yew/Cargo.toml +++ b/packages/yew/Cargo.toml @@ -31,8 +31,6 @@ implicit-clone = { version = "0.3", features = ["map"] } base64ct = { version = "1.5.0", features = ["std"], optional = true } bincode = { version = "1.3.3", optional = true } serde = { version = "1", features = ["derive"] } -tokio = { version = "1.19", features = ["sync"] } -tokio-stream = { version = "0.1.9", features = ["sync"] } [dependencies.web-sys] version = "0.3" @@ -75,8 +73,10 @@ wasm-bindgen-futures = "0.4" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] num_cpus = { version = "1.13", optional = true } -tokio-util = { version = "0.7", features = ["rt"], optional = true } once_cell = "1" +tokio = { version = "1.19", features = ["rt", "time"], optional = true } +tokio-stream = { version = "0.1", features = ["time"], optional = true } +tokio-util = { version = "0.7", features = ["rt"], optional = true } [dev-dependencies] wasm-bindgen-test = "0.3" @@ -93,7 +93,7 @@ features = [ ] [features] -tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"] +tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"] ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"] csr = [] hydration = ["csr", "dep:bincode"] diff --git a/packages/yew/src/html/component/lifecycle.rs b/packages/yew/src/html/component/lifecycle.rs index 89fe2022a04..9007d0061eb 100644 --- a/packages/yew/src/html/component/lifecycle.rs +++ b/packages/yew/src/html/component/lifecycle.rs @@ -41,7 +41,7 @@ pub(crate) enum ComponentRenderState { #[cfg(feature = "ssr")] Ssr { - sender: Option>, + sender: Option>, }, } diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index bf821025cfc..4c627725cb9 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -264,15 +264,15 @@ mod feat_ssr { use crate::html::component::lifecycle::{ ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner, }; - use crate::platform::io::BufWriter; - use crate::platform::sync::oneshot; + use crate::platform::fmt::BufWrite; + use crate::platform::pinned::oneshot; use crate::scheduler; use crate::virtual_dom::Collectable; impl Scope { pub(crate) async fn render_into_stream( &self, - w: &mut BufWriter, + w: &mut dyn BufWrite, props: Rc, hydratable: bool, ) { diff --git a/packages/yew/src/platform/io.rs b/packages/yew/src/platform/fmt.rs similarity index 68% rename from packages/yew/src/platform/io.rs rename to packages/yew/src/platform/fmt.rs index 14e0e11d56e..72d9775655c 100644 --- a/packages/yew/src/platform/io.rs +++ b/packages/yew/src/platform/fmt.rs @@ -5,31 +5,40 @@ use std::borrow::Cow; -use futures::stream::Stream; - -use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender}; +use crate::platform::pinned; // Same as std::io::BufWriter and futures::io::BufWriter. pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024; -/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream. -pub(crate) struct BufWriter { - buf: String, - tx: UnboundedSender, - capacity: usize, +pub(crate) trait BufSend { + fn buf_send(&self, item: String); } -/// Creates a Buffer pair. -pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream) { - let (tx, rx) = mpsc::unbounded_channel::(); +impl BufSend for pinned::mpsc::UnboundedSender { + fn buf_send(&self, item: String) { + let _ = self.send_now(item); + } +} - let tx = BufWriter { - buf: String::with_capacity(capacity), - tx, - capacity, - }; +impl BufSend for futures::channel::mpsc::UnboundedSender { + fn buf_send(&self, item: String) { + let _ = self.unbounded_send(item); + } +} - (tx, UnboundedReceiverStream::new(rx)) +pub trait BufWrite { + fn capacity(&self) -> usize; + fn write(&mut self, s: Cow<'_, str>); +} + +/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream. +pub(crate) struct BufWriter +where + S: BufSend, +{ + buf: String, + tx: S, + capacity: usize, } // Implementation Notes: @@ -53,15 +62,30 @@ pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream) // 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled. // Using a stream avoids this side effect and allows the renderer to finish rendering // without being actively polled. -impl BufWriter { - #[inline] - pub fn capacity(&self) -> usize { - self.capacity +impl BufWriter +where + S: BufSend, +{ + pub fn new(tx: S, capacity: usize) -> Self { + Self { + buf: String::new(), + tx, + capacity, + } } + #[inline] fn drain(&mut self) { - let _ = self.tx.send(self.buf.drain(..).collect()); - self.buf.reserve(self.capacity); + if !self.buf.is_empty() { + self.tx.buf_send(self.buf.split_off(0)); + } + } + + #[inline] + fn reserve(&mut self) { + if self.buf.is_empty() { + self.buf.reserve(self.capacity); + } } /// Returns `True` if the internal buffer has capacity to fit a string of certain length. @@ -69,9 +93,22 @@ impl BufWriter { fn has_capacity_of(&self, next_part_len: usize) -> bool { self.buf.capacity() >= self.buf.len() + next_part_len } +} + +impl BufWrite for BufWriter +where + S: BufSend, +{ + #[inline] + fn capacity(&self) -> usize { + self.capacity + } /// Writes a string into the buffer, optionally drains the buffer. - pub fn write(&mut self, s: Cow<'_, str>) { + fn write(&mut self, s: Cow<'_, str>) { + // Try to reserve the capacity first. + self.reserve(); + if !self.has_capacity_of(s.len()) { // There isn't enough capacity, we drain the buffer. self.drain(); @@ -87,17 +124,20 @@ impl BufWriter { // changes if the buffer was drained. If the buffer capacity didn't change, // then it means self.has_capacity_of() has returned true the first time which will be // guaranteed to be matched by the left hand side of this implementation. - let _ = self.tx.send(s.into_owned()); + self.tx.buf_send(s.into_owned()); } } } -impl Drop for BufWriter { +impl Drop for BufWriter +where + S: BufSend, +{ fn drop(&mut self) { if !self.buf.is_empty() { let mut buf = String::new(); std::mem::swap(&mut buf, &mut self.buf); - let _ = self.tx.send(buf); + self.tx.buf_send(buf); } } } diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index bbab2211b03..8d0d44b3151 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -43,9 +43,9 @@ use std::future::Future; #[cfg(feature = "ssr")] -pub(crate) mod io; +pub(crate) mod fmt; -pub mod sync; +pub mod pinned; pub mod time; #[cfg(target_arch = "wasm32")] diff --git a/packages/yew/src/platform/pinned/mod.rs b/packages/yew/src/platform/pinned/mod.rs new file mode 100644 index 00000000000..21e1316e084 --- /dev/null +++ b/packages/yew/src/platform/pinned/mod.rs @@ -0,0 +1,6 @@ +//! Task Synchronisation Primitives for pinned tasks. +//! +//! This module provides task synchronisation for `!Send` futures. + +pub mod mpsc; +pub mod oneshot; diff --git a/packages/yew/src/platform/pinned/mpsc.rs b/packages/yew/src/platform/pinned/mpsc.rs new file mode 100644 index 00000000000..a4af40abed9 --- /dev/null +++ b/packages/yew/src/platform/pinned/mpsc.rs @@ -0,0 +1,334 @@ +//! A multi-producer single-receiver channel. + +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::rc::Rc; +use std::task::{Poll, Waker}; + +use futures::sink::Sink; +use futures::stream::{FusedStream, Stream}; +use thiserror::Error; + +/// Error returned by [`try_next`](UnboundedReceiver::try_next). +#[derive(Error, Debug)] +#[error("queue is empty")] +pub struct TryRecvError { + _marker: PhantomData<()>, +} + +/// Error returned by [`send_now`](UnboundedSender::send_now). +#[derive(Error, Debug)] +#[error("failed to send")] +pub struct SendError { + /// The send value. + pub inner: T, +} + +/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink). +#[derive(Error, Debug)] +#[error("failed to send")] +pub struct TrySendError { + _marker: PhantomData<()>, +} + +#[derive(Debug)] +struct Inner { + rx_waker: Option, + closed: bool, + sender_ctr: usize, + items: VecDeque, +} + +impl Inner { + /// Creates a unchecked mutable reference from an immutable reference. + /// + /// SAFETY: You can only use this when: + /// + /// 1. The mutable reference is released at the end of a function call. + /// 2. No parent function has acquired the mutable reference. + /// 3. The caller is not an async function / the mutable reference is released before an await + /// statement. + #[inline] + unsafe fn get_mut_unchecked(&self) -> *mut Self { + self as *const Self as *mut Self + } + + fn close(&mut self) { + self.closed = true; + + if let Some(m) = self.rx_waker.take() { + m.wake(); + } + } +} + +/// The receiver of an unbounded mpsc channel. +#[derive(Debug)] +pub struct UnboundedReceiver { + inner: Rc>, +} + +impl UnboundedReceiver { + /// Try to read the next value from the channel. + /// + /// This function will return: + /// - `Ok(Some(T))` if a value is ready. + /// - `Ok(None)` if the channel has become closed. + /// - `Err(TryRecvError)` if the channel is not closed and the channel is empty. + pub fn try_next(&self) -> std::result::Result, TryRecvError> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + match (inner.items.pop_front(), inner.closed) { + (Some(m), _) => Ok(Some(m)), + (None, false) => Ok(None), + (None, true) => Err(TryRecvError { + _marker: PhantomData, + }), + } + } +} + +impl Stream for UnboundedReceiver { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + match (inner.items.pop_front(), inner.closed) { + (Some(m), _) => Poll::Ready(Some(m)), + (None, false) => { + inner.rx_waker = Some(cx.waker().clone()); + Poll::Pending + } + (None, true) => Poll::Ready(None), + } + } +} + +impl FusedStream for UnboundedReceiver { + fn is_terminated(&self) -> bool { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.items.is_empty() && inner.closed + } +} + +impl Drop for UnboundedReceiver { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.close(); + } +} + +/// The sender of an unbounded mpsc channel. +#[derive(Debug)] +pub struct UnboundedSender { + inner: Rc>, +} + +impl UnboundedSender { + /// Sends a value to the unbounded receiver. + pub fn send_now(&self, item: T) -> Result<(), SendError> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + if inner.closed { + return Err(SendError { inner: item }); + } + + inner.items.push_back(item); + + if let Some(m) = inner.rx_waker.take() { + m.wake(); + } + + Ok(()) + } + + /// Closes the channel. + pub fn close_now(&self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.close(); + } +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + inner.sender_ctr += 1; + } + + Self { + inner: self.inner.clone(), + } + } +} + +impl Drop for UnboundedSender { + fn drop(&mut self) { + let sender_ctr = { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + inner.sender_ctr -= 1; + + inner.sender_ctr + }; + + if sender_ctr == 0 { + self.close_now(); + } + } +} + +impl Sink for &'_ UnboundedSender { + type Error = TrySendError; + + fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.send_now(item).map_err(|_| TrySendError { + _marker: PhantomData, + }) + } + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + match inner.closed { + false => Poll::Ready(Ok(())), + true => Poll::Ready(Err(TrySendError { + _marker: PhantomData, + })), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.close_now(); + + Poll::Ready(Ok(())) + } +} + +/// Creates an unbounded channel. +/// +/// # Note +/// +/// This channel has an infinite buffer and can run out of memory if the channel is not actively +/// drained. +pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { + let inner = Rc::new(Inner { + rx_waker: None, + closed: false, + + sender_ctr: 1, + items: VecDeque::new(), + }); + + ( + UnboundedSender { + inner: inner.clone(), + }, + UnboundedReceiver { inner }, + ) +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "tokio")] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::sink::SinkExt; + use futures::stream::StreamExt; + use tokio::test; + + use super::*; + use crate::platform::spawn_local; + use crate::platform::time::sleep; + + #[test] + async fn mpsc_works() { + let (tx, mut rx) = unbounded::(); + + spawn_local(async move { + for i in 0..10 { + (&tx).send(i).await.expect("failed to send."); + sleep(Duration::from_millis(1)).await; + } + }); + + for i in 0..10 { + let received = rx.next().await.expect("failed to receive"); + + assert_eq!(i, received); + } + + assert_eq!(rx.next().await, None); + } + + #[test] + async fn mpsc_drops_receiver() { + let (tx, rx) = unbounded::(); + drop(rx); + + (&tx).send(0).await.expect_err("should fail to send."); + } + + #[test] + async fn mpsc_multi_sender() { + let (tx, mut rx) = unbounded::(); + + spawn_local(async move { + let tx2 = tx.clone(); + + for i in 0..10 { + if i % 2 == 0 { + (&tx).send(i).await.expect("failed to send."); + } else { + (&tx2).send(i).await.expect("failed to send."); + } + + sleep(Duration::from_millis(1)).await; + } + + drop(tx2); + + for i in 10..20 { + (&tx).send(i).await.expect("failed to send."); + + sleep(Duration::from_millis(1)).await; + } + }); + + for i in 0..20 { + let received = rx.next().await.expect("failed to receive"); + + assert_eq!(i, received); + } + + assert_eq!(rx.next().await, None); + } + + #[test] + async fn mpsc_drops_sender() { + let (tx, mut rx) = unbounded::(); + drop(tx); + + assert_eq!(rx.next().await, None); + } +} diff --git a/packages/yew/src/platform/pinned/oneshot.rs b/packages/yew/src/platform/pinned/oneshot.rs new file mode 100644 index 00000000000..545f024e1b6 --- /dev/null +++ b/packages/yew/src/platform/pinned/oneshot.rs @@ -0,0 +1,198 @@ +//! A one-time send - receive channel. + +use std::future::Future; +use std::marker::PhantomData; +use std::rc::Rc; +use std::task::{Poll, Waker}; + +use thiserror::Error; + +/// Error returned by [`send`](Sender::send). +#[derive(Debug, Error)] +#[error("channel has been closed.")] +pub struct SendError { + /// The inner value. + pub inner: T, +} + +/// Error returned by awaiting the [`Receiver`]. +#[derive(Debug, Error)] +#[error("channel has been closed.")] +pub struct RecvError { + _marker: PhantomData<()>, +} + +#[derive(Debug)] +struct Inner { + rx_waker: Option, + closed: bool, + item: Option, +} + +impl Inner { + /// Creates a unchecked mutable reference from a mutable reference. + /// + /// SAFETY: You can only use this when: + /// + /// 1. The mutable reference is released at the end of a function call. + /// 2. No parent function has acquired the mutable reference. + /// 3. The caller is not an async function / the mutable reference is released before an await + /// statement. + #[inline] + unsafe fn get_mut_unchecked(&self) -> *mut Self { + self as *const Self as *mut Self + } +} + +/// The receiver of a oneshot channel. +#[derive(Debug)] +pub struct Receiver { + inner: Rc>, +} + +impl Future for Receiver { + type Output = Result; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + // Implementation Note: + // + // It might be neater to use a match pattern here. + // However, this will slow down the polling process by 10%. + + if let Some(m) = inner.item.take() { + return Poll::Ready(Ok(m)); + } + + if inner.closed { + return Poll::Ready(Err(RecvError { + _marker: PhantomData, + })); + } + + inner.rx_waker = Some(cx.waker().clone()); + Poll::Pending + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.closed = true; + } +} + +/// The sender of a oneshot channel. +#[derive(Debug)] +pub struct Sender { + inner: Rc>, +} + +impl Sender { + /// Send an item to the other side of the channel, consumes the sender. + pub fn send(self, item: T) -> Result<(), T> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + if inner.closed { + return Err(item); + } + + inner.item = Some(item); + + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + + Ok(()) + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.closed = true; + + if inner.item.is_none() { + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + } + } +} + +/// Creates a oneshot channel. +pub fn channel() -> (Sender, Receiver) { + let inner = Rc::new(Inner { + rx_waker: None, + closed: false, + item: None, + }); + + ( + Sender { + inner: inner.clone(), + }, + Receiver { inner }, + ) +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "tokio")] +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use tokio::sync::Barrier; + use tokio::test; + + use super::*; + use crate::platform::spawn_local; + use crate::platform::time::sleep; + + #[test] + async fn oneshot_works() { + let (tx, rx) = channel(); + + tx.send(0).expect("failed to send."); + + assert_eq!(rx.await.expect("failed to receive."), 0); + } + + #[test] + async fn oneshot_drops_sender() { + let (tx, rx) = channel::(); + + spawn_local(async move { + sleep(Duration::from_millis(1)).await; + + drop(tx); + }); + rx.await.expect_err("successful to receive."); + } + + #[test] + async fn oneshot_drops_receiver() { + let (tx, rx) = channel::(); + + let bar = Arc::new(Barrier::new(2)); + + { + let bar = bar.clone(); + spawn_local(async move { + sleep(Duration::from_millis(1)).await; + + drop(rx); + + bar.wait().await; + }); + } + + bar.wait().await; + + tx.send(0).expect_err("successful to send."); + } +} diff --git a/packages/yew/src/platform/sync/mod.rs b/packages/yew/src/platform/sync/mod.rs deleted file mode 100644 index 63c99dec41a..00000000000 --- a/packages/yew/src/platform/sync/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! A module that provides task synchronisation primitives. - -#[doc(inline)] -pub use tokio::sync::oneshot; -pub mod mpsc; diff --git a/packages/yew/src/platform/sync/mpsc.rs b/packages/yew/src/platform/sync/mpsc.rs deleted file mode 100644 index de09d342bc9..00000000000 --- a/packages/yew/src/platform/sync/mpsc.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! A multi-producer, single-receiver channel. - -#[doc(inline)] -pub use tokio::sync::mpsc::*; -#[doc(inline)] -pub use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index d31dd80aa7e..1bc62b1e882 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -3,7 +3,8 @@ use std::fmt; use futures::stream::{Stream, StreamExt}; use crate::html::{BaseComponent, Scope}; -use crate::platform::io::{self, DEFAULT_BUF_SIZE}; +use crate::platform::fmt::{BufWriter, DEFAULT_BUF_SIZE}; +use crate::platform::pinned::mpsc; use crate::platform::{run_pinned, spawn_local}; /// A Yew Server-side Renderer that renders on the current thread. @@ -93,7 +94,8 @@ where /// Renders Yew Applications into a string Stream pub fn render_stream(self) -> impl Stream { - let (mut w, r) = io::buffer(self.capacity); + let (tx, rx) = mpsc::unbounded(); + let mut w = BufWriter::new(tx, self.capacity); let scope = Scope::::new(None); spawn_local(async move { @@ -102,7 +104,7 @@ where .await; }); - r + rx } } @@ -218,21 +220,26 @@ where /// /// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`. pub async fn render_stream(self) -> impl Stream { - // We use run_pinned to switch to our runtime. - run_pinned(move || async move { - let Self { - create_props, - hydratable, - capacity, - } = self; + let Self { + create_props, + hydratable, + capacity, + } = self; + + let (tx, rx) = futures::channel::mpsc::unbounded(); + run_pinned(move || async move { let props = create_props(); + let scope = Scope::::new(None); + + let mut w = BufWriter::new(tx, capacity); - LocalServerRenderer::::with_props(props) - .hydratable(hydratable) - .capacity(capacity) - .render_stream() + scope + .render_into_stream(&mut w, props.into(), hydratable) + .await; }) - .await + .await; + + rx } } diff --git a/packages/yew/src/virtual_dom/mod.rs b/packages/yew/src/virtual_dom/mod.rs index e7d9797b18c..99085f73bea 100644 --- a/packages/yew/src/virtual_dom/mod.rs +++ b/packages/yew/src/virtual_dom/mod.rs @@ -112,10 +112,10 @@ pub(crate) use feat_ssr_hydration::*; #[cfg(feature = "ssr")] mod feat_ssr { use super::*; - use crate::platform::io::BufWriter; + use crate::platform::fmt::BufWrite; impl Collectable { - pub(crate) fn write_open_tag(&self, w: &mut BufWriter) { + pub(crate) fn write_open_tag(&self, w: &mut dyn BufWrite) { w.write("".into()); } - pub(crate) fn write_close_tag(&self, w: &mut BufWriter) { + pub(crate) fn write_close_tag(&self, w: &mut dyn BufWrite) { w.write("".into()); + let _ = w.write_str(self.end_mark()); + let _ = w.write_str("-->"); } - pub(crate) fn write_close_tag(&self, w: &mut dyn BufWrite) { - w.write("".into()); + let _ = w.write_str(self.end_mark()); + let _ = w.write_str("-->"); } } } diff --git a/packages/yew/src/virtual_dom/vcomp.rs b/packages/yew/src/virtual_dom/vcomp.rs index 774bca7c74e..407fc6a5eee 100644 --- a/packages/yew/src/virtual_dom/vcomp.rs +++ b/packages/yew/src/virtual_dom/vcomp.rs @@ -20,7 +20,7 @@ use crate::html::{AnyScope, Scope}; #[cfg(feature = "csr")] use crate::html::{NodeRef, Scoped}; #[cfg(feature = "ssr")] -use crate::platform::fmt::BufWrite; +use crate::platform::fmt::Writer; /// A virtual component. pub struct VComp { @@ -71,7 +71,7 @@ pub(crate) trait Mountable { #[cfg(feature = "ssr")] fn render_into_stream<'a>( &'a self, - w: &'a mut dyn BufWrite, + w: &'a mut Writer, parent_scope: &'a AnyScope, hydratable: bool, ) -> LocalBoxFuture<'a, ()>; @@ -129,7 +129,7 @@ impl Mountable for PropsWrapper { #[cfg(feature = "ssr")] fn render_into_stream<'a>( &'a self, - w: &'a mut dyn BufWrite, + w: &'a mut Writer, parent_scope: &'a AnyScope, hydratable: bool, ) -> LocalBoxFuture<'a, ()> { @@ -243,7 +243,7 @@ mod feat_ssr { #[inline] pub(crate) async fn render_into_stream( &self, - w: &mut dyn BufWrite, + w: &mut Writer, parent_scope: &AnyScope, hydratable: bool, ) { diff --git a/packages/yew/src/virtual_dom/vlist.rs b/packages/yew/src/virtual_dom/vlist.rs index e437f38a6d6..5d4dab0d338 100644 --- a/packages/yew/src/virtual_dom/vlist.rs +++ b/packages/yew/src/virtual_dom/vlist.rs @@ -156,18 +156,19 @@ mod test { #[cfg(feature = "ssr")] mod feat_ssr { + use std::fmt::Write; + use futures::stream::{FuturesUnordered, StreamExt}; - use futures::{future, FutureExt}; + use futures::{future, pin_mut, FutureExt}; use super::*; use crate::html::AnyScope; - use crate::platform::fmt::{BufWrite, BufWriter}; - use crate::platform::pinned::mpsc; + use crate::platform::fmt::{BufStream, Writer}; impl VList { pub(crate) async fn render_into_stream( &self, - w: &mut dyn BufWrite, + w: &mut Writer, parent_scope: &AnyScope, hydratable: bool, ) { @@ -177,21 +178,19 @@ mod feat_ssr { child.render_into_stream(w, parent_scope, hydratable).await; } [first_child, rest_children @ ..] => { - let buf_capacity = w.capacity(); let mut child_streams = Vec::with_capacity(self.children.len() - 1); // Concurrently render rest children into a separate buffer. let rest_child_furs = rest_children.iter().map(|child| { - let (tx, rx) = mpsc::unbounded(); - let mut w = BufWriter::new(tx, buf_capacity); - - child_streams.push(rx); - - async move { + let (s, resolver) = BufStream::new_with_resolver(move |mut w| async move { child .render_into_stream(&mut w, parent_scope, hydratable) .await; - } + }); + + child_streams.push(s); + + resolver }); // Concurrently resolve all child futures. @@ -215,9 +214,10 @@ mod feat_ssr { .await; // Transfer results to parent writer. - for mut r in child_streams { + for r in child_streams { + pin_mut!(r); while let Some(next_chunk) = r.next().await { - w.write(next_chunk.into()); + let _ = w.write_str(&*next_chunk); } } }; diff --git a/packages/yew/src/virtual_dom/vnode.rs b/packages/yew/src/virtual_dom/vnode.rs index 0d3145bb206..d76765c6e64 100644 --- a/packages/yew/src/virtual_dom/vnode.rs +++ b/packages/yew/src/virtual_dom/vnode.rs @@ -153,18 +153,18 @@ mod feat_ssr { use super::*; use crate::html::AnyScope; - use crate::platform::fmt::BufWrite; + use crate::platform::fmt::Writer; impl VNode { pub(crate) fn render_into_stream<'a>( &'a self, - w: &'a mut dyn BufWrite, + w: &'a mut Writer, parent_scope: &'a AnyScope, hydratable: bool, ) -> LocalBoxFuture<'a, ()> { async fn render_into_stream_( this: &VNode, - w: &mut dyn BufWrite, + w: &mut Writer, parent_scope: &AnyScope, hydratable: bool, ) { diff --git a/packages/yew/src/virtual_dom/vsuspense.rs b/packages/yew/src/virtual_dom/vsuspense.rs index b91bad3b219..34a84f64f11 100644 --- a/packages/yew/src/virtual_dom/vsuspense.rs +++ b/packages/yew/src/virtual_dom/vsuspense.rs @@ -28,13 +28,13 @@ impl VSuspense { mod feat_ssr { use super::*; use crate::html::AnyScope; - use crate::platform::fmt::BufWrite; + use crate::platform::fmt::Writer; use crate::virtual_dom::Collectable; impl VSuspense { pub(crate) async fn render_into_stream( &self, - w: &mut dyn BufWrite, + w: &mut Writer, parent_scope: &AnyScope, hydratable: bool, ) { diff --git a/packages/yew/src/virtual_dom/vtag.rs b/packages/yew/src/virtual_dom/vtag.rs index 382b368dd52..23dcc2a281e 100644 --- a/packages/yew/src/virtual_dom/vtag.rs +++ b/packages/yew/src/virtual_dom/vtag.rs @@ -428,9 +428,11 @@ impl PartialEq for VTag { #[cfg(feature = "ssr")] mod feat_ssr { + use std::fmt::Write; + use super::*; use crate::html::AnyScope; - use crate::platform::fmt::BufWrite; + use crate::platform::fmt::Writer; use crate::virtual_dom::VText; // Elements that cannot have any child elements. @@ -442,21 +444,21 @@ mod feat_ssr { impl VTag { pub(crate) async fn render_into_stream( &self, - w: &mut dyn BufWrite, + w: &mut Writer, parent_scope: &AnyScope, hydratable: bool, ) { - w.write("<".into()); - w.write(self.tag().into()); + let _ = w.write_str("<"); + let _ = w.write_str(self.tag()); - let write_attr = |w: &mut dyn BufWrite, name: &str, val: Option<&str>| { - w.write(" ".into()); - w.write(name.into()); + let write_attr = |w: &mut Writer, name: &str, val: Option<&str>| { + let _ = w.write_str(" "); + let _ = w.write_str(name); if let Some(m) = val { - w.write("=\"".into()); - w.write(html_escape::encode_double_quoted_attribute(m)); - w.write("\"".into()); + let _ = w.write_str("=\""); + let _ = w.write_str(&*html_escape::encode_double_quoted_attribute(m)); + let _ = w.write_str("\""); } }; @@ -474,7 +476,7 @@ mod feat_ssr { write_attr(w, k, Some(v)); } - w.write(">".into()); + let _ = w.write_str(">"); match self.inner { VTagInner::Input(_) => {} @@ -485,7 +487,7 @@ mod feat_ssr { .await; } - w.write("".into()); + let _ = w.write_str(""); } VTagInner::Other { ref tag, @@ -497,9 +499,9 @@ mod feat_ssr { .render_into_stream(w, parent_scope, hydratable) .await; - w.write(Cow::Borrowed("")); + let _ = w.write_str(""); } else { // We don't write children of void elements nor closing tags. debug_assert!(children.is_empty(), "{} cannot have any children!", tag); diff --git a/packages/yew/src/virtual_dom/vtext.rs b/packages/yew/src/virtual_dom/vtext.rs index c01091140e1..109c7034fc3 100644 --- a/packages/yew/src/virtual_dom/vtext.rs +++ b/packages/yew/src/virtual_dom/vtext.rs @@ -35,19 +35,21 @@ impl PartialEq for VText { #[cfg(feature = "ssr")] mod feat_ssr { + use std::fmt::Write; + use super::*; use crate::html::AnyScope; - use crate::platform::fmt::BufWrite; + use crate::platform::fmt::Writer; impl VText { pub(crate) async fn render_into_stream( &self, - w: &mut dyn BufWrite, + w: &mut Writer, _parent_scope: &AnyScope, _hydratable: bool, ) { let s = html_escape::encode_text(&self.text); - w.write(s); + let _ = w.write_str(&*s); } } } diff --git a/tools/Cargo.lock b/tools/Cargo.lock index 41cfacc3335..81381dd22d7 100644 --- a/tools/Cargo.lock +++ b/tools/Cargo.lock @@ -2109,6 +2109,7 @@ dependencies = [ "js-sys", "num_cpus", "once_cell", + "pin-project", "serde", "slab", "thiserror", diff --git a/tools/Cargo.toml b/tools/Cargo.toml index bdd4b8f58f3..eacd124ce0a 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -12,3 +12,9 @@ lto = true codegen-units = 1 panic = "abort" opt-level = 3 + +[profile.release] +lto = true +codegen-units = 1 +panic = "abort" +opt-level = 3 From 8688045d5f91f584a012a8f6fb4d75d887f9e3dd Mon Sep 17 00:00:00 2001 From: Kaede Hoshikawa Date: Fri, 12 Aug 2022 01:34:14 +0900 Subject: [PATCH 06/25] Update Buffer Size. --- packages/yew/src/platform/fmt.rs | 8 +++++--- packages/yew/src/server_renderer.rs | 8 +++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/yew/src/platform/fmt.rs b/packages/yew/src/platform/fmt.rs index 47ee3dc5593..72d98d28574 100644 --- a/packages/yew/src/platform/fmt.rs +++ b/packages/yew/src/platform/fmt.rs @@ -10,6 +10,8 @@ use futures::future::{self, FusedFuture, MaybeDone}; use futures::stream::{FusedStream, Stream}; use pin_project::pin_project; +pub(crate) static DEFAULT_BUF_SIZE: usize = 1024; + struct BufStreamInner { buf: String, waker: Option, @@ -25,7 +27,7 @@ impl Write for Writer { let mut inner = self.inner.borrow_mut(); if inner.buf.is_empty() { - inner.buf.reserve(1024); + inner.buf.reserve(DEFAULT_BUF_SIZE); } let result = inner.buf.write_str(s); @@ -41,7 +43,7 @@ impl Write for Writer { let mut inner = self.inner.borrow_mut(); if inner.buf.is_empty() { - inner.buf.reserve(1024); + inner.buf.reserve(DEFAULT_BUF_SIZE); } let result = inner.buf.write_char(c); @@ -57,7 +59,7 @@ impl Write for Writer { let mut inner = self.inner.borrow_mut(); if inner.buf.is_empty() { - inner.buf.reserve(1024); + inner.buf.reserve(DEFAULT_BUF_SIZE); } let result = inner.buf.write_fmt(args); diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index 3dbe25b64ee..43f30666b10 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -3,9 +3,7 @@ use std::fmt; use futures::stream::{self, Stream, StreamExt}; use crate::html::{BaseComponent, Scope}; -use crate::platform::fmt::BufStream; - -static DEFAULT_BUF_SIZE: usize = 8192; +use crate::platform::fmt::{BufStream, DEFAULT_BUF_SIZE}; /// A Yew Server-side Renderer that renders on the current thread. #[cfg(feature = "ssr")] @@ -55,7 +53,7 @@ where /// Sets the capacity of renderer buffer. /// - /// Default: `8192` + /// Default: `1024` pub fn capacity(mut self, capacity: usize) -> Self { self.capacity = capacity; @@ -173,7 +171,7 @@ where /// Sets the capacity of renderer buffer. /// - /// Default: `8192` + /// Default: `1024` pub fn capacity(mut self, capacity: usize) -> Self { self.capacity = capacity; From 3bd9e0a1f7d0335142b93f7c18dbc51c62ba665e Mon Sep 17 00:00:00 2001 From: Kaede Hoshikawa Date: Fri, 12 Aug 2022 01:50:36 +0900 Subject: [PATCH 07/25] Make Send renderer work. --- packages/yew/src/html/component/lifecycle.rs | 2 +- packages/yew/src/html/component/scope.rs | 3 +- packages/yew/src/server_renderer.rs | 51 +++++++++++--------- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/packages/yew/src/html/component/lifecycle.rs b/packages/yew/src/html/component/lifecycle.rs index 9007d0061eb..458721c4aaa 100644 --- a/packages/yew/src/html/component/lifecycle.rs +++ b/packages/yew/src/html/component/lifecycle.rs @@ -41,7 +41,7 @@ pub(crate) enum ComponentRenderState { #[cfg(feature = "ssr")] Ssr { - sender: Option>, + sender: Option>, }, } diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index b21896391cf..1f85d8dcd30 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -262,12 +262,13 @@ impl Scope { mod feat_ssr { use std::fmt::Write; + use futures::channel::oneshot; + use super::*; use crate::html::component::lifecycle::{ ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner, }; use crate::platform::fmt::Writer; - use crate::platform::pinned::oneshot; use crate::scheduler; use crate::virtual_dom::Collectable; diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index 43f30666b10..fc303d18be5 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -1,9 +1,11 @@ use std::fmt; -use futures::stream::{self, Stream, StreamExt}; +use futures::pin_mut; +use futures::stream::{Stream, StreamExt}; use crate::html::{BaseComponent, Scope}; use crate::platform::fmt::{BufStream, DEFAULT_BUF_SIZE}; +use crate::platform::{run_pinned, spawn_local}; /// A Yew Server-side Renderer that renders on the current thread. #[cfg(feature = "ssr")] @@ -214,27 +216,30 @@ where /// /// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`. pub async fn render_stream(self) -> impl Stream { - // let Self { - // create_props, - // hydratable, - // capacity, - // } = self; - - // run_pinned(move || async move { - // let (tx, rx) = futures::channel::mpsc::unbounded(); - - // let props = create_props(); - // let scope = Scope::::new(None); - - // spawn_local(async move { - // scope - // .render_into_stream(&mut w, props.into(), hydratable) - // .await; - // }); - - // rx - // }) - // .await - stream::pending() + let Self { + create_props, + hydratable, + capacity, + } = self; + + run_pinned(move || async move { + let (tx, rx) = futures::channel::mpsc::unbounded(); + + spawn_local(async move { + let props = create_props(); + let s = LocalServerRenderer::::with_props(props) + .hydratable(hydratable) + .capacity(capacity) + .render_stream(); + pin_mut!(s); + + while let Some(m) = s.next().await { + let _ = tx.unbounded_send(m); + } + }); + + rx + }) + .await } } From ed367602a7811ad0de181680fba1e52d29d341ca Mon Sep 17 00:00:00 2001 From: Kaede Hoshikawa Date: Fri, 12 Aug 2022 01:51:25 +0900 Subject: [PATCH 08/25] Remove pinned channels. --- packages/yew/src/platform/mod.rs | 1 - packages/yew/src/platform/pinned/mod.rs | 6 - packages/yew/src/platform/pinned/mpsc.rs | 347 -------------------- packages/yew/src/platform/pinned/oneshot.rs | 203 ------------ 4 files changed, 557 deletions(-) delete mode 100644 packages/yew/src/platform/pinned/mod.rs delete mode 100644 packages/yew/src/platform/pinned/mpsc.rs delete mode 100644 packages/yew/src/platform/pinned/oneshot.rs diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index 8d0d44b3151..6a37b15d1bc 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -45,7 +45,6 @@ use std::future::Future; #[cfg(feature = "ssr")] pub(crate) mod fmt; -pub mod pinned; pub mod time; #[cfg(target_arch = "wasm32")] diff --git a/packages/yew/src/platform/pinned/mod.rs b/packages/yew/src/platform/pinned/mod.rs deleted file mode 100644 index 21e1316e084..00000000000 --- a/packages/yew/src/platform/pinned/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Task Synchronisation Primitives for pinned tasks. -//! -//! This module provides task synchronisation for `!Send` futures. - -pub mod mpsc; -pub mod oneshot; diff --git a/packages/yew/src/platform/pinned/mpsc.rs b/packages/yew/src/platform/pinned/mpsc.rs deleted file mode 100644 index 6170840b757..00000000000 --- a/packages/yew/src/platform/pinned/mpsc.rs +++ /dev/null @@ -1,347 +0,0 @@ -//! A multi-producer single-receiver channel. - -use std::collections::VecDeque; -use std::marker::PhantomData; -use std::rc::Rc; -use std::task::{Poll, Waker}; - -use futures::sink::Sink; -use futures::stream::{FusedStream, Stream}; -use thiserror::Error; - -/// Error returned by [`try_next`](UnboundedReceiver::try_next). -#[derive(Error, Debug)] -#[error("queue is empty")] -pub struct TryRecvError { - _marker: PhantomData<()>, -} - -/// Error returned by [`send_now`](UnboundedSender::send_now). -#[derive(Error, Debug)] -#[error("failed to send")] -pub struct SendError { - /// The send value. - pub inner: T, -} - -/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink). -#[derive(Error, Debug)] -#[error("failed to send")] -pub struct TrySendError { - _marker: PhantomData<()>, -} - -#[derive(Debug)] -struct Inner { - rx_waker: Option, - closed: bool, - sender_ctr: usize, - items: VecDeque, -} - -impl Inner { - /// Creates a unchecked mutable reference from an immutable reference. - /// - /// SAFETY: You can only use this when: - /// - /// 1. The mutable reference is released at the end of a function call. - /// 2. No parent function has acquired the mutable reference. - /// 3. The caller is not an async function / the mutable reference is released before an await - /// statement. - #[inline] - unsafe fn get_mut_unchecked(&self) -> *mut Self { - self as *const Self as *mut Self - } - - fn close(&mut self) { - self.closed = true; - - if let Some(m) = self.rx_waker.take() { - m.wake(); - } - } -} - -/// The receiver of an unbounded mpsc channel. -#[derive(Debug)] -pub struct UnboundedReceiver { - inner: Rc>, -} - -impl UnboundedReceiver { - /// Try to read the next value from the channel. - /// - /// This function will return: - /// - `Ok(Some(T))` if a value is ready. - /// - `Ok(None)` if the channel has become closed. - /// - `Err(TryRecvError)` if the channel is not closed and the channel is empty. - pub fn try_next(&self) -> std::result::Result, TryRecvError> { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - match (inner.items.pop_front(), inner.closed) { - (Some(m), _) => Ok(Some(m)), - (None, false) => Ok(None), - (None, true) => Err(TryRecvError { - _marker: PhantomData, - }), - } - } -} - -impl Stream for UnboundedReceiver { - type Item = T; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - match (inner.items.pop_front(), inner.closed) { - (Some(m), _) => Poll::Ready(Some(m)), - (None, false) => { - inner.rx_waker = Some(cx.waker().clone()); - Poll::Pending - } - (None, true) => Poll::Ready(None), - } - } -} - -impl FusedStream for UnboundedReceiver { - fn is_terminated(&self) -> bool { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - inner.items.is_empty() && inner.closed - } -} - -impl Drop for UnboundedReceiver { - fn drop(&mut self) { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - inner.close(); - } -} - -/// The sender of an unbounded mpsc channel. -#[derive(Debug)] -pub struct UnboundedSender { - inner: Rc>, -} - -impl UnboundedSender { - /// Sends a value to the unbounded receiver. - pub fn send_now(&self, item: T) -> Result<(), SendError> { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - if inner.closed { - return Err(SendError { inner: item }); - } - - inner.items.push_back(item); - - if let Some(m) = inner.rx_waker.take() { - m.wake(); - } - - Ok(()) - } - - /// Closes the channel. - pub fn close_now(&self) { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - inner.close(); - } -} - -impl Clone for UnboundedSender { - fn clone(&self) -> Self { - { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - inner.sender_ctr += 1; - } - - Self { - inner: self.inner.clone(), - } - } -} - -impl Drop for UnboundedSender { - fn drop(&mut self) { - let sender_ctr = { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - inner.sender_ctr -= 1; - - inner.sender_ctr - }; - - if sender_ctr == 0 { - self.close_now(); - } - } -} - -impl Sink for &'_ UnboundedSender { - type Error = TrySendError; - - fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.send_now(item).map_err(|_| TrySendError { - _marker: PhantomData, - }) - } - - fn poll_ready( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - match inner.closed { - false => Poll::Ready(Ok(())), - true => Poll::Ready(Err(TrySendError { - _marker: PhantomData, - })), - } - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.close_now(); - - Poll::Ready(Ok(())) - } -} - -/// Creates an unbounded channel. -/// -/// # Note -/// -/// This channel has an infinite buffer and can run out of memory if the channel is not actively -/// drained. -pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { - let inner = Rc::new(Inner { - rx_waker: None, - closed: false, - - sender_ctr: 1, - items: VecDeque::new(), - }); - - ( - UnboundedSender { - inner: inner.clone(), - }, - UnboundedReceiver { inner }, - ) -} - -#[cfg(not(target_arch = "wasm32"))] -#[cfg(feature = "tokio")] -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::sink::SinkExt; - use futures::stream::StreamExt; - use tokio::task::LocalSet; - use tokio::test; - - use super::*; - use crate::platform::spawn_local; - use crate::platform::time::sleep; - - #[test] - async fn mpsc_works() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, mut rx) = unbounded::(); - - spawn_local(async move { - for i in 0..10 { - (&tx).send(i).await.expect("failed to send."); - sleep(Duration::from_millis(1)).await; - } - }); - - for i in 0..10 { - let received = rx.next().await.expect("failed to receive"); - - assert_eq!(i, received); - } - - assert_eq!(rx.next().await, None); - }) - .await; - } - - #[test] - async fn mpsc_drops_receiver() { - let (tx, rx) = unbounded::(); - drop(rx); - - (&tx).send(0).await.expect_err("should fail to send."); - } - - #[test] - async fn mpsc_multi_sender() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, mut rx) = unbounded::(); - - spawn_local(async move { - let tx2 = tx.clone(); - - for i in 0..10 { - if i % 2 == 0 { - (&tx).send(i).await.expect("failed to send."); - } else { - (&tx2).send(i).await.expect("failed to send."); - } - - sleep(Duration::from_millis(1)).await; - } - - drop(tx2); - - for i in 10..20 { - (&tx).send(i).await.expect("failed to send."); - - sleep(Duration::from_millis(1)).await; - } - }); - - for i in 0..20 { - let received = rx.next().await.expect("failed to receive"); - - assert_eq!(i, received); - } - - assert_eq!(rx.next().await, None); - }) - .await; - } - - #[test] - async fn mpsc_drops_sender() { - let (tx, mut rx) = unbounded::(); - drop(tx); - - assert_eq!(rx.next().await, None); - } -} diff --git a/packages/yew/src/platform/pinned/oneshot.rs b/packages/yew/src/platform/pinned/oneshot.rs deleted file mode 100644 index 65bac285c78..00000000000 --- a/packages/yew/src/platform/pinned/oneshot.rs +++ /dev/null @@ -1,203 +0,0 @@ -//! A one-time send - receive channel. - -use std::future::Future; -use std::marker::PhantomData; -use std::rc::Rc; -use std::task::{Poll, Waker}; - -use thiserror::Error; - -/// Error returned by awaiting the [`Receiver`]. -#[derive(Debug, Error)] -#[error("channel has been closed.")] -pub struct RecvError { - _marker: PhantomData<()>, -} - -#[derive(Debug)] -struct Inner { - rx_waker: Option, - closed: bool, - item: Option, -} - -impl Inner { - /// Creates a unchecked mutable reference from a mutable reference. - /// - /// SAFETY: You can only use this when: - /// - /// 1. The mutable reference is released at the end of a function call. - /// 2. No parent function has acquired the mutable reference. - /// 3. The caller is not an async function / the mutable reference is released before an await - /// statement. - #[inline] - unsafe fn get_mut_unchecked(&self) -> *mut Self { - self as *const Self as *mut Self - } -} - -/// The receiver of a oneshot channel. -#[derive(Debug)] -pub struct Receiver { - inner: Rc>, -} - -impl Future for Receiver { - type Output = Result; - - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - // Implementation Note: - // - // It might be neater to use a match pattern here. - // However, this will slow down the polling process by 10%. - - if let Some(m) = inner.item.take() { - return Poll::Ready(Ok(m)); - } - - if inner.closed { - return Poll::Ready(Err(RecvError { - _marker: PhantomData, - })); - } - - inner.rx_waker = Some(cx.waker().clone()); - Poll::Pending - } -} - -impl Drop for Receiver { - fn drop(&mut self) { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - inner.closed = true; - } -} - -/// The sender of a oneshot channel. -#[derive(Debug)] -pub struct Sender { - inner: Rc>, -} - -impl Sender { - /// Send an item to the other side of the channel, consumes the sender. - pub fn send(self, item: T) -> Result<(), T> { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - if inner.closed { - return Err(item); - } - - inner.item = Some(item); - - if let Some(ref m) = inner.rx_waker { - m.wake_by_ref(); - } - - Ok(()) - } -} - -impl Drop for Sender { - fn drop(&mut self) { - let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; - - inner.closed = true; - - if inner.item.is_none() { - if let Some(ref m) = inner.rx_waker { - m.wake_by_ref(); - } - } - } -} - -/// Creates a oneshot channel. -pub fn channel() -> (Sender, Receiver) { - let inner = Rc::new(Inner { - rx_waker: None, - closed: false, - item: None, - }); - - ( - Sender { - inner: inner.clone(), - }, - Receiver { inner }, - ) -} - -#[cfg(not(target_arch = "wasm32"))] -#[cfg(feature = "tokio")] -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::time::Duration; - - use tokio::sync::Barrier; - use tokio::task::LocalSet; - use tokio::test; - - use super::*; - use crate::platform::spawn_local; - use crate::platform::time::sleep; - - #[test] - async fn oneshot_works() { - let (tx, rx) = channel(); - - tx.send(0).expect("failed to send."); - - assert_eq!(rx.await.expect("failed to receive."), 0); - } - - #[test] - async fn oneshot_drops_sender() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, rx) = channel::(); - - spawn_local(async move { - sleep(Duration::from_millis(1)).await; - - drop(tx); - }); - rx.await.expect_err("successful to receive."); - }) - .await; - } - - #[test] - async fn oneshot_drops_receiver() { - let local_set = LocalSet::new(); - - local_set - .run_until(async { - let (tx, rx) = channel::(); - - let bar = Arc::new(Barrier::new(2)); - - { - let bar = bar.clone(); - spawn_local(async move { - sleep(Duration::from_millis(1)).await; - - drop(rx); - - bar.wait().await; - }); - } - - bar.wait().await; - - tx.send(0).expect_err("successful to send."); - }) - .await; - } -} From 2799a2201ddf60e2bc1b485496fda618798079d3 Mon Sep 17 00:00:00 2001 From: Kaede Hoshikawa Date: Fri, 12 Aug 2022 01:54:45 +0900 Subject: [PATCH 09/25] Unified Naming. --- packages/yew/src/html/component/scope.rs | 4 ++-- packages/yew/src/platform/fmt.rs | 12 ++++++------ packages/yew/src/virtual_dom/mod.rs | 6 +++--- packages/yew/src/virtual_dom/vcomp.rs | 8 ++++---- packages/yew/src/virtual_dom/vlist.rs | 4 ++-- packages/yew/src/virtual_dom/vnode.rs | 6 +++--- packages/yew/src/virtual_dom/vsuspense.rs | 4 ++-- packages/yew/src/virtual_dom/vtag.rs | 6 +++--- packages/yew/src/virtual_dom/vtext.rs | 4 ++-- 9 files changed, 27 insertions(+), 27 deletions(-) diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index 1f85d8dcd30..2e97d16ff4f 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -268,14 +268,14 @@ mod feat_ssr { use crate::html::component::lifecycle::{ ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner, }; - use crate::platform::fmt::Writer; + use crate::platform::fmt::BufWriter; use crate::scheduler; use crate::virtual_dom::Collectable; impl Scope { pub(crate) async fn render_into_stream( &self, - w: &mut Writer, + w: &mut BufWriter, props: Rc, hydratable: bool, ) { diff --git a/packages/yew/src/platform/fmt.rs b/packages/yew/src/platform/fmt.rs index 72d98d28574..b4891e3f3f0 100644 --- a/packages/yew/src/platform/fmt.rs +++ b/packages/yew/src/platform/fmt.rs @@ -18,11 +18,11 @@ struct BufStreamInner { done: bool, } -pub(crate) struct Writer { +pub(crate) struct BufWriter { inner: Rc>, } -impl Write for Writer { +impl Write for BufWriter { fn write_str(&mut self, s: &str) -> fmt::Result { let mut inner = self.inner.borrow_mut(); @@ -88,7 +88,7 @@ where { pub fn new(f: C) -> Self where - C: FnOnce(Writer) -> F, + C: FnOnce(BufWriter) -> F, { let inner = { Rc::new(RefCell::new(BufStreamInner { @@ -100,7 +100,7 @@ where let resolver = { let inner = inner.clone(); - let w = Writer { inner }; + let w = BufWriter { inner }; future::maybe_done(f(w)) }; @@ -113,7 +113,7 @@ where pub fn new_with_resolver(f: C) -> (BufStream, impl Future) where - C: FnOnce(Writer) -> F, + C: FnOnce(BufWriter) -> F, { let inner = { Rc::new(RefCell::new(BufStreamInner { @@ -128,7 +128,7 @@ where let w = { let inner = inner.clone(); - Writer { inner } + BufWriter { inner } }; async move { diff --git a/packages/yew/src/virtual_dom/mod.rs b/packages/yew/src/virtual_dom/mod.rs index b49aa108813..795af56f894 100644 --- a/packages/yew/src/virtual_dom/mod.rs +++ b/packages/yew/src/virtual_dom/mod.rs @@ -114,10 +114,10 @@ mod feat_ssr { use std::fmt::Write; use super::*; - use crate::platform::fmt::Writer; + use crate::platform::fmt::BufWriter; impl Collectable { - pub(crate) fn write_open_tag(&self, w: &mut Writer) { + pub(crate) fn write_open_tag(&self, w: &mut BufWriter) { let _ = w.write_str(""); } - pub(crate) fn write_close_tag(&self, w: &mut Writer) { + pub(crate) fn write_close_tag(&self, w: &mut BufWriter) { let _ = w.write_str("