From 5bf1ae83362335a0f25bd449f3c1bf63d412a371 Mon Sep 17 00:00:00 2001 From: Kaede Hoshikawa Date: Thu, 4 Aug 2022 21:52:32 +0900 Subject: [PATCH 01/15] Switch to pinned channels. --- packages/yew/Cargo.toml | 8 +- packages/yew/src/html/component/lifecycle.rs | 2 +- packages/yew/src/html/component/scope.rs | 6 +- packages/yew/src/platform/{io.rs => fmt.rs} | 94 ++++-- packages/yew/src/platform/mod.rs | 4 +- packages/yew/src/platform/pinned/mod.rs | 6 + packages/yew/src/platform/pinned/mpsc.rs | 334 +++++++++++++++++++ packages/yew/src/platform/pinned/oneshot.rs | 198 +++++++++++ packages/yew/src/platform/sync/mod.rs | 5 - packages/yew/src/platform/sync/mpsc.rs | 6 - packages/yew/src/server_renderer.rs | 37 +- packages/yew/src/virtual_dom/mod.rs | 6 +- packages/yew/src/virtual_dom/vcomp.rs | 8 +- packages/yew/src/virtual_dom/vlist.rs | 10 +- packages/yew/src/virtual_dom/vnode.rs | 6 +- packages/yew/src/virtual_dom/vsuspense.rs | 4 +- packages/yew/src/virtual_dom/vtag.rs | 6 +- packages/yew/src/virtual_dom/vtext.rs | 4 +- tools/Cargo.lock | 1 - 19 files changed, 660 insertions(+), 85 deletions(-) rename packages/yew/src/platform/{io.rs => fmt.rs} (68%) create mode 100644 packages/yew/src/platform/pinned/mod.rs create mode 100644 packages/yew/src/platform/pinned/mpsc.rs create mode 100644 packages/yew/src/platform/pinned/oneshot.rs delete mode 100644 packages/yew/src/platform/sync/mod.rs delete mode 100644 packages/yew/src/platform/sync/mpsc.rs diff --git a/packages/yew/Cargo.toml b/packages/yew/Cargo.toml index 6742ddbbac0..4a68f11dc46 100644 --- a/packages/yew/Cargo.toml +++ b/packages/yew/Cargo.toml @@ -31,8 +31,6 @@ implicit-clone = { version = "0.3", features = ["map"] } base64ct = { version = "1.5.0", features = ["std"], optional = true } bincode = { version = "1.3.3", optional = true } serde = { version = "1", features = ["derive"] } -tokio = { version = "1.19", features = ["sync"] } -tokio-stream = { version = "0.1.9", features = ["sync"] } [dependencies.web-sys] version = "0.3" @@ -75,8 +73,10 @@ wasm-bindgen-futures = "0.4" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] num_cpus = { version = "1.13", optional = true } -tokio-util = { version = "0.7", features = ["rt"], optional = true } once_cell = "1" +tokio = { version = "1.19", features = ["rt", "time"], optional = true } +tokio-stream = { version = "0.1", features = ["time"], optional = true } +tokio-util = { version = "0.7", features = ["rt"], optional = true } [dev-dependencies] wasm-bindgen-test = "0.3" @@ -93,7 +93,7 @@ features = [ ] [features] -tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"] +tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"] ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"] csr = [] hydration = ["csr", "dep:bincode"] diff --git a/packages/yew/src/html/component/lifecycle.rs b/packages/yew/src/html/component/lifecycle.rs index 89fe2022a04..9007d0061eb 100644 --- a/packages/yew/src/html/component/lifecycle.rs +++ b/packages/yew/src/html/component/lifecycle.rs @@ -41,7 +41,7 @@ pub(crate) enum ComponentRenderState { #[cfg(feature = "ssr")] Ssr { - sender: Option>, + sender: Option>, }, } diff --git a/packages/yew/src/html/component/scope.rs b/packages/yew/src/html/component/scope.rs index bf821025cfc..4c627725cb9 100644 --- a/packages/yew/src/html/component/scope.rs +++ b/packages/yew/src/html/component/scope.rs @@ -264,15 +264,15 @@ mod feat_ssr { use crate::html::component::lifecycle::{ ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner, }; - use crate::platform::io::BufWriter; - use crate::platform::sync::oneshot; + use crate::platform::fmt::BufWrite; + use crate::platform::pinned::oneshot; use crate::scheduler; use crate::virtual_dom::Collectable; impl Scope { pub(crate) async fn render_into_stream( &self, - w: &mut BufWriter, + w: &mut dyn BufWrite, props: Rc, hydratable: bool, ) { diff --git a/packages/yew/src/platform/io.rs b/packages/yew/src/platform/fmt.rs similarity index 68% rename from packages/yew/src/platform/io.rs rename to packages/yew/src/platform/fmt.rs index 14e0e11d56e..72d9775655c 100644 --- a/packages/yew/src/platform/io.rs +++ b/packages/yew/src/platform/fmt.rs @@ -5,31 +5,40 @@ use std::borrow::Cow; -use futures::stream::Stream; - -use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender}; +use crate::platform::pinned; // Same as std::io::BufWriter and futures::io::BufWriter. pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024; -/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream. -pub(crate) struct BufWriter { - buf: String, - tx: UnboundedSender, - capacity: usize, +pub(crate) trait BufSend { + fn buf_send(&self, item: String); } -/// Creates a Buffer pair. -pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream) { - let (tx, rx) = mpsc::unbounded_channel::(); +impl BufSend for pinned::mpsc::UnboundedSender { + fn buf_send(&self, item: String) { + let _ = self.send_now(item); + } +} - let tx = BufWriter { - buf: String::with_capacity(capacity), - tx, - capacity, - }; +impl BufSend for futures::channel::mpsc::UnboundedSender { + fn buf_send(&self, item: String) { + let _ = self.unbounded_send(item); + } +} - (tx, UnboundedReceiverStream::new(rx)) +pub trait BufWrite { + fn capacity(&self) -> usize; + fn write(&mut self, s: Cow<'_, str>); +} + +/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream. +pub(crate) struct BufWriter +where + S: BufSend, +{ + buf: String, + tx: S, + capacity: usize, } // Implementation Notes: @@ -53,15 +62,30 @@ pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream) // 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled. // Using a stream avoids this side effect and allows the renderer to finish rendering // without being actively polled. -impl BufWriter { - #[inline] - pub fn capacity(&self) -> usize { - self.capacity +impl BufWriter +where + S: BufSend, +{ + pub fn new(tx: S, capacity: usize) -> Self { + Self { + buf: String::new(), + tx, + capacity, + } } + #[inline] fn drain(&mut self) { - let _ = self.tx.send(self.buf.drain(..).collect()); - self.buf.reserve(self.capacity); + if !self.buf.is_empty() { + self.tx.buf_send(self.buf.split_off(0)); + } + } + + #[inline] + fn reserve(&mut self) { + if self.buf.is_empty() { + self.buf.reserve(self.capacity); + } } /// Returns `True` if the internal buffer has capacity to fit a string of certain length. @@ -69,9 +93,22 @@ impl BufWriter { fn has_capacity_of(&self, next_part_len: usize) -> bool { self.buf.capacity() >= self.buf.len() + next_part_len } +} + +impl BufWrite for BufWriter +where + S: BufSend, +{ + #[inline] + fn capacity(&self) -> usize { + self.capacity + } /// Writes a string into the buffer, optionally drains the buffer. - pub fn write(&mut self, s: Cow<'_, str>) { + fn write(&mut self, s: Cow<'_, str>) { + // Try to reserve the capacity first. + self.reserve(); + if !self.has_capacity_of(s.len()) { // There isn't enough capacity, we drain the buffer. self.drain(); @@ -87,17 +124,20 @@ impl BufWriter { // changes if the buffer was drained. If the buffer capacity didn't change, // then it means self.has_capacity_of() has returned true the first time which will be // guaranteed to be matched by the left hand side of this implementation. - let _ = self.tx.send(s.into_owned()); + self.tx.buf_send(s.into_owned()); } } } -impl Drop for BufWriter { +impl Drop for BufWriter +where + S: BufSend, +{ fn drop(&mut self) { if !self.buf.is_empty() { let mut buf = String::new(); std::mem::swap(&mut buf, &mut self.buf); - let _ = self.tx.send(buf); + self.tx.buf_send(buf); } } } diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index bbab2211b03..8d0d44b3151 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -43,9 +43,9 @@ use std::future::Future; #[cfg(feature = "ssr")] -pub(crate) mod io; +pub(crate) mod fmt; -pub mod sync; +pub mod pinned; pub mod time; #[cfg(target_arch = "wasm32")] diff --git a/packages/yew/src/platform/pinned/mod.rs b/packages/yew/src/platform/pinned/mod.rs new file mode 100644 index 00000000000..21e1316e084 --- /dev/null +++ b/packages/yew/src/platform/pinned/mod.rs @@ -0,0 +1,6 @@ +//! Task Synchronisation Primitives for pinned tasks. +//! +//! This module provides task synchronisation for `!Send` futures. + +pub mod mpsc; +pub mod oneshot; diff --git a/packages/yew/src/platform/pinned/mpsc.rs b/packages/yew/src/platform/pinned/mpsc.rs new file mode 100644 index 00000000000..a4af40abed9 --- /dev/null +++ b/packages/yew/src/platform/pinned/mpsc.rs @@ -0,0 +1,334 @@ +//! A multi-producer single-receiver channel. + +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::rc::Rc; +use std::task::{Poll, Waker}; + +use futures::sink::Sink; +use futures::stream::{FusedStream, Stream}; +use thiserror::Error; + +/// Error returned by [`try_next`](UnboundedReceiver::try_next). +#[derive(Error, Debug)] +#[error("queue is empty")] +pub struct TryRecvError { + _marker: PhantomData<()>, +} + +/// Error returned by [`send_now`](UnboundedSender::send_now). +#[derive(Error, Debug)] +#[error("failed to send")] +pub struct SendError { + /// The send value. + pub inner: T, +} + +/// Error returned by [`UnboundedSender`] when used as a [`Sink`](futures::sink::Sink). +#[derive(Error, Debug)] +#[error("failed to send")] +pub struct TrySendError { + _marker: PhantomData<()>, +} + +#[derive(Debug)] +struct Inner { + rx_waker: Option, + closed: bool, + sender_ctr: usize, + items: VecDeque, +} + +impl Inner { + /// Creates a unchecked mutable reference from an immutable reference. + /// + /// SAFETY: You can only use this when: + /// + /// 1. The mutable reference is released at the end of a function call. + /// 2. No parent function has acquired the mutable reference. + /// 3. The caller is not an async function / the mutable reference is released before an await + /// statement. + #[inline] + unsafe fn get_mut_unchecked(&self) -> *mut Self { + self as *const Self as *mut Self + } + + fn close(&mut self) { + self.closed = true; + + if let Some(m) = self.rx_waker.take() { + m.wake(); + } + } +} + +/// The receiver of an unbounded mpsc channel. +#[derive(Debug)] +pub struct UnboundedReceiver { + inner: Rc>, +} + +impl UnboundedReceiver { + /// Try to read the next value from the channel. + /// + /// This function will return: + /// - `Ok(Some(T))` if a value is ready. + /// - `Ok(None)` if the channel has become closed. + /// - `Err(TryRecvError)` if the channel is not closed and the channel is empty. + pub fn try_next(&self) -> std::result::Result, TryRecvError> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + match (inner.items.pop_front(), inner.closed) { + (Some(m), _) => Ok(Some(m)), + (None, false) => Ok(None), + (None, true) => Err(TryRecvError { + _marker: PhantomData, + }), + } + } +} + +impl Stream for UnboundedReceiver { + type Item = T; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + match (inner.items.pop_front(), inner.closed) { + (Some(m), _) => Poll::Ready(Some(m)), + (None, false) => { + inner.rx_waker = Some(cx.waker().clone()); + Poll::Pending + } + (None, true) => Poll::Ready(None), + } + } +} + +impl FusedStream for UnboundedReceiver { + fn is_terminated(&self) -> bool { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.items.is_empty() && inner.closed + } +} + +impl Drop for UnboundedReceiver { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.close(); + } +} + +/// The sender of an unbounded mpsc channel. +#[derive(Debug)] +pub struct UnboundedSender { + inner: Rc>, +} + +impl UnboundedSender { + /// Sends a value to the unbounded receiver. + pub fn send_now(&self, item: T) -> Result<(), SendError> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + if inner.closed { + return Err(SendError { inner: item }); + } + + inner.items.push_back(item); + + if let Some(m) = inner.rx_waker.take() { + m.wake(); + } + + Ok(()) + } + + /// Closes the channel. + pub fn close_now(&self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.close(); + } +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + inner.sender_ctr += 1; + } + + Self { + inner: self.inner.clone(), + } + } +} + +impl Drop for UnboundedSender { + fn drop(&mut self) { + let sender_ctr = { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + inner.sender_ctr -= 1; + + inner.sender_ctr + }; + + if sender_ctr == 0 { + self.close_now(); + } + } +} + +impl Sink for &'_ UnboundedSender { + type Error = TrySendError; + + fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.send_now(item).map_err(|_| TrySendError { + _marker: PhantomData, + }) + } + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + match inner.closed { + false => Poll::Ready(Ok(())), + true => Poll::Ready(Err(TrySendError { + _marker: PhantomData, + })), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.close_now(); + + Poll::Ready(Ok(())) + } +} + +/// Creates an unbounded channel. +/// +/// # Note +/// +/// This channel has an infinite buffer and can run out of memory if the channel is not actively +/// drained. +pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { + let inner = Rc::new(Inner { + rx_waker: None, + closed: false, + + sender_ctr: 1, + items: VecDeque::new(), + }); + + ( + UnboundedSender { + inner: inner.clone(), + }, + UnboundedReceiver { inner }, + ) +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "tokio")] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::sink::SinkExt; + use futures::stream::StreamExt; + use tokio::test; + + use super::*; + use crate::platform::spawn_local; + use crate::platform::time::sleep; + + #[test] + async fn mpsc_works() { + let (tx, mut rx) = unbounded::(); + + spawn_local(async move { + for i in 0..10 { + (&tx).send(i).await.expect("failed to send."); + sleep(Duration::from_millis(1)).await; + } + }); + + for i in 0..10 { + let received = rx.next().await.expect("failed to receive"); + + assert_eq!(i, received); + } + + assert_eq!(rx.next().await, None); + } + + #[test] + async fn mpsc_drops_receiver() { + let (tx, rx) = unbounded::(); + drop(rx); + + (&tx).send(0).await.expect_err("should fail to send."); + } + + #[test] + async fn mpsc_multi_sender() { + let (tx, mut rx) = unbounded::(); + + spawn_local(async move { + let tx2 = tx.clone(); + + for i in 0..10 { + if i % 2 == 0 { + (&tx).send(i).await.expect("failed to send."); + } else { + (&tx2).send(i).await.expect("failed to send."); + } + + sleep(Duration::from_millis(1)).await; + } + + drop(tx2); + + for i in 10..20 { + (&tx).send(i).await.expect("failed to send."); + + sleep(Duration::from_millis(1)).await; + } + }); + + for i in 0..20 { + let received = rx.next().await.expect("failed to receive"); + + assert_eq!(i, received); + } + + assert_eq!(rx.next().await, None); + } + + #[test] + async fn mpsc_drops_sender() { + let (tx, mut rx) = unbounded::(); + drop(tx); + + assert_eq!(rx.next().await, None); + } +} diff --git a/packages/yew/src/platform/pinned/oneshot.rs b/packages/yew/src/platform/pinned/oneshot.rs new file mode 100644 index 00000000000..545f024e1b6 --- /dev/null +++ b/packages/yew/src/platform/pinned/oneshot.rs @@ -0,0 +1,198 @@ +//! A one-time send - receive channel. + +use std::future::Future; +use std::marker::PhantomData; +use std::rc::Rc; +use std::task::{Poll, Waker}; + +use thiserror::Error; + +/// Error returned by [`send`](Sender::send). +#[derive(Debug, Error)] +#[error("channel has been closed.")] +pub struct SendError { + /// The inner value. + pub inner: T, +} + +/// Error returned by awaiting the [`Receiver`]. +#[derive(Debug, Error)] +#[error("channel has been closed.")] +pub struct RecvError { + _marker: PhantomData<()>, +} + +#[derive(Debug)] +struct Inner { + rx_waker: Option, + closed: bool, + item: Option, +} + +impl Inner { + /// Creates a unchecked mutable reference from a mutable reference. + /// + /// SAFETY: You can only use this when: + /// + /// 1. The mutable reference is released at the end of a function call. + /// 2. No parent function has acquired the mutable reference. + /// 3. The caller is not an async function / the mutable reference is released before an await + /// statement. + #[inline] + unsafe fn get_mut_unchecked(&self) -> *mut Self { + self as *const Self as *mut Self + } +} + +/// The receiver of a oneshot channel. +#[derive(Debug)] +pub struct Receiver { + inner: Rc>, +} + +impl Future for Receiver { + type Output = Result; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + // Implementation Note: + // + // It might be neater to use a match pattern here. + // However, this will slow down the polling process by 10%. + + if let Some(m) = inner.item.take() { + return Poll::Ready(Ok(m)); + } + + if inner.closed { + return Poll::Ready(Err(RecvError { + _marker: PhantomData, + })); + } + + inner.rx_waker = Some(cx.waker().clone()); + Poll::Pending + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.closed = true; + } +} + +/// The sender of a oneshot channel. +#[derive(Debug)] +pub struct Sender { + inner: Rc>, +} + +impl Sender { + /// Send an item to the other side of the channel, consumes the sender. + pub fn send(self, item: T) -> Result<(), T> { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + if inner.closed { + return Err(item); + } + + inner.item = Some(item); + + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + + Ok(()) + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let inner = unsafe { &mut *self.inner.get_mut_unchecked() }; + + inner.closed = true; + + if inner.item.is_none() { + if let Some(ref m) = inner.rx_waker { + m.wake_by_ref(); + } + } + } +} + +/// Creates a oneshot channel. +pub fn channel() -> (Sender, Receiver) { + let inner = Rc::new(Inner { + rx_waker: None, + closed: false, + item: None, + }); + + ( + Sender { + inner: inner.clone(), + }, + Receiver { inner }, + ) +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "tokio")] +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use tokio::sync::Barrier; + use tokio::test; + + use super::*; + use crate::platform::spawn_local; + use crate::platform::time::sleep; + + #[test] + async fn oneshot_works() { + let (tx, rx) = channel(); + + tx.send(0).expect("failed to send."); + + assert_eq!(rx.await.expect("failed to receive."), 0); + } + + #[test] + async fn oneshot_drops_sender() { + let (tx, rx) = channel::(); + + spawn_local(async move { + sleep(Duration::from_millis(1)).await; + + drop(tx); + }); + rx.await.expect_err("successful to receive."); + } + + #[test] + async fn oneshot_drops_receiver() { + let (tx, rx) = channel::(); + + let bar = Arc::new(Barrier::new(2)); + + { + let bar = bar.clone(); + spawn_local(async move { + sleep(Duration::from_millis(1)).await; + + drop(rx); + + bar.wait().await; + }); + } + + bar.wait().await; + + tx.send(0).expect_err("successful to send."); + } +} diff --git a/packages/yew/src/platform/sync/mod.rs b/packages/yew/src/platform/sync/mod.rs deleted file mode 100644 index 63c99dec41a..00000000000 --- a/packages/yew/src/platform/sync/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! A module that provides task synchronisation primitives. - -#[doc(inline)] -pub use tokio::sync::oneshot; -pub mod mpsc; diff --git a/packages/yew/src/platform/sync/mpsc.rs b/packages/yew/src/platform/sync/mpsc.rs deleted file mode 100644 index de09d342bc9..00000000000 --- a/packages/yew/src/platform/sync/mpsc.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! A multi-producer, single-receiver channel. - -#[doc(inline)] -pub use tokio::sync::mpsc::*; -#[doc(inline)] -pub use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index d31dd80aa7e..1bc62b1e882 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -3,7 +3,8 @@ use std::fmt; use futures::stream::{Stream, StreamExt}; use crate::html::{BaseComponent, Scope}; -use crate::platform::io::{self, DEFAULT_BUF_SIZE}; +use crate::platform::fmt::{BufWriter, DEFAULT_BUF_SIZE}; +use crate::platform::pinned::mpsc; use crate::platform::{run_pinned, spawn_local}; /// A Yew Server-side Renderer that renders on the current thread. @@ -93,7 +94,8 @@ where /// Renders Yew Applications into a string Stream pub fn render_stream(self) -> impl Stream { - let (mut w, r) = io::buffer(self.capacity); + let (tx, rx) = mpsc::unbounded(); + let mut w = BufWriter::new(tx, self.capacity); let scope = Scope::::new(None); spawn_local(async move { @@ -102,7 +104,7 @@ where .await; }); - r + rx } } @@ -218,21 +220,26 @@ where /// /// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`. pub async fn render_stream(self) -> impl Stream { - // We use run_pinned to switch to our runtime. - run_pinned(move || async move { - let Self { - create_props, - hydratable, - capacity, - } = self; + let Self { + create_props, + hydratable, + capacity, + } = self; + + let (tx, rx) = futures::channel::mpsc::unbounded(); + run_pinned(move || async move { let props = create_props(); + let scope = Scope::::new(None); + + let mut w = BufWriter::new(tx, capacity); - LocalServerRenderer::::with_props(props) - .hydratable(hydratable) - .capacity(capacity) - .render_stream() + scope + .render_into_stream(&mut w, props.into(), hydratable) + .await; }) - .await + .await; + + rx } } diff --git a/packages/yew/src/virtual_dom/mod.rs b/packages/yew/src/virtual_dom/mod.rs index e7d9797b18c..99085f73bea 100644 --- a/packages/yew/src/virtual_dom/mod.rs +++ b/packages/yew/src/virtual_dom/mod.rs @@ -112,10 +112,10 @@ pub(crate) use feat_ssr_hydration::*; #[cfg(feature = "ssr")] mod feat_ssr { use super::*; - use crate::platform::io::BufWriter; + use crate::platform::fmt::BufWrite; impl Collectable { - pub(crate) fn write_open_tag(&self, w: &mut BufWriter) { + pub(crate) fn write_open_tag(&self, w: &mut dyn BufWrite) { w.write("".into()); } - pub(crate) fn write_close_tag(&self, w: &mut BufWriter) { + pub(crate) fn write_close_tag(&self, w: &mut dyn BufWrite) { w.write("".into()); } - pub(crate) fn write_close_tag(&self, w: &mut dyn BufWrite) { + pub(crate) fn write_close_tag(&self, w: &mut BufWriter) { w.write("