Skip to content

Commit

Permalink
Revert channel-based BufWriter.
Browse files Browse the repository at this point in the history
  • Loading branch information
futursolo committed Aug 13, 2022
1 parent ec96101 commit 83331d1
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 52 deletions.
8 changes: 4 additions & 4 deletions packages/yew/Cargo.toml
Expand Up @@ -31,6 +31,8 @@ implicit-clone = { version = "0.3", features = ["map"] }
base64ct = { version = "1.5.0", features = ["std"], optional = true }
bincode = { version = "1.3.3", optional = true }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.19", features = ["sync"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }
tracing = "0.1.36"

[dependencies.web-sys]
Expand Down Expand Up @@ -75,10 +77,8 @@ wasm-bindgen-futures = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = { version = "1.13", optional = true }
once_cell = "1"
tokio = { version = "1.19", features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", features = ["time"], optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }
once_cell = "1"

[dev-dependencies]
wasm-bindgen-test = "0.3"
Expand All @@ -95,7 +95,7 @@ features = [
]

[features]
tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"]
tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"]
ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"]
csr = []
hydration = ["csr", "dep:bincode"]
Expand Down
4 changes: 2 additions & 2 deletions packages/yew/src/html/component/scope.rs
Expand Up @@ -264,15 +264,15 @@ mod feat_ssr {
use crate::html::component::lifecycle::{
ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner,
};
use crate::platform::fmt::BufWrite;
use crate::platform::io::BufWriter;
use crate::platform::pinned::oneshot;
use crate::scheduler;
use crate::virtual_dom::Collectable;

impl<COMP: BaseComponent> Scope<COMP> {
pub(crate) async fn render_into_stream(
&self,
w: &mut dyn BufWrite,
w: &mut BufWriter,
props: Rc<COMP::Properties>,
hydratable: bool,
) {
Expand Down
103 changes: 103 additions & 0 deletions packages/yew/src/platform/io.rs
@@ -0,0 +1,103 @@
//! This module contains types for I/O functionality.

// This module should remain private until impl trait type alias becomes available so
// `BufReader` can be produced with an existential type.

use std::borrow::Cow;

use futures::stream::Stream;

use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender};

// Same as std::io::BufWriter and futures::io::BufWriter.
pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream.
pub(crate) struct BufWriter {
buf: String,
tx: UnboundedSender<String>,
capacity: usize,
}

/// Creates a Buffer pair.
pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream<Item = String>) {
let (tx, rx) = mpsc::unbounded_channel::<String>();

let tx = BufWriter {
buf: String::with_capacity(capacity),
tx,
capacity,
};

(tx, UnboundedReceiverStream::new(rx))
}

// Implementation Notes:
//
// When jemalloc is used and a reasonable buffer length is chosen,
// performance of this buffer is related to the number of allocations
// instead of the amount of memory that is allocated.
//
// A Bytes-based implementation is also tested, and yielded a similar performance to String-based
// buffer.
//
// Having a String-based buffer avoids unsafe / cost of conversion between String and Bytes
// when text based content is needed (e.g.: post-processing).
//
// `Bytes::from` can be used to convert a `String` to `Bytes` if web server asks for an
// `impl Stream<Item = Bytes>`. This conversion incurs no memory allocation.
//
// Yielding the output with a Stream provides a couple advantages:
//
// 1. All child components of a VList can have their own buffer and be rendered concurrently.
// 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled.
// Using a stream avoids this side effect and allows the renderer to finish rendering
// without being actively polled.
impl BufWriter {
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}

fn drain(&mut self) {
let _ = self.tx.send(self.buf.drain(..).collect());
self.buf.reserve(self.capacity);
}

/// Returns `True` if the internal buffer has capacity to fit a string of certain length.
#[inline]
fn has_capacity_of(&self, next_part_len: usize) -> bool {
self.buf.capacity() >= self.buf.len() + next_part_len
}

/// Writes a string into the buffer, optionally drains the buffer.
pub fn write(&mut self, s: Cow<'_, str>) {
if !self.has_capacity_of(s.len()) {
// There isn't enough capacity, we drain the buffer.
self.drain();
}

if self.has_capacity_of(s.len()) {
// The next part is going to fit into the buffer, we push it onto the buffer.
self.buf.push_str(&s);
} else {
// if the next part is more than buffer size, we send the next part.

// We don't need to drain the buffer here as the result of self.has_capacity_of() only
// changes if the buffer was drained. If the buffer capacity didn't change,
// then it means self.has_capacity_of() has returned true the first time which will be
// guaranteed to be matched by the left hand side of this implementation.
let _ = self.tx.send(s.into_owned());
}
}
}

impl Drop for BufWriter {
fn drop(&mut self) {
if !self.buf.is_empty() {
let mut buf = String::new();
std::mem::swap(&mut buf, &mut self.buf);
let _ = self.tx.send(buf);
}
}
}
3 changes: 2 additions & 1 deletion packages/yew/src/platform/mod.rs
Expand Up @@ -43,9 +43,10 @@
use std::future::Future;

#[cfg(feature = "ssr")]
pub(crate) mod fmt;
pub(crate) mod io;

pub mod pinned;
pub mod sync;
pub mod time;

#[cfg(target_arch = "wasm32")]
Expand Down
5 changes: 5 additions & 0 deletions packages/yew/src/platform/sync/mod.rs
@@ -0,0 +1,5 @@
//! A module that provides task synchronisation primitives.

#[doc(inline)]
pub use tokio::sync::oneshot;
pub mod mpsc;
6 changes: 6 additions & 0 deletions packages/yew/src/platform/sync/mpsc.rs
@@ -0,0 +1,6 @@
//! A multi-producer, single-receiver channel.

#[doc(inline)]
pub use tokio::sync::mpsc::*;
#[doc(inline)]
pub use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
35 changes: 13 additions & 22 deletions packages/yew/src/server_renderer.rs
Expand Up @@ -4,8 +4,7 @@ use futures::stream::{Stream, StreamExt};
use tracing::Instrument;

use crate::html::{BaseComponent, Scope};
use crate::platform::fmt::{BufWriter, DEFAULT_BUF_SIZE};
use crate::platform::pinned::mpsc;
use crate::platform::io::{self, DEFAULT_BUF_SIZE};
use crate::platform::{run_pinned, spawn_local};

/// A Yew Server-side Renderer that renders on the current thread.
Expand Down Expand Up @@ -101,8 +100,7 @@ where
fields(hydratable = self.hydratable, capacity = self.capacity),
)]
pub fn render_stream(self) -> impl Stream<Item = String> {
let (tx, rx) = mpsc::unbounded();
let mut w = BufWriter::new(tx, self.capacity);
let (mut w, r) = io::buffer(self.capacity);

let scope = Scope::<COMP>::new(None);
let outer_span = tracing::Span::current();
Expand All @@ -115,7 +113,7 @@ where
.await;
});

rx
r
}
}

Expand Down Expand Up @@ -231,27 +229,20 @@ where
///
/// Unlike [`LocalServerRenderer::render_stream`], this method is `async fn`.
pub async fn render_stream(self) -> impl Stream<Item = String> {
let Self {
create_props,
hydratable,
capacity,
} = self;

// We use run_pinned to switch to our runtime.
run_pinned(move || async move {
let (tx, rx) = futures::channel::mpsc::unbounded();
let Self {
create_props,
hydratable,
capacity,
} = self;

let props = create_props();
let scope = Scope::<COMP>::new(None);

let mut w = BufWriter::new(tx, capacity);

spawn_local(async move {
scope
.render_into_stream(&mut w, props.into(), hydratable)
.await;
});

rx
LocalServerRenderer::<COMP>::with_props(props)
.hydratable(hydratable)
.capacity(capacity)
.render_stream()
})
.await
}
Expand Down
6 changes: 3 additions & 3 deletions packages/yew/src/virtual_dom/mod.rs
Expand Up @@ -112,10 +112,10 @@ pub(crate) use feat_ssr_hydration::*;
#[cfg(feature = "ssr")]
mod feat_ssr {
use super::*;
use crate::platform::fmt::BufWrite;
use crate::platform::io::BufWriter;

impl Collectable {
pub(crate) fn write_open_tag(&self, w: &mut dyn BufWrite) {
pub(crate) fn write_open_tag(&self, w: &mut BufWriter) {
w.write("<!--".into());
w.write(self.open_start_mark().into());

Expand All @@ -129,7 +129,7 @@ mod feat_ssr {
w.write("-->".into());
}

pub(crate) fn write_close_tag(&self, w: &mut dyn BufWrite) {
pub(crate) fn write_close_tag(&self, w: &mut BufWriter) {
w.write("<!--".into());
w.write(self.close_start_mark().into());

Expand Down
8 changes: 4 additions & 4 deletions packages/yew/src/virtual_dom/vcomp.rs
Expand Up @@ -20,7 +20,7 @@ use crate::html::{AnyScope, Scope};
#[cfg(feature = "csr")]
use crate::html::{NodeRef, Scoped};
#[cfg(feature = "ssr")]
use crate::platform::fmt::BufWrite;
use crate::platform::io::BufWriter;

/// A virtual component.
pub struct VComp {
Expand Down Expand Up @@ -71,7 +71,7 @@ pub(crate) trait Mountable {
#[cfg(feature = "ssr")]
fn render_into_stream<'a>(
&'a self,
w: &'a mut dyn BufWrite,
w: &'a mut BufWriter,
parent_scope: &'a AnyScope,
hydratable: bool,
) -> LocalBoxFuture<'a, ()>;
Expand Down Expand Up @@ -129,7 +129,7 @@ impl<COMP: BaseComponent> Mountable for PropsWrapper<COMP> {
#[cfg(feature = "ssr")]
fn render_into_stream<'a>(
&'a self,
w: &'a mut dyn BufWrite,
w: &'a mut BufWriter,
parent_scope: &'a AnyScope,
hydratable: bool,
) -> LocalBoxFuture<'a, ()> {
Expand Down Expand Up @@ -243,7 +243,7 @@ mod feat_ssr {
#[inline]
pub(crate) async fn render_into_stream(
&self,
w: &mut dyn BufWrite,
w: &mut BufWriter,
parent_scope: &AnyScope,
hydratable: bool,
) {
Expand Down
10 changes: 4 additions & 6 deletions packages/yew/src/virtual_dom/vlist.rs
Expand Up @@ -161,13 +161,12 @@ mod feat_ssr {

use super::*;
use crate::html::AnyScope;
use crate::platform::fmt::{BufWrite, BufWriter};
use crate::platform::pinned::mpsc;
use crate::platform::io::{self, BufWriter};

impl VList {
pub(crate) async fn render_into_stream(
&self,
w: &mut dyn BufWrite,
w: &mut BufWriter,
parent_scope: &AnyScope,
hydratable: bool,
) {
Expand All @@ -182,10 +181,9 @@ mod feat_ssr {

// Concurrently render rest children into a separate buffer.
let rest_child_furs = rest_children.iter().map(|child| {
let (tx, rx) = mpsc::unbounded();
let mut w = BufWriter::new(tx, buf_capacity);
let (mut w, r) = io::buffer(buf_capacity);

child_streams.push(rx);
child_streams.push(r);

async move {
child
Expand Down
6 changes: 3 additions & 3 deletions packages/yew/src/virtual_dom/vnode.rs
Expand Up @@ -153,18 +153,18 @@ mod feat_ssr {

use super::*;
use crate::html::AnyScope;
use crate::platform::fmt::BufWrite;
use crate::platform::io::BufWriter;

impl VNode {
pub(crate) fn render_into_stream<'a>(
&'a self,
w: &'a mut dyn BufWrite,
w: &'a mut BufWriter,
parent_scope: &'a AnyScope,
hydratable: bool,
) -> LocalBoxFuture<'a, ()> {
async fn render_into_stream_(
this: &VNode,
w: &mut dyn BufWrite,
w: &mut BufWriter,
parent_scope: &AnyScope,
hydratable: bool,
) {
Expand Down
4 changes: 2 additions & 2 deletions packages/yew/src/virtual_dom/vsuspense.rs
Expand Up @@ -28,13 +28,13 @@ impl VSuspense {
mod feat_ssr {
use super::*;
use crate::html::AnyScope;
use crate::platform::fmt::BufWrite;
use crate::platform::io::BufWriter;
use crate::virtual_dom::Collectable;

impl VSuspense {
pub(crate) async fn render_into_stream(
&self,
w: &mut dyn BufWrite,
w: &mut BufWriter,
parent_scope: &AnyScope,
hydratable: bool,
) {
Expand Down
6 changes: 3 additions & 3 deletions packages/yew/src/virtual_dom/vtag.rs
Expand Up @@ -430,7 +430,7 @@ impl PartialEq for VTag {
mod feat_ssr {
use super::*;
use crate::html::AnyScope;
use crate::platform::fmt::BufWrite;
use crate::platform::io::BufWriter;
use crate::virtual_dom::VText;

// Elements that cannot have any child elements.
Expand All @@ -442,14 +442,14 @@ mod feat_ssr {
impl VTag {
pub(crate) async fn render_into_stream(
&self,
w: &mut dyn BufWrite,
w: &mut BufWriter,
parent_scope: &AnyScope,
hydratable: bool,
) {
w.write("<".into());
w.write(self.tag().into());

let write_attr = |w: &mut dyn BufWrite, name: &str, val: Option<&str>| {
let write_attr = |w: &mut BufWriter, name: &str, val: Option<&str>| {
w.write(" ".into());
w.write(name.into());

Expand Down

0 comments on commit 83331d1

Please sign in to comment.