diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 3ca26689420..b8dae574ad3 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -545,6 +545,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "function_memory_game" version = "0.1.0" @@ -1155,6 +1161,27 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" +[[package]] +name = "jemalloc-sys" +version = "0.5.1+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2b313609b95939cb0c5a5c6917fb9b7c9394562aa3ef44eb66ffa51736432" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.59" @@ -1903,6 +1930,8 @@ dependencies = [ "bytes", "clap", "futures 0.3.21", + "hyper", + "jemallocator", "log", "reqwest", "serde", @@ -1948,6 +1977,8 @@ dependencies = [ "env_logger", "function_router", "futures 0.3.21", + "hyper", + "jemallocator", "log", "tokio", "tower", diff --git a/examples/simple_ssr/Cargo.toml b/examples/simple_ssr/Cargo.toml index 4bb1b3b549e..fcdf5c4c525 100644 --- a/examples/simple_ssr/Cargo.toml +++ b/examples/simple_ssr/Cargo.toml @@ -29,6 +29,8 @@ log = "0.4" tokio = { version = "1.15.0", features = ["full"] } warp = "0.3" clap = { version = "3.1.7", features = ["derive"] } +hyper = { version = "0.14", features = ["server", "http1"] } +jemallocator = "0.5" [features] hydration = ["yew/hydration"] diff --git a/examples/simple_ssr/src/bin/simple_ssr_server.rs b/examples/simple_ssr/src/bin/simple_ssr_server.rs index 7a26d375d4d..2530817162b 100644 --- a/examples/simple_ssr/src/bin/simple_ssr_server.rs +++ b/examples/simple_ssr/src/bin/simple_ssr_server.rs @@ -1,11 +1,15 @@ +use std::convert::Infallible; use std::error::Error; +use std::future::Future; use std::path::PathBuf; use bytes::Bytes; use clap::Parser; use futures::stream::{self, Stream, StreamExt}; +use hyper::server::Server; use simple_ssr::App; use warp::Filter; +use yew::platform::Runtime; type BoxedError = Box; @@ -17,6 +21,23 @@ struct Opt { dir: PathBuf, } +// An executor to process requests on the Yew runtime. +#[derive(Clone, Default)] +struct Executor { + inner: Runtime, +} + +impl hyper::rt::Executor for Executor +where + F: Future + Send + 'static, +{ + fn execute(&self, fut: F) { + self.inner.spawn_pinned(move || async move { + fut.await; + }); + } +} + async fn render( index_html_before: String, index_html_after: String, @@ -34,6 +55,7 @@ async fn render( #[tokio::main] async fn main() { let opts = Opt::parse(); + let exec = Executor::default(); let index_html_s = tokio::fs::read_to_string(opts.dir.join("index.html")) .await @@ -55,5 +77,14 @@ async fn main() { println!("You can view the website at: http://localhost:8080/"); - warp::serve(routes).run(([127, 0, 0, 1], 8080)).await; + let warp_svc = warp::service(routes); + let svc = hyper::service::make_service_fn(move |_| { + let warp_svc = warp_svc.clone(); + async move { Ok::<_, Infallible>(warp_svc) } + }); + Server::bind(&"127.0.0.1:8080".parse().unwrap()) + .executor(exec) + .serve(svc) + .await + .unwrap(); } diff --git a/examples/ssr_router/Cargo.toml b/examples/ssr_router/Cargo.toml index 2cd9fe13ebe..10104835c37 100644 --- a/examples/ssr_router/Cargo.toml +++ b/examples/ssr_router/Cargo.toml @@ -30,7 +30,9 @@ tower = { version = "0.4", features = ["make"] } tower-http = { version = "0.3", features = ["fs"] } env_logger = "0.9" clap = { version = "3.1.7", features = ["derive"] } +hyper = { version = "0.14", features = ["server", "http1"] } +jemallocator = "0.5" [features] -ssr = ["yew/ssr"] +ssr = ["yew/ssr", "yew/tokio"] hydration = ["yew/hydration"] diff --git a/examples/ssr_router/src/bin/ssr_router_server.rs b/examples/ssr_router/src/bin/ssr_router_server.rs index 7ada033234d..cabdfaff077 100644 --- a/examples/ssr_router/src/bin/ssr_router_server.rs +++ b/examples/ssr_router/src/bin/ssr_router_server.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::convert::Infallible; +use std::future::Future; use std::path::PathBuf; use axum::body::{Body, StreamBody}; @@ -13,8 +14,13 @@ use axum::{Extension, Router}; use clap::Parser; use function_router::{ServerApp, ServerAppProps}; use futures::stream::{self, StreamExt}; +use hyper::server::Server; use tower::ServiceExt; use tower_http::services::ServeDir; +use yew::platform::Runtime; + +#[global_allocator] +static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; /// A basic example #[derive(Parser, Debug)] @@ -44,8 +50,27 @@ async fn render( ) } +// An executor to process requests on the Yew runtime. +#[derive(Clone, Default)] +struct Executor { + inner: Runtime, +} + +impl hyper::rt::Executor for Executor +where + F: Future + Send + 'static, +{ + fn execute(&self, fut: F) { + self.inner.spawn_pinned(move || async move { + fut.await; + }); + } +} + #[tokio::main] async fn main() { + let exec = Executor::default(); + env_logger::init(); let opts = Opt::parse(); @@ -86,7 +111,8 @@ async fn main() { println!("You can view the website at: http://localhost:8080/"); - axum::Server::bind(&"0.0.0.0:8080".parse().unwrap()) + Server::bind(&"127.0.0.1:8080".parse().unwrap()) + .executor(exec) .serve(app.into_make_service()) .await .unwrap(); diff --git a/packages/yew/src/platform/mod.rs b/packages/yew/src/platform/mod.rs index 96e35eb8c48..4ebdb57c101 100644 --- a/packages/yew/src/platform/mod.rs +++ b/packages/yew/src/platform/mod.rs @@ -42,6 +42,7 @@ use std::future::Future; use std::io::Result; +use std::marker::PhantomData; #[cfg(feature = "ssr")] pub(crate) mod io; @@ -142,3 +143,49 @@ impl Runtime { self.inner.spawn_pinned(create_task); } } + +/// A Local Runtime Handle. +/// +/// This type can be used to acquire a runtime handle to spawn local tasks. +#[derive(Debug, Clone)] +pub struct LocalHandle { + inner: imp::LocalHandle, + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + /// Creates a Handle to current Runtime worker. + /// + /// # Panics + /// + /// This method will panic if not called within Yew Runtime. + pub fn current() -> Self { + let inner = imp::LocalHandle::current(); + + Self { + inner, + _marker: PhantomData, + } + } + + /// Creates a Handle to current Runtime worker. + /// + /// This methods will return `None` if called from outside Yew Runtime. + pub fn try_current() -> Option { + let inner = imp::LocalHandle::try_current()?; + + Some(Self { + inner, + _marker: PhantomData, + }) + } + + /// Spawns a Future with current Runtime worker. + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + self.inner.spawn_local(f); + } +} diff --git a/packages/yew/src/platform/rt_none/mod.rs b/packages/yew/src/platform/rt_none/mod.rs index b3724b39356..2d2fc558627 100644 --- a/packages/yew/src/platform/rt_none/mod.rs +++ b/packages/yew/src/platform/rt_none/mod.rs @@ -1,5 +1,6 @@ use std::future::Future; use std::io; +use std::marker::PhantomData; pub(crate) mod time; @@ -46,3 +47,26 @@ impl Runtime { panic_no_runtime(); } } + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + pub fn try_current() -> Option { + panic_no_runtime(); + } + + pub fn current() -> Self { + panic_no_runtime(); + } + + pub fn spawn_local(&self, _f: F) + where + F: Future + 'static, + { + panic_no_runtime(); + } +} diff --git a/packages/yew/src/platform/rt_tokio/local_worker.rs b/packages/yew/src/platform/rt_tokio/local_worker.rs new file mode 100644 index 00000000000..0737a4c9e3b --- /dev/null +++ b/packages/yew/src/platform/rt_tokio/local_worker.rs @@ -0,0 +1,143 @@ +//! We use a local worker implementation that does not produce a JoinHandle for spawn_pinned. +//! This avoids the cost to acquire a JoinHandle. +//! +//! See: https://github.com/tokio-rs/tokio/issues/4819 +//! +//! We will not be able to produce a meaningful JoinHandle until WebAssembly targets support +//! unwinding. + +use std::cell::RefCell; +use std::future::Future; +use std::marker::PhantomData; +use std::sync::Arc; +use std::{io, thread}; + +static DEFAULT_WORKER_NAME: &str = "yew-runtime-worker"; + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::channel::mpsc::UnboundedSender; +use futures::stream::StreamExt; +use tokio::task::{spawn_local, LocalSet}; + +type SpawnTask = Box; + +thread_local! { + static TASK_COUNT: RefCell>> = RefCell::new(None); + static LOCAL_SET: LocalSet = LocalSet::new(); +} + +pub(crate) struct LocalWorker { + task_count: Arc, + tx: UnboundedSender, +} + +impl LocalWorker { + pub fn new() -> io::Result { + let (tx, mut rx) = futures::channel::mpsc::unbounded::(); + + let task_count: Arc = Arc::default(); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + { + let task_count = task_count.clone(); + thread::Builder::new() + .name(DEFAULT_WORKER_NAME.into()) + .spawn(move || { + TASK_COUNT.with(move |m| { + *m.borrow_mut() = Some(task_count); + }); + + LOCAL_SET.with(|local_set| { + local_set.block_on(&rt, async move { + while let Some(m) = rx.next().await { + m(); + } + }); + }); + })?; + } + + Ok(Self { task_count, tx }) + } + + pub fn task_count(&self) -> usize { + self.task_count.load(Ordering::Acquire) + } + + pub fn spawn_pinned(&self, f: F) + where + F: 'static + Send + FnOnce() -> Fut, + Fut: 'static + Future, + { + let guard = LocalJobCountGuard::new(self.task_count.clone()); + + // We ignore the result upon a failure, this can never happen unless the runtime is + // exiting which all instances of Runtime will be dropped at that time and hence cannot + // spawn pinned tasks. + let _ = self.tx.unbounded_send(Box::new(move || { + spawn_local(async move { + let _guard = guard; + + f().await; + }); + })); + } +} + +pub struct LocalJobCountGuard(Arc); + +impl LocalJobCountGuard { + fn new(inner: Arc) -> Self { + inner.fetch_add(1, Ordering::AcqRel); + LocalJobCountGuard(inner) + } +} + +impl Drop for LocalJobCountGuard { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::AcqRel); + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + // This type is not send or sync. + _marker: PhantomData<*const ()>, + task_count: Arc, +} + +impl LocalHandle { + pub fn try_current() -> Option { + // We cache the handle to prevent borrowing RefCell. + thread_local! { + static LOCAL_HANDLE: Option = TASK_COUNT + .with(|m| m.borrow().clone()) + .map(|task_count| LocalHandle { task_count, _marker: PhantomData }); + } + + LOCAL_HANDLE.with(|m| m.clone()) + } + + pub fn current() -> Self { + Self::try_current().expect("outside of Yew runtime.") + } + + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + let guard = LocalJobCountGuard::new(self.task_count.clone()); + + LOCAL_SET.with(move |local_set| { + local_set.spawn_local(async move { + let _guard = guard; + + f.await; + }) + }); + } +} diff --git a/packages/yew/src/platform/rt_tokio/mod.rs b/packages/yew/src/platform/rt_tokio/mod.rs index 09210bd917d..40a30b87eaa 100644 --- a/packages/yew/src/platform/rt_tokio/mod.rs +++ b/packages/yew/src/platform/rt_tokio/mod.rs @@ -1,103 +1,14 @@ use std::future::Future; use std::sync::Arc; -use std::{fmt, io, thread}; +use std::{fmt, io}; use once_cell::sync::Lazy; pub(crate) mod time; -static DEFAULT_WORKER_NAME: &str = "yew-runtime-worker"; - -// We use a local worker implementation that does not produce a JoinHandle for spawn_pinned. -// This avoids the cost to acquire a JoinHandle. -// -// We will not be able to produce a meaningful JoinHandle until WebAssembly targets support -// unwinding. -// -// See: https://github.com/tokio-rs/tokio/issues/4819 -mod local_worker { - use std::sync::atomic::{AtomicUsize, Ordering}; - - use futures::channel::mpsc::UnboundedSender; - use futures::stream::StreamExt; - use tokio::task::{spawn_local, LocalSet}; - - use super::*; - - type SpawnTask = Box; - - pub(crate) struct LocalWorker { - task_count: Arc, - tx: UnboundedSender, - } - - impl LocalWorker { - pub fn new() -> io::Result { - let (tx, mut rx) = futures::channel::mpsc::unbounded::(); - - let task_count: Arc = Arc::default(); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - thread::Builder::new() - .name(DEFAULT_WORKER_NAME.into()) - .spawn(move || { - let local_set = LocalSet::new(); - - local_set.block_on(&rt, async move { - while let Some(m) = rx.next().await { - m(); - } - }); - - drop(local_set); - })?; - - Ok(Self { task_count, tx }) - } - - pub fn task_count(&self) -> usize { - self.task_count.load(Ordering::Acquire) - } - - pub fn spawn_pinned(&self, f: F) - where - F: 'static + Send + FnOnce() -> Fut, - Fut: 'static + Future, - { - let guard = LocalJobCountGuard::new(self.task_count.clone()); - - // We ignore the result upon a failure, this can never happen unless the runtime is - // exiting which all instances of Runtime will be dropped at that time and hence cannot - // spawn pinned tasks. - let _ = self.tx.unbounded_send(Box::new(move || { - spawn_local(async move { - let _guard = guard; - - f().await; - }); - })); - } - } - - pub struct LocalJobCountGuard(Arc); - - impl LocalJobCountGuard { - fn new(inner: Arc) -> Self { - inner.fetch_add(1, Ordering::AcqRel); - LocalJobCountGuard(inner) - } - } - - impl Drop for LocalJobCountGuard { - fn drop(&mut self) { - self.0.fetch_sub(1, Ordering::AcqRel); - } - } -} +mod local_worker; +pub(crate) use local_worker::LocalHandle; use local_worker::LocalWorker; pub(crate) fn get_default_runtime_size() -> usize { @@ -111,7 +22,15 @@ pub(super) fn spawn_local(f: F) where F: Future + 'static, { - tokio::task::spawn_local(f); + match LocalHandle::try_current() { + Some(m) => { + // If within a Yew runtime, use a local handle increases the local task count. + m.spawn_local(f); + } + None => { + tokio::task::spawn_local(f); + } + } } #[derive(Clone)] diff --git a/packages/yew/src/platform/rt_wasm_bindgen/mod.rs b/packages/yew/src/platform/rt_wasm_bindgen/mod.rs index 18a1d3f374c..fb594980099 100644 --- a/packages/yew/src/platform/rt_wasm_bindgen/mod.rs +++ b/packages/yew/src/platform/rt_wasm_bindgen/mod.rs @@ -1,5 +1,6 @@ use std::future::Future; use std::io; +use std::marker::PhantomData; pub(crate) mod time; @@ -26,3 +27,30 @@ impl Runtime { spawn_local(create_task()) } } + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + pub fn try_current() -> Option { + Some(Self { + _marker: PhantomData, + }) + } + + pub fn current() -> Self { + Self { + _marker: PhantomData, + } + } + + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + spawn_local(f); + } +} diff --git a/packages/yew/src/server_renderer.rs b/packages/yew/src/server_renderer.rs index 4fd9d41de56..8007ff96a43 100644 --- a/packages/yew/src/server_renderer.rs +++ b/packages/yew/src/server_renderer.rs @@ -5,7 +5,7 @@ use tracing::Instrument; use crate::html::{BaseComponent, Scope}; use crate::platform::io::{self, DEFAULT_BUF_SIZE}; -use crate::platform::{spawn_local, Runtime}; +use crate::platform::{spawn_local, LocalHandle, Runtime}; /// A Yew Server-side Renderer that renders on the current thread. /// @@ -249,17 +249,26 @@ where rt, } = self; - let rt = rt.unwrap_or_default(); let (mut w, r) = io::buffer(capacity); - - rt.spawn_pinned(move || async move { + let create_task = move || async move { let props = create_props(); let scope = Scope::::new(None); scope .render_into_stream(&mut w, props.into(), hydratable) .await; - }); + }; + + match rt { + // If a runtime is specified, spawn to the specified runtime. + Some(m) => m.spawn_pinned(create_task), + None => match LocalHandle::try_current() { + // If within a Yew Runtime, spawn to the current runtime. + Some(m) => m.spawn_local(create_task()), + // Outside of Yew Runtime, spawn to the default runtime. + None => Runtime::default().spawn_pinned(create_task), + }, + } r }