Skip to content

Commit

Permalink
Polled SSR Stream (#2824)
Browse files Browse the repository at this point in the history
* Switch to pinned channels.

* Fix ServerRenderer so it's not blocked until the result is resolved.

* Fix tests.

* Remove unused SendError.

* Implement a stream to be polled alongside rendering.

* Update Buffer Size.

* Make Send renderer work.

* Remove pinned channels.

* Unified Naming.

* Optimise code.

* Restore capacity.

* Remove unused profile.

* Default to separate resolver.

* Reduce allocations on string.

* Adjust API.

* Remove duplicate trait bound.

* Update docs.

* Remove capacity setting.

* Unsafe?

* Separate files.

* Adjust inlining.

* Fix test.

* Update notice.

* Update documentation.

* Fix tests.
  • Loading branch information
futursolo committed Sep 10, 2022
1 parent 9484249 commit 278d2ce
Show file tree
Hide file tree
Showing 20 changed files with 506 additions and 279 deletions.
9 changes: 5 additions & 4 deletions packages/yew/Cargo.toml
Expand Up @@ -31,9 +31,8 @@ implicit-clone = { version = "0.3", features = ["map"] }
base64ct = { version = "1.5.0", features = ["std"], optional = true }
bincode = { version = "1.3.3", optional = true }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.19", features = ["sync"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }
tracing = "0.1.36"
pin-project = "1.0.11"

[dependencies.web-sys]
version = "^0.3.59"
Expand Down Expand Up @@ -77,8 +76,10 @@ wasm-bindgen-futures = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = { version = "1.13", optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }
once_cell = "1"
tokio = { version = "1.19", features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", features = ["time"], optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }

[dev-dependencies]
wasm-bindgen-test = "0.3"
Expand All @@ -95,7 +96,7 @@ features = [
]

[features]
tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"]
tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"]
ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"]
csr = []
hydration = ["csr", "dep:bincode"]
Expand Down
10 changes: 6 additions & 4 deletions packages/yew/src/html/component/scope.rs
Expand Up @@ -291,11 +291,13 @@ impl<COMP: BaseComponent> Scope<COMP> {

#[cfg(feature = "ssr")]
mod feat_ssr {
use std::fmt::Write;

use super::*;
use crate::html::component::lifecycle::{
ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner,
};
use crate::platform::io::BufWriter;
use crate::platform::fmt::BufWriter;
use crate::platform::pinned::oneshot;
use crate::scheduler;
use crate::virtual_dom::Collectable;
Expand Down Expand Up @@ -342,9 +344,9 @@ mod feat_ssr {
.await;

if let Some(prepared_state) = self.get_component().unwrap().prepare_state() {
w.write(r#"<script type="application/x-yew-comp-state">"#.into());
w.write(prepared_state.into());
w.write(r#"</script>"#.into());
let _ = w.write_str(r#"<script type="application/x-yew-comp-state">"#);
let _ = w.write_str(&prepared_state);
let _ = w.write_str(r#"</script>"#);
}

if hydratable {
Expand Down
212 changes: 212 additions & 0 deletions packages/yew/src/platform/fmt/buffer.rs
@@ -0,0 +1,212 @@
use std::cell::UnsafeCell;
use std::fmt::{self, Write};
use std::marker::PhantomData;
use std::rc::Rc;
use std::task::{Poll, Waker};

use futures::stream::{FusedStream, Stream};

static BUF_SIZE: usize = 1024;

enum BufStreamState {
Ready,
Pending(Waker),
Done,
}

struct Inner {
buf: String,
state: BufStreamState,

// This type is not send or sync.
_marker: PhantomData<Rc<()>>,
}

impl Inner {
#[inline]
const fn new() -> Self {
Self {
buf: String::new(),
state: BufStreamState::Ready,
_marker: PhantomData,
}
}

#[inline]
fn wake(&mut self) {
if let BufStreamState::Pending(ref waker) = self.state {
waker.wake_by_ref();
self.state = BufStreamState::Ready;
}
}

#[inline]
fn buf_reserve(&mut self) {
if self.buf.is_empty() {
self.buf.reserve(BUF_SIZE);
}
}
}

impl Write for Inner {
fn write_str(&mut self, s: &str) -> fmt::Result {
if s.is_empty() {
return Ok(());
}

self.wake();
if s.len() < BUF_SIZE {
self.buf_reserve();
}

self.buf.write_str(s)
}

fn write_char(&mut self, c: char) -> fmt::Result {
self.wake();
self.buf_reserve();

self.buf.write_char(c)
}

fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
self.wake();
self.buf_reserve();

self.buf.write_fmt(args)
}
}

/// An asynchronous [`String`] writer.
///
/// This type implements [`fmt::Write`] and can be used with [`write!`] and [`writeln!`].
pub(crate) struct BufWriter {
inner: Rc<UnsafeCell<Inner>>,
}

impl Write for BufWriter {
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.write_str(s)
}

#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.write_char(c)
}

#[inline]
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.write_fmt(args)
}
}

impl Drop for BufWriter {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.wake();
inner.state = BufStreamState::Done;
}
}

/// An asynchronous [`String`] reader.
pub(crate) struct BufReader {
inner: Rc<UnsafeCell<Inner>>,
}

impl Stream for BufReader {
type Item = String;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

if !inner.buf.is_empty() {
let buf = std::mem::take(&mut inner.buf);
return Poll::Ready(Some(buf));
}

if let BufStreamState::Done = inner.state {
return Poll::Ready(None);
}

inner.state = BufStreamState::Pending(cx.waker().clone());
Poll::Pending
}
}

impl FusedStream for BufReader {
fn is_terminated(&self) -> bool {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &*self.inner.get() };

matches!(
(&inner.state, inner.buf.is_empty()),
(BufStreamState::Done, true)
)
}
}

/// Creates an asynchronous buffer that operates over String.
pub(crate) fn buffer() -> (BufWriter, BufReader) {
let inner = Rc::new(UnsafeCell::new(Inner::new()));

let w = {
let inner = inner.clone();
BufWriter { inner }
};

let r = BufReader { inner };

(w, r)
}
70 changes: 70 additions & 0 deletions packages/yew/src/platform/fmt/mod.rs
@@ -0,0 +1,70 @@
//! Asynchronous utilities to work with `String`s.

use std::future::Future;

use futures::future::{self, MaybeDone};
use futures::stream::{FusedStream, Stream};
use futures::StreamExt;
use pin_project::pin_project;

mod buffer;

pub(crate) use buffer::{buffer, BufReader, BufWriter};

/// A buffered asynchronous [`String`] [`Stream`].
///
/// A BufStream combines a BufWriter - BufReader pair and a resolving future that writes to the
/// buffer and polls the future alongside the buffer.
#[pin_project]
pub(crate) struct BufStream<F>
where
F: Future<Output = ()>,
{
#[pin]
resolver: MaybeDone<F>,
inner: BufReader,
}

impl<F> BufStream<F>
where
F: Future<Output = ()>,
{
/// Creates a `BufStream`.
pub fn new<C>(f: C) -> Self
where
C: FnOnce(BufWriter) -> F,
{
let (w, r) = buffer();
let resolver = future::maybe_done(f(w));

BufStream { inner: r, resolver }
}
}

impl<F> Stream for BufStream<F>
where
F: Future<Output = ()>,
{
type Item = String;

#[inline]
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let _ = this.resolver.poll(cx);

this.inner.poll_next_unpin(cx)
}
}

impl<F> FusedStream for BufStream<F>
where
F: Future<Output = ()>,
{
#[inline]
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

0 comments on commit 278d2ce

Please sign in to comment.