Skip to content

Commit

Permalink
Configurable Runtime (#2772)
Browse files Browse the repository at this point in the history
* Adds Runtime.

* A LocalRuntime.

* Add note.

* Add SSR benchmark.

* Only create default runtime if no custom runtime is set.

* Use jemalloc for benchmarking.

* Remove once_cell for web assembly.

* Add time.

* Fix wasm_bindgen.

* Adjust inlining.

* Optimise benchmark output.

* Optimise BufWriter.

* Add json output.

* Add Benchmark Workflow.

* Remove local set from tests.

* Fix Workflow syntax.

* Exclude benchmark from doc tests.

* Adjust feature flags.

* Adds a pinned channel implementation.

* Make Send bound explicit.

* Implement on immutable reference.

* Fix Sink close.

* run_pinned -> spawn_pinned.

* Add tests.

* Adjusts worker threads.

* Fix workflow.

* Remove futures-executor.

* Cargo update.

* Fix docs.

* Update notice.

* Fix docs.

* Fix docs.

* Switch to task spawning.

* Use futures unordered instead of spawn_local.

* Switch to join_all.

* Remove LocalPoolHandle.

* Fix docs.

* Spawn a single task.

* Fix merge failure.

* Remove LocalRuntime.

* Update documentation.

* Merge local-runtime-handle into local-runtime.

* Add some tests.

* Fix clippy notice.

* Fix comment.

* Address various review comments.

* Remove unused type.

* Fix clippy.

* Fix clippy.
  • Loading branch information
futursolo committed Aug 28, 2022
1 parent 72213ee commit cffb7c5
Show file tree
Hide file tree
Showing 11 changed files with 696 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/post-size-cmp.yml
Expand Up @@ -8,7 +8,7 @@ on:
- completed

jobs:
size-cmp:
post-size-cmp:
name: Post Comment on Pull Request
runs-on: ubuntu-latest

Expand Down
29 changes: 29 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions examples/simple_ssr/src/bin/simple_ssr_server.rs
Expand Up @@ -25,7 +25,7 @@ async fn render(

Box::new(
stream::once(async move { index_html_before })
.chain(renderer.render_stream().await)
.chain(renderer.render_stream())
.chain(stream::once(async move { index_html_after }))
.map(|m| Result::<_, BoxedError>::Ok(m.into())),
)
Expand Down Expand Up @@ -54,6 +54,5 @@ async fn main() {
let routes = html.or(warp::fs::dir(opts.dir));

println!("You can view the website at: http://localhost:8080/");

warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}
6 changes: 4 additions & 2 deletions examples/ssr_router/Cargo.toml
Expand Up @@ -17,7 +17,7 @@ required-features = ["ssr"]
yew = { path = "../../packages/yew" }
function_router = { path = "../function_router" }
log = "0.4"
futures = "0.3"
futures = { version = "0.3", features = ["std"], default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
Expand All @@ -30,7 +30,9 @@ tower = { version = "0.4", features = ["make"] }
tower-http = { version = "0.3", features = ["fs"] }
env_logger = "0.9"
clap = { version = "3.1.7", features = ["derive"] }
hyper = { version = "0.14", features = ["server", "http1"] }
jemallocator = "0.5"

[features]
ssr = ["yew/ssr"]
ssr = ["yew/ssr", "yew/tokio"]
hydration = ["yew/hydration"]
36 changes: 34 additions & 2 deletions examples/ssr_router/src/bin/ssr_router_server.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::convert::Infallible;
use std::future::Future;
use std::path::PathBuf;

use axum::body::{Body, StreamBody};
Expand All @@ -13,8 +14,14 @@ use axum::{Extension, Router};
use clap::Parser;
use function_router::{ServerApp, ServerAppProps};
use futures::stream::{self, StreamExt};
use hyper::server::Server;
use tower::ServiceExt;
use tower_http::services::ServeDir;
use yew::platform::Runtime;

// We use jemalloc as it produces better performance.
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

/// A basic example
#[derive(Parser, Debug)]
Expand All @@ -38,14 +45,38 @@ async fn render(

StreamBody::new(
stream::once(async move { index_html_before })
.chain(renderer.render_stream().await)
.chain(renderer.render_stream())
.chain(stream::once(async move { index_html_after }))
.map(Result::<_, Infallible>::Ok),
)
}

// An executor to process requests on the Yew runtime.
//
// By spawning requests on the Yew runtime,
// it processes request on the same thread as the rendering task.
//
// This increases performance in some environments (e.g.: in VM).
#[derive(Clone, Default)]
struct Executor {
inner: Runtime,
}

impl<F> hyper::rt::Executor<F> for Executor
where
F: Future + Send + 'static,
{
fn execute(&self, fut: F) {
self.inner.spawn_pinned(move || async move {
fut.await;
});
}
}

#[tokio::main]
async fn main() {
let exec = Executor::default();

env_logger::init();

let opts = Opt::parse();
Expand Down Expand Up @@ -86,7 +117,8 @@ async fn main() {

println!("You can view the website at: http://localhost:8080/");

axum::Server::bind(&"0.0.0.0:8080".parse().unwrap())
Server::bind(&"127.0.0.1:8080".parse().unwrap())
.executor(exec)
.serve(app.into_make_service())
.await
.unwrap();
Expand Down
131 changes: 115 additions & 16 deletions packages/yew/src/platform/mod.rs
Expand Up @@ -41,6 +41,8 @@
//! `tokio`'s timer, IO and task synchronisation primitives.

use std::future::Future;
use std::io::Result;
use std::marker::PhantomData;

#[cfg(feature = "ssr")]
pub(crate) mod io;
Expand Down Expand Up @@ -72,21 +74,118 @@ where
imp::spawn_local(f);
}

/// Runs a task with it pinned onto a local worker thread.
///
/// This can be used to execute non-Send futures without blocking the current thread.
///
/// It maintains an internal thread pool dedicated to executing local futures.
/// A Runtime Builder.
#[derive(Debug)]
pub struct RuntimeBuilder {
worker_threads: usize,
}

impl Default for RuntimeBuilder {
fn default() -> Self {
Self {
worker_threads: imp::get_default_runtime_size(),
}
}
}

impl RuntimeBuilder {
/// Creates a new Runtime Builder.
pub fn new() -> Self {
Self::default()
}

/// Sets the number of worker threads the Runtime will use.
///
/// # Default
///
/// The default number of worker threads is the number of available logical CPU cores.
///
/// # Note
///
/// This setting has no effect if current platform has no thread support (e.g.: WebAssembly).
pub fn worker_threads(&mut self, val: usize) -> &mut Self {
self.worker_threads = val;

self
}

/// Creates a Runtime.
pub fn build(&mut self) -> Result<Runtime> {
Ok(Runtime {
inner: imp::Runtime::new(self.worker_threads)?,
})
}
}

/// The Yew Runtime.
#[derive(Debug, Clone, Default)]
pub struct Runtime {
inner: imp::Runtime,
}

impl Runtime {
/// Creates a runtime Builder.
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}

/// Spawns a task with it pinned to a worker thread.
///
/// This can be used to execute non-Send futures without blocking the current thread.
///
/// [`spawn_local`] is available with tasks executed with `spawn_pinned`.
pub fn spawn_pinned<F, Fut>(&self, create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
self.inner.spawn_pinned(create_task);
}
}

/// A Local Runtime Handle.
///
/// [`spawn_local`] is available with tasks executed with `run_pinned`.
#[inline(always)]
#[cfg(feature = "ssr")]
pub(crate) async fn run_pinned<F, Fut>(create_task: F) -> Fut::Output
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
imp::run_pinned(create_task).await
/// This type can be used to acquire a runtime handle to spawn local tasks.
#[derive(Debug, Clone)]
pub struct LocalHandle {
inner: imp::LocalHandle,
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}

impl LocalHandle {
/// Creates a Handle to current Runtime worker.
///
/// # Panics
///
/// This method will panic if not called within Yew Runtime.
pub fn current() -> Self {
let inner = imp::LocalHandle::current();

Self {
inner,
_marker: PhantomData,
}
}

/// Creates a Handle to current Runtime worker.
///
/// This methods will return `None` if called from outside Yew Runtime.
pub fn try_current() -> Option<Self> {
let inner = imp::LocalHandle::try_current()?;

Some(Self {
inner,
_marker: PhantomData,
})
}

/// Spawns a Future with current Runtime worker.
pub fn spawn_local<F>(&self, f: F)
where
F: Future<Output = ()> + 'static,
{
self.inner.spawn_local(f);
}
}
60 changes: 51 additions & 9 deletions packages/yew/src/platform/rt_none/mod.rs
@@ -1,7 +1,13 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;

pub(crate) mod time;

pub(crate) fn get_default_runtime_size() -> usize {
0
}

static NO_RUNTIME_NOTICE: &str = r#"No runtime configured for this platform, \
features that requires a runtime can't be used. \
Either compile with `target_arch = "wasm32", or enable the `tokio` feature."#;
Expand All @@ -18,13 +24,49 @@ where
panic_no_runtime();
}

#[cfg(feature = "ssr")]
pub(crate) async fn run_pinned<F, Fut>(_create_task: F) -> Fut::Output
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
panic_no_runtime();
#[derive(Debug, Clone)]
pub(crate) struct Runtime {}

impl Default for Runtime {
fn default() -> Self {
panic_no_runtime();
}
}

impl Runtime {
pub fn new(_size: usize) -> io::Result<Self> {
panic_no_runtime();
}

pub fn spawn_pinned<F, Fut>(&self, _create_task: F)
where
F: FnOnce() -> Fut,
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
panic_no_runtime();
}
}

#[derive(Debug, Clone)]
pub(crate) struct LocalHandle {
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}

impl LocalHandle {
pub fn try_current() -> Option<Self> {
panic_no_runtime();
}

pub fn current() -> Self {
panic_no_runtime();
}

pub fn spawn_local<F>(&self, _f: F)
where
F: Future<Output = ()> + 'static,
{
panic_no_runtime();
}
}

0 comments on commit cffb7c5

Please sign in to comment.