diff --git a/packages/yew/Cargo.toml b/packages/yew/Cargo.toml index 4ec34a3bda0..97939dbc3f4 100644 --- a/packages/yew/Cargo.toml +++ b/packages/yew/Cargo.toml @@ -31,6 +31,8 @@ implicit-clone = { version = "0.3", features = ["map"] } base64ct = { version = "1.5.0", features = ["std"], optional = true } bincode = { version = "1.3.3", optional = true } serde = { version = "1", features = ["derive"] } +tokio = { version = "1.19", features = ["sync"] } +tokio-stream = { version = "0.1.9", features = ["sync"] } tracing = "0.1.36" [dependencies.web-sys] @@ -75,10 +77,8 @@ wasm-bindgen-futures = "0.4" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] num_cpus = { version = "1.13", optional = true } -once_cell = "1" -tokio = { version = "1.19", features = ["rt", "time"], optional = true } -tokio-stream = { version = "0.1", features = ["time"], optional = true } tokio-util = { version = "0.7", features = ["rt"], optional = true } +once_cell = "1" [dev-dependencies] wasm-bindgen-test = "0.3" @@ -95,7 +95,7 @@ features = [ ] [features] -tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"] +tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"] ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"] csr = [] hydration = ["csr", "dep:bincode"] diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index b845af6b582..42fb6e367c9 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -264,7 +264,7 @@ mod feat_ssr { use crate::html::component::lifecycle::{ ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner, }; - use crate::platform::fmt::BufWrite; + use crate::platform::io::BufWriter; use crate::platform::pinned::oneshot; use crate::scheduler; use crate::virtual_dom::Collectable; @@ -272,7 +272,7 @@ mod feat_ssr { impl Scope { pub(crate) async fn render_into_stream( &self, - w: &mut dyn BufWrite, + w: &mut BufWriter, props: Rc, hydratable: bool, ) { diff --git a/packages/yew/src/platform/io.rs b/packages/yew/src/platform/io.rs new file mode 100644 index 00000000000..14e0e11d56e --- /dev/null +++ b/packages/yew/src/platform/io.rs @@ -0,0 +1,103 @@ +//! This module contains types for I/O functionality. + +// This module should remain private until impl trait type alias becomes available so +// `BufReader` can be produced with an existential type. + +use std::borrow::Cow; + +use futures::stream::Stream; + +use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender}; + +// Same as std::io::BufWriter and futures::io::BufWriter. +pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream. +pub(crate) struct BufWriter { + buf: String, + tx: UnboundedSender, + capacity: usize, +} + +/// Creates a Buffer pair. +pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream) { + let (tx, rx) = mpsc::unbounded_channel::(); + + let tx = BufWriter { + buf: String::with_capacity(capacity), + tx, + capacity, + }; + + (tx, UnboundedReceiverStream::new(rx)) +} + +// Implementation Notes: +// +// When jemalloc is used and a reasonable buffer length is chosen, +// performance of this buffer is related to the number of allocations +// instead of the amount of memory that is allocated. +// +// A Bytes-based implementation is also tested, and yielded a similar performance to String-based +// buffer. +// +// Having a String-based buffer avoids unsafe / cost of conversion between String and Bytes +// when text based content is needed (e.g.: post-processing). +// +// `Bytes::from` can be used to convert a `String` to `Bytes` if web server asks for an +// `impl Stream`. This conversion incurs no memory allocation. +// +// Yielding the output with a Stream provides a couple advantages: +// +// 1. All child components of a VList can have their own buffer and be rendered concurrently. +// 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled. +// Using a stream avoids this side effect and allows the renderer to finish rendering +// without being actively polled. +impl BufWriter { + #[inline] + pub fn capacity(&self) -> usize { + self.capacity + } + + fn drain(&mut self) { + let _ = self.tx.send(self.buf.drain(..).collect()); + self.buf.reserve(self.capacity); + } + + /// Returns `True` if the internal buffer has capacity to fit a string of certain length. + #[inline] + fn has_capacity_of(&self, next_part_len: usize) -> bool { + self.buf.capacity() >= self.buf.len() + next_part_len + } + + /// Writes a string into the buffer, optionally drains the buffer. + pub fn write(&mut self, s: Cow<'_, str>) { + if !self.has_capacity_of(s.len()) { + // There isn't enough capacity, we drain the buffer. + self.drain(); + } + + if self.has_capacity_of(s.len()) { + // The next part is going to fit into the buffer, we push it onto the buffer. + self.buf.push_str(&s); + } else { + // if the next part is more than buffer size, we send the next part. + + // We don't need to drain the buffer here as the result of self.has_capacity_of() only + // changes if the buffer was drained. If the buffer capacity didn't change, + // then it means self.has_capacity_of() has returned true the first time which will be + // guaranteed to be matched by the left hand side of this implementation. + let _ = self.tx.send(s.into_owned()); + } + } +} + +impl Drop for BufWriter { + fn drop(&mut self) { + if !self.buf.is_empty() { + let mut buf = String::new(); + std::mem::swap(&mut buf, &mut self.buf); + let _ = self.tx.send(buf); + } + } +} diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index 8d0d44b3151..2fea187d2ee 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -43,9 +43,10 @@ use std::future::Future; #[cfg(feature = "ssr")] -pub(crate) mod fmt; +pub(crate) mod io; pub mod pinned; +pub mod sync; pub mod time; #[cfg(target_arch = "wasm32")] diff --git a/packages/yew/src/platform/sync/mod.rs b/packages/yew/src/platform/sync/mod.rs new file mode 100644 index 00000000000..63c99dec41a --- /dev/null +++ b/packages/yew/src/platform/sync/mod.rs @@ -0,0 +1,5 @@ +//! A module that provides task synchronisation primitives. + +#[doc(inline)] +pub use tokio::sync::oneshot; +pub mod mpsc; diff --git a/packages/yew/src/platform/sync/mpsc.rs b/packages/yew/src/platform/sync/mpsc.rs new file mode 100644 index 00000000000..de09d342bc9 --- /dev/null +++ b/packages/yew/src/platform/sync/mpsc.rs @@ -0,0 +1,6 @@ +//! A multi-producer, single-receiver channel. + +#[doc(inline)] +pub use tokio::sync::mpsc::*; +#[doc(inline)] +pub use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index b138a4f9156..de89cc17281 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -4,8 +4,7 @@ use futures::stream::{Stream, StreamExt}; use tracing::Instrument; use crate::html::{BaseComponent, Scope}; -use crate::platform::fmt::{BufWriter, DEFAULT_BUF_SIZE}; -use crate::platform::pinned::mpsc; +use crate::platform::io::{self, DEFAULT_BUF_SIZE}; use crate::platform::{run_pinned, spawn_local}; /// A Yew Server-side Renderer that renders on the current thread. @@ -101,8 +100,7 @@ where fields(hydratable = self.hydratable, capacity = self.capacity), )] pub fn render_stream(self) -> impl Stream { - let (tx, rx) = mpsc::unbounded(); - let mut w = BufWriter::new(tx, self.capacity); + let (mut w, r) = io::buffer(self.capacity); let scope = Scope::::new(None); let outer_span = tracing::Span::current(); @@ -115,7 +113,7 @@ where .await; }); - rx + r } } @@ -231,27 +229,20 @@ where /// /// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`. pub async fn render_stream(self) -> impl Stream { - let Self { - create_props, - hydratable, - capacity, - } = self; - + // We use run_pinned to switch to our runtime. run_pinned(move || async move { - let (tx, rx) = futures::channel::mpsc::unbounded(); + let Self { + create_props, + hydratable, + capacity, + } = self; let props = create_props(); - let scope = Scope::::new(None); - - let mut w = BufWriter::new(tx, capacity); - - spawn_local(async move { - scope - .render_into_stream(&mut w, props.into(), hydratable) - .await; - }); - rx + LocalServerRenderer::::with_props(props) + .hydratable(hydratable) + .capacity(capacity) + .render_stream() }) .await } diff --git a/packages/yew/src/virtual_dom/mod.rs b/packages/yew/src/virtual_dom/mod.rs index 99085f73bea..e7d9797b18c 100644 --- a/packages/yew/src/virtual_dom/mod.rs +++ b/packages/yew/src/virtual_dom/mod.rs @@ -112,10 +112,10 @@ pub(crate) use feat_ssr_hydration::*; #[cfg(feature = "ssr")] mod feat_ssr { use super::*; - use crate::platform::fmt::BufWrite; + use crate::platform::io::BufWriter; impl Collectable { - pub(crate) fn write_open_tag(&self, w: &mut dyn BufWrite) { + pub(crate) fn write_open_tag(&self, w: &mut BufWriter) { w.write("".into()); } - pub(crate) fn write_close_tag(&self, w: &mut dyn BufWrite) { + pub(crate) fn write_close_tag(&self, w: &mut BufWriter) { w.write("