Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pinned Channels #2811

Merged
merged 20 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/yew/Cargo.toml
Expand Up @@ -31,8 +31,6 @@ implicit-clone = { version = "0.3", features = ["map"] }
base64ct = { version = "1.5.0", features = ["std"], optional = true }
bincode = { version = "1.3.3", optional = true }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.19", features = ["sync"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }

[dependencies.web-sys]
version = "0.3"
Expand Down Expand Up @@ -75,8 +73,10 @@ wasm-bindgen-futures = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
num_cpus = { version = "1.13", optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }
once_cell = "1"
tokio = { version = "1.19", features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", features = ["time"], optional = true }
tokio-util = { version = "0.7", features = ["rt"], optional = true }

[dev-dependencies]
wasm-bindgen-test = "0.3"
Expand All @@ -93,7 +93,7 @@ features = [
]

[features]
tokio = ["tokio/rt", "tokio/time", "dep:num_cpus", "dep:tokio-util"]
tokio = ["dep:tokio", "dep:num_cpus", "dep:tokio-util", "dep:tokio-stream"]
ssr = ["dep:html-escape", "dep:base64ct", "dep:bincode"]
csr = []
hydration = ["csr", "dep:bincode"]
Expand Down
2 changes: 1 addition & 1 deletion packages/yew/src/html/component/lifecycle.rs
Expand Up @@ -41,7 +41,7 @@ pub(crate) enum ComponentRenderState {

#[cfg(feature = "ssr")]
Ssr {
sender: Option<crate::platform::sync::oneshot::Sender<Html>>,
sender: Option<crate::platform::pinned::oneshot::Sender<Html>>,
},
}

Expand Down
6 changes: 3 additions & 3 deletions packages/yew/src/html/component/scope.rs
Expand Up @@ -264,15 +264,15 @@ mod feat_ssr {
use crate::html::component::lifecycle::{
ComponentRenderState, CreateRunner, DestroyRunner, RenderRunner,
};
use crate::platform::io::BufWriter;
use crate::platform::sync::oneshot;
use crate::platform::fmt::BufWrite;
use crate::platform::pinned::oneshot;
use crate::scheduler;
use crate::virtual_dom::Collectable;

impl<COMP: BaseComponent> Scope<COMP> {
pub(crate) async fn render_into_stream(
&self,
w: &mut BufWriter,
w: &mut dyn BufWrite,
props: Rc<COMP::Properties>,
hydratable: bool,
) {
Expand Down
Expand Up @@ -5,31 +5,40 @@

use std::borrow::Cow;

use futures::stream::Stream;

use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender};
use crate::platform::pinned;

// Same as std::io::BufWriter and futures::io::BufWriter.
pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream.
pub(crate) struct BufWriter {
buf: String,
tx: UnboundedSender<String>,
capacity: usize,
pub(crate) trait BufSend {
fn buf_send(&self, item: String);
}

/// Creates a Buffer pair.
pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream<Item = String>) {
let (tx, rx) = mpsc::unbounded_channel::<String>();
impl BufSend for pinned::mpsc::UnboundedSender<String> {
fn buf_send(&self, item: String) {
let _ = self.send_now(item);
}
}

let tx = BufWriter {
buf: String::with_capacity(capacity),
tx,
capacity,
};
impl BufSend for futures::channel::mpsc::UnboundedSender<String> {
fn buf_send(&self, item: String) {
let _ = self.unbounded_send(item);
}
}

(tx, UnboundedReceiverStream::new(rx))
pub trait BufWrite {
fn capacity(&self) -> usize;
fn write(&mut self, s: Cow<'_, str>);
}

/// A [`futures::io::BufWriter`], but operates over string and yields into a Stream.
pub(crate) struct BufWriter<S>
where
S: BufSend,
{
buf: String,
tx: S,
capacity: usize,
}

// Implementation Notes:
Expand All @@ -53,25 +62,53 @@ pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream<Item = String>)
// 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled.
// Using a stream avoids this side effect and allows the renderer to finish rendering
// without being actively polled.
impl BufWriter {
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
impl<S> BufWriter<S>
where
S: BufSend,
{
pub fn new(tx: S, capacity: usize) -> Self {
Self {
buf: String::new(),
tx,
capacity,
}
}

#[inline]
fn drain(&mut self) {
let _ = self.tx.send(self.buf.drain(..).collect());
self.buf.reserve(self.capacity);
if !self.buf.is_empty() {
self.tx.buf_send(self.buf.split_off(0));
}
}

#[inline]
fn reserve(&mut self) {
if self.buf.is_empty() {
self.buf.reserve(self.capacity);
}
}

/// Returns `True` if the internal buffer has capacity to fit a string of certain length.
#[inline]
fn has_capacity_of(&self, next_part_len: usize) -> bool {
self.buf.capacity() >= self.buf.len() + next_part_len
}
}

impl<S> BufWrite for BufWriter<S>
where
S: BufSend,
{
#[inline]
fn capacity(&self) -> usize {
self.capacity
}

/// Writes a string into the buffer, optionally drains the buffer.
pub fn write(&mut self, s: Cow<'_, str>) {
fn write(&mut self, s: Cow<'_, str>) {
// Try to reserve the capacity first.
self.reserve();

if !self.has_capacity_of(s.len()) {
// There isn't enough capacity, we drain the buffer.
self.drain();
Expand All @@ -87,17 +124,20 @@ impl BufWriter {
// changes if the buffer was drained. If the buffer capacity didn't change,
// then it means self.has_capacity_of() has returned true the first time which will be
// guaranteed to be matched by the left hand side of this implementation.
let _ = self.tx.send(s.into_owned());
self.tx.buf_send(s.into_owned());
}
}
}

impl Drop for BufWriter {
impl<S> Drop for BufWriter<S>
where
S: BufSend,
{
fn drop(&mut self) {
if !self.buf.is_empty() {
let mut buf = String::new();
std::mem::swap(&mut buf, &mut self.buf);
let _ = self.tx.send(buf);
self.tx.buf_send(buf);
}
}
}
4 changes: 2 additions & 2 deletions packages/yew/src/platform/mod.rs
Expand Up @@ -43,9 +43,9 @@
use std::future::Future;

#[cfg(feature = "ssr")]
pub(crate) mod io;
pub(crate) mod fmt;

pub mod sync;
pub mod pinned;
pub mod time;

#[cfg(target_arch = "wasm32")]
Expand Down
6 changes: 6 additions & 0 deletions packages/yew/src/platform/pinned/mod.rs
@@ -0,0 +1,6 @@
//! Task Synchronisation Primitives for pinned tasks.
//!
//! This module provides task synchronisation for `!Send` futures.

pub mod mpsc;
pub mod oneshot;