Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polled SSR Stream #2824

Merged
merged 32 commits into from Sep 10, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5bf1ae8
Switch to pinned channels.
futursolo Aug 4, 2022
fb2603e
Fix ServerRenderer so it's not blocked until the result is resolved.
futursolo Aug 5, 2022
892b759
Fix tests.
futursolo Aug 5, 2022
29a46bd
Remove unused SendError.
futursolo Aug 5, 2022
20fc4a3
Merge branch 'master' into pinned-channels
futursolo Aug 7, 2022
8a41c3e
Implement a stream to be polled alongside rendering.
futursolo Aug 11, 2022
8688045
Update Buffer Size.
futursolo Aug 11, 2022
3bd9e0a
Make Send renderer work.
futursolo Aug 11, 2022
ed36760
Remove pinned channels.
futursolo Aug 11, 2022
2799a22
Unified Naming.
futursolo Aug 11, 2022
02a838d
Optimise code.
futursolo Aug 12, 2022
3de0770
Restore capacity.
futursolo Aug 12, 2022
689980e
merge 'master' into polled-stream
futursolo Aug 12, 2022
cbbd765
Remove unused profile.
futursolo Aug 12, 2022
2045c49
Merge branch 'master' into polled-stream
futursolo Aug 12, 2022
a470862
Default to separate resolver.
futursolo Aug 12, 2022
3ac02fd
Reduce allocations on string.
futursolo Aug 12, 2022
345b745
Adjust API.
futursolo Aug 13, 2022
d1a6ab3
Remove duplicate trait bound.
futursolo Aug 13, 2022
f0863a4
Update docs.
futursolo Aug 13, 2022
7dcaf6e
Remove capacity setting.
futursolo Aug 13, 2022
ced12ac
Merge branch 'master' into polled-stream
futursolo Aug 14, 2022
475f52e
Merge branch 'master' into polled-stream
futursolo Aug 16, 2022
e757d35
Unsafe?
futursolo Aug 16, 2022
0a399ba
Separate files.
futursolo Aug 16, 2022
6ae5a65
Adjust inlining.
futursolo Aug 16, 2022
f310575
Merge branch 'master' into polled-stream
futursolo Aug 29, 2022
e18e0fd
Fix test.
futursolo Aug 29, 2022
67348d3
Update notice.
futursolo Aug 29, 2022
f491d6c
Update documentation.
futursolo Sep 4, 2022
2a300fe
Merge branch 'master' into polled-stream
futursolo Sep 4, 2022
a78f0b3
Fix tests.
futursolo Sep 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions packages/yew/Cargo.toml
Expand Up @@ -31,9 +31,8 @@ implicit-clone = { version = "0.3", features = ["map"] }
base64ct = { version = "1.5.0", features = ["std"], optional = true }
bincode = { version = "1.3.3", optional = true }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.19", features = ["sync"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }
tracing = "0.1.36"
pin-project = "1.0.11"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pin-project-lite maybe?

Copy link
Member Author

@futursolo futursolo Sep 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, if you already have proc-macro related dependencies in your crate’s dependency graph, there is no benefit from using this crate.

See: https://docs.rs/pin-project-lite/0.2.8/pin_project_lite/#different-no-proc-macro-related-dependencies

The primary benefit of #[pin_project] is that rustfmt will continue to work for the struct.


[dependencies.web-sys]
version = "^0.3.59"
Expand Down Expand Up @@ -77,8 +76,10 @@ wasm-bindgen-futures = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = { version = "1.13", optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }
once_cell = "1"
tokio = { version = "1.19", features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", features = ["time"], optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }

[dev-dependencies]
wasm-bindgen-test = "0.3"
Expand All @@ -95,7 +96,7 @@ features = [
]

[features]
tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"]
tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"]
ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"]
csr = []
hydration = ["csr", "dep:bincode"]
Expand Down
10 changes: 6 additions & 4 deletions packages/yew/src/html/component/scope.rs
Expand Up @@ -291,11 +291,13 @@ impl<COMP: BaseComponent> Scope<COMP> {

#[cfg(feature = "ssr")]
mod feat_ssr {
use std::fmt::Write;

use super::*;
use crate::html::component::lifecycle::{
ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner,
};
use crate::platform::io::BufWriter;
use crate::platform::fmt::BufWriter;
use crate::platform::pinned::oneshot;
use crate::scheduler;
use crate::virtual_dom::Collectable;
Expand Down Expand Up @@ -342,9 +344,9 @@ mod feat_ssr {
.await;

if let Some(prepared_state) = self.get_component().unwrap().prepare_state() {
w.write(r#"<script type="application/x-yew-comp-state">"#.into());
w.write(prepared_state.into());
w.write(r#"</script>"#.into());
let _ = w.write_str(r#"<script type="application/x-yew-comp-state">"#);
let _ = w.write_str(&prepared_state);
let _ = w.write_str(r#"</script>"#);
}

if hydratable {
Expand Down
212 changes: 212 additions & 0 deletions packages/yew/src/platform/fmt/buffer.rs
@@ -0,0 +1,212 @@
use std::cell::UnsafeCell;
use std::fmt::{self, Write};
use std::marker::PhantomData;
use std::rc::Rc;
use std::task::{Poll, Waker};

use futures::stream::{FusedStream, Stream};

static BUF_SIZE: usize = 1024;

enum BufStreamState {
Ready,
Pending(Waker),
Done,
}

struct Inner {
buf: String,
state: BufStreamState,

// This type is not send or sync.
_marker: PhantomData<Rc<()>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do:

impl !Send for Inner {}
impl !Sync for Inner {}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is due to that negative impls is not stable.

See: https://doc.rust-lang.org/unstable-book/language-features/negative-impls.html

}

impl Inner {
#[inline]
const fn new() -> Self {
Self {
buf: String::new(),
state: BufStreamState::Ready,
_marker: PhantomData,
}
}

#[inline]
fn wake(&mut self) {
if let BufStreamState::Pending(ref waker) = self.state {
waker.wake_by_ref();
self.state = BufStreamState::Ready;
}
}

#[inline]
fn buf_reserve(&mut self) {
if self.buf.is_empty() {
self.buf.reserve(BUF_SIZE);
}
}
}

impl Write for Inner {
fn write_str(&mut self, s: &str) -> fmt::Result {
if s.is_empty() {
return Ok(());
}

self.wake();
if s.len() < BUF_SIZE {
self.buf_reserve();
}

self.buf.write_str(s)
}

fn write_char(&mut self, c: char) -> fmt::Result {
self.wake();
self.buf_reserve();

self.buf.write_char(c)
}

fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
self.wake();
self.buf_reserve();

self.buf.write_fmt(args)
}
}

/// An asynchronous String BufWriter.
///
/// This type implements [`fmt::Write`] and can be.
futursolo marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) struct BufWriter {
inner: Rc<UnsafeCell<Inner>>,
}

impl Write for BufWriter {
#[inline]
fn write_str(&mut self, s: &str) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.write_str(s)
}

#[inline]
fn write_char(&mut self, c: char) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.write_char(c)
}

#[inline]
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.write_fmt(args)
}
}

impl Drop for BufWriter {
fn drop(&mut self) {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

inner.wake();
inner.state = BufStreamState::Done;
}
}

/// An asynchronous String Stream.
futursolo marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) struct BufReader {
inner: Rc<UnsafeCell<Inner>>,
}

impl Stream for BufReader {
type Item = String;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &mut *self.inner.get() };

if !inner.buf.is_empty() {
let buf = std::mem::take(&mut inner.buf);
return Poll::Ready(Some(buf));
}

if let BufStreamState::Done = inner.state {
return Poll::Ready(None);
}

inner.state = BufStreamState::Pending(cx.waker().clone());
Poll::Pending
}
}

impl FusedStream for BufReader {
fn is_terminated(&self) -> bool {
// SAFETY:
//
// We can acquire a mutable reference without checking as:
//
// - This type is !Sync and !Send.
// - This function is not used by any other functions that has access to the inner type.
// - The mutable reference is dropped at the end of this function.
let inner = unsafe { &*self.inner.get() };

matches!(
(&inner.state, inner.buf.is_empty()),
(BufStreamState::Done, true)
)
}
}

/// Creates an asynchronous buffer that operates over String.
pub(crate) fn buffer() -> (BufWriter, BufReader) {
let inner = Rc::new(UnsafeCell::new(Inner::new()));

let w = {
let inner = inner.clone();
BufWriter { inner }
};

let r = BufReader { inner };

(w, r)
}
70 changes: 70 additions & 0 deletions packages/yew/src/platform/fmt/mod.rs
@@ -0,0 +1,70 @@
//! Asynchronous utilities to work with `String`s.

use std::future::Future;

use futures::future::{self, MaybeDone};
use futures::stream::{FusedStream, Stream};
use futures::StreamExt;
use pin_project::pin_project;

mod buffer;

pub(crate) use buffer::{buffer, BufReader, BufWriter};

/// A buffered asynchronous string Stream.
///
/// A BufStream combines a BufWriter - BufReader pair and a resolving future that writes to the
/// buffer and polls the future alongside the buffer.
#[pin_project]
pub(crate) struct BufStream<F>
where
F: Future<Output = ()>,
Comment on lines +20 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps remove the bound from here? That would mean that everywhere this type is named doesn't require the bound - only the usages do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot remove this as this bound is declared on the MaybeDone type itself.

{
#[pin]
resolver: MaybeDone<F>,
inner: BufReader,
}

impl<F> BufStream<F>
where
F: Future<Output = ()>,
{
/// Creates a `BufStream`.
pub fn new<C>(f: C) -> Self
where
C: FnOnce(BufWriter) -> F,
{
let (w, r) = buffer();
let resolver = future::maybe_done(f(w));

BufStream { inner: r, resolver }
}
}

impl<F> Stream for BufStream<F>
where
F: Future<Output = ()>,
{
type Item = String;

#[inline]
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let _ = this.resolver.poll(cx);

this.inner.poll_next_unpin(cx)
}
}

impl<F> FusedStream for BufStream<F>
where
F: Future<Output = ()>,
{
#[inline]
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}