diff --git a/.github/workflows/post-size-cmp.yml b/.github/workflows/post-size-cmp.yml index e1c08f7a2b5..3baba67b990 100644 --- a/.github/workflows/post-size-cmp.yml +++ b/.github/workflows/post-size-cmp.yml @@ -8,7 +8,7 @@ on: - completed jobs: - size-cmp: + post-size-cmp: name: Post Comment on Pull Request runs-on: ubuntu-latest diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 3ca26689420..63e6e5dc422 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -545,6 +545,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "function_memory_game" version = "0.1.0" @@ -1155,6 +1161,27 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" +[[package]] +name = "jemalloc-sys" +version = "0.5.1+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2b313609b95939cb0c5a5c6917fb9b7c9394562aa3ef44eb66ffa51736432" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.59" @@ -1948,6 +1975,8 @@ dependencies = [ "env_logger", "function_router", "futures 0.3.21", + "hyper", + "jemallocator", "log", "tokio", "tower", diff --git a/examples/simple_ssr/src/bin/simple_ssr_server.rs b/examples/simple_ssr/src/bin/simple_ssr_server.rs index a6127c68bb9..64451772a1b 100644 --- a/examples/simple_ssr/src/bin/simple_ssr_server.rs +++ b/examples/simple_ssr/src/bin/simple_ssr_server.rs @@ -25,7 +25,7 @@ async fn render( Box::new( stream::once(async move { index_html_before }) - .chain(renderer.render_stream().await) + .chain(renderer.render_stream()) .chain(stream::once(async move { index_html_after })) .map(|m| Result::<_, BoxedError>::Ok(m.into())), ) @@ -54,6 +54,5 @@ async fn main() { let routes = html.or(warp::fs::dir(opts.dir)); println!("You can view the website at: http://localhost:8080/"); - warp::serve(routes).run(([127, 0, 0, 1], 8080)).await; } diff --git a/examples/ssr_router/Cargo.toml b/examples/ssr_router/Cargo.toml index 3ebc4671e97..10104835c37 100644 --- a/examples/ssr_router/Cargo.toml +++ b/examples/ssr_router/Cargo.toml @@ -17,7 +17,7 @@ required-features = ["ssr"] yew = { path = "../../packages/yew" } function_router = { path = "../function_router" } log = "0.4" -futures = "0.3" +futures = { version = "0.3", features = ["std"], default-features = false } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" @@ -30,7 +30,9 @@ tower = { version = "0.4", features = ["make"] } tower-http = { version = "0.3", features = ["fs"] } env_logger = "0.9" clap = { version = "3.1.7", features = ["derive"] } +hyper = { version = "0.14", features = ["server", "http1"] } +jemallocator = "0.5" [features] -ssr = ["yew/ssr"] +ssr = ["yew/ssr", "yew/tokio"] hydration = ["yew/hydration"] diff --git a/examples/ssr_router/src/bin/ssr_router_server.rs b/examples/ssr_router/src/bin/ssr_router_server.rs index 5aa8c1c86ea..d594b15efea 100644 --- a/examples/ssr_router/src/bin/ssr_router_server.rs +++ b/examples/ssr_router/src/bin/ssr_router_server.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::convert::Infallible; +use std::future::Future; use std::path::PathBuf; use axum::body::{Body, StreamBody}; @@ -13,8 +14,14 @@ use axum::{Extension, Router}; use clap::Parser; use function_router::{ServerApp, ServerAppProps}; use futures::stream::{self, StreamExt}; +use hyper::server::Server; use tower::ServiceExt; use tower_http::services::ServeDir; +use yew::platform::Runtime; + +// We use jemalloc as it produces better performance. +#[global_allocator] +static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; /// A basic example #[derive(Parser, Debug)] @@ -38,14 +45,38 @@ async fn render( StreamBody::new( stream::once(async move { index_html_before }) - .chain(renderer.render_stream().await) + .chain(renderer.render_stream()) .chain(stream::once(async move { index_html_after })) .map(Result::<_, Infallible>::Ok), ) } +// An executor to process requests on the Yew runtime. +// +// By spawning requests on the Yew runtime, +// it processes request on the same thread as the rendering task. +// +// This increases performance in some environments (e.g.: in VM). +#[derive(Clone, Default)] +struct Executor { + inner: Runtime, +} + +impl hyper::rt::Executor for Executor +where + F: Future + Send + 'static, +{ + fn execute(&self, fut: F) { + self.inner.spawn_pinned(move || async move { + fut.await; + }); + } +} + #[tokio::main] async fn main() { + let exec = Executor::default(); + env_logger::init(); let opts = Opt::parse(); @@ -86,7 +117,8 @@ async fn main() { println!("You can view the website at: http://localhost:8080/"); - axum::Server::bind(&"0.0.0.0:8080".parse().unwrap()) + Server::bind(&"127.0.0.1:8080".parse().unwrap()) + .executor(exec) .serve(app.into_make_service()) .await .unwrap(); diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index 2fea187d2ee..4ebdb57c101 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -41,6 +41,8 @@ //! `tokio`'s timer, IO and task synchronisation primitives. use std::future::Future; +use std::io::Result; +use std::marker::PhantomData; #[cfg(feature = "ssr")] pub(crate) mod io; @@ -72,21 +74,118 @@ where imp::spawn_local(f); } -/// Runs a task with it pinned onto a local worker thread. -/// -/// This can be used to execute non-Send futures without blocking the current thread. -/// -/// It maintains an internal thread pool dedicated to executing local futures. +/// A Runtime Builder. +#[derive(Debug)] +pub struct RuntimeBuilder { + worker_threads: usize, +} + +impl Default for RuntimeBuilder { + fn default() -> Self { + Self { + worker_threads: imp::get_default_runtime_size(), + } + } +} + +impl RuntimeBuilder { + /// Creates a new Runtime Builder. + pub fn new() -> Self { + Self::default() + } + + /// Sets the number of worker threads the Runtime will use. + /// + /// # Default + /// + /// The default number of worker threads is the number of available logical CPU cores. + /// + /// # Note + /// + /// This setting has no effect if current platform has no thread support (e.g.: WebAssembly). + pub fn worker_threads(&mut self, val: usize) -> &mut Self { + self.worker_threads = val; + + self + } + + /// Creates a Runtime. + pub fn build(&mut self) -> Result { + Ok(Runtime { + inner: imp::Runtime::new(self.worker_threads)?, + }) + } +} + +/// The Yew Runtime. +#[derive(Debug, Clone, Default)] +pub struct Runtime { + inner: imp::Runtime, +} + +impl Runtime { + /// Creates a runtime Builder. + pub fn builder() -> RuntimeBuilder { + RuntimeBuilder::new() + } + + /// Spawns a task with it pinned to a worker thread. + /// + /// This can be used to execute non-Send futures without blocking the current thread. + /// + /// [`spawn_local`] is available with tasks executed with `spawn_pinned`. + pub fn spawn_pinned(&self, create_task: F) + where + F: FnOnce() -> Fut, + F: Send + 'static, + Fut: Future + 'static, + { + self.inner.spawn_pinned(create_task); + } +} + +/// A Local Runtime Handle. /// -/// [`spawn_local`] is available with tasks executed with `run_pinned`. -#[inline(always)] -#[cfg(feature = "ssr")] -pub(crate) async fn run_pinned(create_task: F) -> Fut::Output -where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - Fut::Output: Send + 'static, -{ - imp::run_pinned(create_task).await +/// This type can be used to acquire a runtime handle to spawn local tasks. +#[derive(Debug, Clone)] +pub struct LocalHandle { + inner: imp::LocalHandle, + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + /// Creates a Handle to current Runtime worker. + /// + /// # Panics + /// + /// This method will panic if not called within Yew Runtime. + pub fn current() -> Self { + let inner = imp::LocalHandle::current(); + + Self { + inner, + _marker: PhantomData, + } + } + + /// Creates a Handle to current Runtime worker. + /// + /// This methods will return `None` if called from outside Yew Runtime. + pub fn try_current() -> Option { + let inner = imp::LocalHandle::try_current()?; + + Some(Self { + inner, + _marker: PhantomData, + }) + } + + /// Spawns a Future with current Runtime worker. + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + self.inner.spawn_local(f); + } } diff --git a/packages/yew/src/platform/rt_none/mod.rs b/packages/yew/src/platform/rt_none/mod.rs index fa7c1fde384..2d2fc558627 100644 --- a/packages/yew/src/platform/rt_none/mod.rs +++ b/packages/yew/src/platform/rt_none/mod.rs @@ -1,7 +1,13 @@ use std::future::Future; +use std::io; +use std::marker::PhantomData; pub(crate) mod time; +pub(crate) fn get_default_runtime_size() -> usize { + 0 +} + static NO_RUNTIME_NOTICE: &str = r#"No runtime configured for this platform, \ features that requires a runtime can't be used. \ Either compile with `target_arch = "wasm32", or enable the `tokio` feature."#; @@ -18,13 +24,49 @@ where panic_no_runtime(); } -#[cfg(feature = "ssr")] -pub(crate) async fn run_pinned(_create_task: F) -> Fut::Output -where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - Fut::Output: Send + 'static, -{ - panic_no_runtime(); +#[derive(Debug, Clone)] +pub(crate) struct Runtime {} + +impl Default for Runtime { + fn default() -> Self { + panic_no_runtime(); + } +} + +impl Runtime { + pub fn new(_size: usize) -> io::Result { + panic_no_runtime(); + } + + pub fn spawn_pinned(&self, _create_task: F) + where + F: FnOnce() -> Fut, + F: Send + 'static, + Fut: Future + 'static, + { + panic_no_runtime(); + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + pub fn try_current() -> Option { + panic_no_runtime(); + } + + pub fn current() -> Self { + panic_no_runtime(); + } + + pub fn spawn_local(&self, _f: F) + where + F: Future + 'static, + { + panic_no_runtime(); + } } diff --git a/packages/yew/src/platform/rt_tokio/local_worker.rs b/packages/yew/src/platform/rt_tokio/local_worker.rs new file mode 100644 index 00000000000..90e561845e9 --- /dev/null +++ b/packages/yew/src/platform/rt_tokio/local_worker.rs @@ -0,0 +1,205 @@ +//! We use a local worker implementation that does not produce a JoinHandle for spawn_pinned. +//! This avoids the cost to acquire a JoinHandle. +//! +//! See: [tokio-rs/tokio#4819](https://github.com/tokio-rs/tokio/issues/4819) +//! +//! We will not be able to produce a meaningful JoinHandle until WebAssembly targets support +//! unwinding. + +use std::cell::RefCell; +use std::future::Future; +use std::marker::PhantomData; +use std::sync::Arc; +use std::{io, thread}; + +static DEFAULT_WORKER_NAME: &str = "yew-runtime-worker"; + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::channel::mpsc::UnboundedSender; +use futures::stream::StreamExt; +use tokio::task::{spawn_local, LocalSet}; + +type SpawnTask = Box; + +thread_local! { + static TASK_COUNT: RefCell>> = RefCell::new(None); + static LOCAL_SET: LocalSet = LocalSet::new(); +} + +pub(crate) struct LocalWorker { + task_count: Arc, + tx: UnboundedSender, +} + +impl LocalWorker { + pub fn new() -> io::Result { + let (tx, mut rx) = futures::channel::mpsc::unbounded::(); + + let task_count: Arc = Arc::default(); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + { + let task_count = task_count.clone(); + thread::Builder::new() + .name(DEFAULT_WORKER_NAME.into()) + .spawn(move || { + TASK_COUNT.with(move |m| { + *m.borrow_mut() = Some(task_count); + }); + + LOCAL_SET.with(|local_set| { + local_set.block_on(&rt, async move { + while let Some(m) = rx.next().await { + m(); + } + }); + }); + })?; + } + + Ok(Self { task_count, tx }) + } + + pub fn task_count(&self) -> usize { + self.task_count.load(Ordering::Acquire) + } + + pub fn spawn_pinned(&self, f: F) + where + F: 'static + Send + FnOnce() -> Fut, + Fut: 'static + Future, + { + let guard = LocalJobCountGuard::new(self.task_count.clone()); + + // We ignore the result upon a failure, this can never happen unless the runtime is + // exiting which all instances of Runtime will be dropped at that time and hence cannot + // spawn pinned tasks. + let _ = self.tx.unbounded_send(Box::new(move || { + spawn_local(async move { + let _guard = guard; + + f().await; + }); + })); + } +} + +pub struct LocalJobCountGuard(Arc); + +impl LocalJobCountGuard { + fn new(inner: Arc) -> Self { + inner.fetch_add(1, Ordering::AcqRel); + LocalJobCountGuard(inner) + } +} + +impl Drop for LocalJobCountGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::AcqRel); + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + // This type is not send or sync. + _marker: PhantomData<*const ()>, + task_count: Arc, +} + +impl LocalHandle { + pub fn try_current() -> Option { + // We cache the handle to prevent borrowing RefCell. + thread_local! { + static LOCAL_HANDLE: Option = TASK_COUNT + .with(|m| m.borrow().clone()) + .map(|task_count| LocalHandle { task_count, _marker: PhantomData }); + } + + LOCAL_HANDLE.with(|m| m.clone()) + } + + pub fn current() -> Self { + Self::try_current().expect("outside of Yew runtime.") + } + + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + let guard = LocalJobCountGuard::new(self.task_count.clone()); + + LOCAL_SET.with(move |local_set| { + local_set.spawn_local(async move { + let _guard = guard; + + f.await; + }) + }); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::channel::oneshot; + use tokio::test; + use tokio::time::timeout; + use yew::platform::Runtime; + + use super::*; + + #[test] + async fn test_local_handle_exists() { + assert!(LocalHandle::try_current().is_none()); + + let runtime = Runtime::default(); + let (tx, rx) = oneshot::channel(); + + runtime.spawn_pinned(move || async move { + tx.send(LocalHandle::try_current().is_some()) + .expect("failed to send"); + }); + + timeout(Duration::from_secs(5), rx) + .await + .expect("task timed out") + .expect("failed to receive"); + } + + #[test] + async fn test_local_handle_spawns_on_same_worker() { + assert!(LocalHandle::try_current().is_none()); + + let runtime = Runtime::default(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + runtime.spawn_pinned(move || async move { + let handle = LocalHandle::current(); + + tx1.send(std::thread::current().id()) + .expect("failed to send"); + + handle.spawn_local(async move { + tx2.send(std::thread::current().id()) + .expect("failed to send"); + }) + }); + + let result1 = timeout(Duration::from_secs(5), rx1) + .await + .expect("task timed out") + .expect("failed to receive"); + let result2 = timeout(Duration::from_secs(5), rx2) + .await + .expect("task timed out") + .expect("failed to receive"); + + assert_eq!(result1, result2); + } +} diff --git a/packages/yew/src/platform/rt_tokio/mod.rs b/packages/yew/src/platform/rt_tokio/mod.rs index e337676ecff..6be2c92b0aa 100644 --- a/packages/yew/src/platform/rt_tokio/mod.rs +++ b/packages/yew/src/platform/rt_tokio/mod.rs @@ -1,25 +1,20 @@ use std::future::Future; +use std::sync::Arc; +use std::{fmt, io}; + +use once_cell::sync::Lazy; pub(crate) mod time; -#[cfg(feature = "ssr")] -pub(super) async fn run_pinned(create_task: F) -> Fut::Output -where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - Fut::Output: Send + 'static, -{ - use once_cell::sync::Lazy; - use tokio_util::task::LocalPoolHandle; +mod local_worker; - static POOL_HANDLE: Lazy = - Lazy::new(|| LocalPoolHandle::new(num_cpus::get() * 2)); +pub(crate) use local_worker::LocalHandle; +use local_worker::LocalWorker; - POOL_HANDLE - .spawn_pinned(create_task) - .await - .expect("future has panicked!") +pub(crate) fn get_default_runtime_size() -> usize { + // We use num_cpus as std::thread::available_parallelism() does not take + // system resource constraint (e.g.: cgroups) into consideration. + num_cpus::get() } #[inline(always)] @@ -27,5 +22,151 @@ pub(super) fn spawn_local(f: F) where F: Future + 'static, { - tokio::task::spawn_local(f); + match LocalHandle::try_current() { + Some(m) => { + // If within a Yew runtime, use a local handle increases the local task count. + m.spawn_local(f); + } + None => { + tokio::task::spawn_local(f); + } + } +} + +#[derive(Clone)] +pub(crate) struct Runtime { + workers: Arc>, +} + +impl fmt::Debug for Runtime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Runtime") + .field("workers", &"Vec") + .finish() + } +} + +impl Default for Runtime { + fn default() -> Self { + static DEFAULT_RT: Lazy = Lazy::new(|| { + Runtime::new(get_default_runtime_size()).expect("failed to create runtime.") + }); + + DEFAULT_RT.clone() + } +} + +impl Runtime { + pub fn new(size: usize) -> io::Result { + assert!(size > 0, "must have more than 1 worker."); + + let mut workers = Vec::with_capacity(size); + + for _ in 0..size { + let worker = LocalWorker::new()?; + workers.push(worker); + } + + Ok(Self { + workers: workers.into(), + }) + } + + fn find_least_busy_local_worker(&self) -> &LocalWorker { + let mut workers = self.workers.iter(); + + let mut worker = workers.next().expect("must have more than 1 worker."); + let mut task_count = worker.task_count(); + + for current_worker in workers { + if task_count == 0 { + // We don't have to search until the end. + break; + } + + let current_worker_task_count = current_worker.task_count(); + + if current_worker_task_count < task_count { + task_count = current_worker_task_count; + worker = current_worker; + } + } + + worker + } + + pub fn spawn_pinned(&self, create_task: F) + where + F: FnOnce() -> Fut, + F: Send + 'static, + Fut: Future + 'static, + { + let worker = self.find_least_busy_local_worker(); + worker.spawn_pinned(create_task); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::channel::oneshot; + use tokio::test; + use tokio::time::timeout; + + use super::*; + + #[test] + async fn test_spawn_pinned_least_busy() { + let runtime = Runtime::new(2).expect("failed to create runtime."); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + runtime.spawn_pinned(move || async move { + tx1.send(std::thread::current().id()) + .expect("failed to send!"); + }); + + runtime.spawn_pinned(move || async move { + tx2.send(std::thread::current().id()) + .expect("failed to send!"); + }); + + let result1 = timeout(Duration::from_secs(5), rx1) + .await + .expect("task timed out") + .expect("failed to receive"); + let result2 = timeout(Duration::from_secs(5), rx2) + .await + .expect("task timed out") + .expect("failed to receive"); + + // first task and second task are not on the same thread. + assert_ne!(result1, result2); + } + + #[test] + async fn test_spawn_local_within_send() { + let runtime = Runtime::new(1).expect("failed to create runtime."); + + let (tx, rx) = oneshot::channel(); + + runtime.spawn_pinned(move || async move { + tokio::task::spawn(async move { + // tokio::task::spawn_local cannot spawn tasks outside of a local context. + // + // yew::platform::spawn_local can spawn tasks within a Send task as long as running + // under a Yew Runtime. + spawn_local(async move { + tx.send(()).expect("failed to send!"); + }) + }); + }); + + timeout(Duration::from_secs(5), rx) + .await + .expect("task timed out") + .expect("failed to receive"); + } } diff --git a/packages/yew/src/platform/rt_wasm_bindgen/mod.rs b/packages/yew/src/platform/rt_wasm_bindgen/mod.rs index c0dde7f6a89..fb594980099 100644 --- a/packages/yew/src/platform/rt_wasm_bindgen/mod.rs +++ b/packages/yew/src/platform/rt_wasm_bindgen/mod.rs @@ -1,17 +1,56 @@ -#[cfg(feature = "ssr")] use std::future::Future; +use std::io; +use std::marker::PhantomData; pub(crate) mod time; +pub(crate) fn get_default_runtime_size() -> usize { + 0 +} + pub(super) use wasm_bindgen_futures::spawn_local; -#[cfg(feature = "ssr")] -pub(crate) async fn run_pinned(create_task: F) -> Fut::Output -where - F: FnOnce() -> Fut, - F: Send + 'static, - Fut: Future + 'static, - Fut::Output: Send + 'static, -{ - create_task().await +#[derive(Debug, Clone, Default)] +pub(crate) struct Runtime {} + +impl Runtime { + pub fn new(_size: usize) -> io::Result { + Ok(Self {}) + } + + pub fn spawn_pinned(&self, create_task: F) + where + F: FnOnce() -> Fut, + F: Send + 'static, + Fut: Future + 'static, + { + spawn_local(create_task()) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + pub fn try_current() -> Option { + Some(Self { + _marker: PhantomData, + }) + } + + pub fn current() -> Self { + Self { + _marker: PhantomData, + } + } + + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + spawn_local(f); + } } diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index de89cc17281..8007ff96a43 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -5,9 +5,17 @@ use tracing::Instrument; use crate::html::{BaseComponent, Scope}; use crate::platform::io::{self, DEFAULT_BUF_SIZE}; -use crate::platform::{run_pinned, spawn_local}; +use crate::platform::{spawn_local, LocalHandle, Runtime}; /// A Yew Server-side Renderer that renders on the current thread. +/// +/// # Note +/// +/// This renderer does not spawn its own runtime and can only be used when: +/// +/// - `wasm-bindgen` is selected as the backend of Yew runtime. +/// - running within a [`Runtime`](crate::platform::Runtime). +/// - running within a tokio [`LocalSet`](tokio::task::LocalSet). #[cfg(feature = "ssr")] #[derive(Debug)] pub struct LocalServerRenderer @@ -92,7 +100,7 @@ where } } - /// Renders Yew Applications into a string Stream + /// Renders Yew Application into a string Stream #[tracing::instrument( level = tracing::Level::DEBUG, name = "render", @@ -131,6 +139,7 @@ where create_props: Box COMP::Properties>, hydratable: bool, capacity: usize, + rt: Option, } impl fmt::Debug for ServerRenderer @@ -181,9 +190,17 @@ where create_props: Box::new(create_props), hydratable: true, capacity: DEFAULT_BUF_SIZE, + rt: None, } } + /// Sets the runtime the ServerRenderer will run the rendering task with. + pub fn with_runtime(mut self, rt: Runtime) -> Self { + self.rt = Some(rt); + + self + } + /// Sets the capacity of renderer buffer. /// /// Default: `8192` @@ -216,34 +233,43 @@ where /// Renders Yew Application to a String. pub async fn render_to_string(self, w: &mut String) { - let mut s = self.render_stream().await; + let mut s = self.render_stream(); while let Some(m) = s.next().await { w.push_str(&m); } } - /// Renders Yew Applications into a string Stream. - /// - /// # Note - /// - /// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`. - pub async fn render_stream(self) -> impl Stream { - // We use run_pinned to switch to our runtime. - run_pinned(move || async move { - let Self { - create_props, - hydratable, - capacity, - } = self; - + /// Renders Yew Application into a string Stream. + pub fn render_stream(self) -> impl Send + Stream { + let Self { + create_props, + hydratable, + capacity, + rt, + } = self; + + let (mut w, r) = io::buffer(capacity); + let create_task = move || async move { let props = create_props(); + let scope = Scope::::new(None); + + scope + .render_into_stream(&mut w, props.into(), hydratable) + .await; + }; + + match rt { + // If a runtime is specified, spawn to the specified runtime. + Some(m) => m.spawn_pinned(create_task), + None => match LocalHandle::try_current() { + // If within a Yew Runtime, spawn to the current runtime. + Some(m) => m.spawn_local(create_task()), + // Outside of Yew Runtime, spawn to the default runtime. + None => Runtime::default().spawn_pinned(create_task), + }, + } - LocalServerRenderer::::with_props(props) - .hydratable(hydratable) - .capacity(capacity) - .render_stream() - }) - .await + r } }