Skip to content

Commit

Permalink
Adjust API.
Browse files Browse the repository at this point in the history
  • Loading branch information
futursolo committed Aug 13, 2022
1 parent 3ac02fd commit 345b745
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 67 deletions.
113 changes: 56 additions & 57 deletions packages/yew/src/platform/fmt.rs
@@ -1,4 +1,4 @@
//! This module contains types for I/O functionality.
//! Asynchronous utilities to work with `String`s.

use std::cell::RefCell;
use std::fmt::{self, Write};
Expand All @@ -25,22 +25,26 @@ struct BufStreamInner {
}

impl BufStreamInner {
#[inline]
const fn new() -> Self {
Self {
buf: String::new(),
state: BufStreamState::Ready,
}
}

#[inline]
fn wake(&self) {
if let BufStreamState::Pending(ref waker) = self.state {
waker.wake_by_ref();
}
}

fn finish(&mut self) {
self.wake();
self.state = BufStreamState::Done;
#[inline]
fn try_reserve(&mut self, capacity: usize) {
if self.buf.is_empty() {
self.buf.reserve(capacity);
}
}
}

Expand All @@ -50,7 +54,8 @@ pub(crate) struct BufWriter {
}

impl BufWriter {
pub fn capacity(&self) -> usize {
#[inline]
pub const fn capacity(&self) -> usize {
self.capacity
}
}
Expand All @@ -62,67 +67,60 @@ impl Write for BufWriter {
}

let mut inner = self.inner.borrow_mut();
inner.wake();

if inner.buf.is_empty() {
inner.buf.reserve(self.capacity);
}
inner.wake();
inner.try_reserve(self.capacity);

inner.buf.write_str(s)
}

fn write_char(&mut self, c: char) -> fmt::Result {
let mut inner = self.inner.borrow_mut();
inner.wake();

if inner.buf.is_empty() {
inner.buf.reserve(self.capacity);
}
inner.wake();
inner.try_reserve(self.capacity);

inner.buf.write_char(c)
}

fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
let mut inner = self.inner.borrow_mut();

if inner.buf.is_empty() {
inner.buf.reserve(self.capacity);
}
inner.wake();
inner.try_reserve(self.capacity);

inner.buf.write_fmt(args)
}
}

pub(crate) struct BufStream {
inner: Rc<RefCell<BufStreamInner>>,
}
impl Drop for BufWriter {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();

impl BufStream {
pub fn new<C, F>(capacity: usize, f: C) -> (BufStream, impl Future<Output = ()>)
where
C: FnOnce(BufWriter) -> F,
F: Future<Output = ()>,
{
let inner = Rc::new(RefCell::new(BufStreamInner::new()));

let resolver = {
let inner = inner.clone();
let w = {
let inner = inner.clone();
BufWriter { inner, capacity }
};

async move {
f(w).await;
inner.borrow_mut().finish();
}
};

(Self { inner }, resolver)
inner.wake();
inner.state = BufStreamState::Done;
}
}

impl Stream for BufStream {
/// Creates an asynchronous buffer that operates over String.
pub(crate) fn buffer(capacity: usize) -> (BufWriter, BufReader) {
let inner = Rc::new(RefCell::new(BufStreamInner::new()));

let w = {
let inner = inner.clone();
BufWriter { inner, capacity }
};

let r = BufReader { inner };

(w, r)
}

pub(crate) struct BufReader {
inner: Rc<RefCell<BufStreamInner>>,
}

impl Stream for BufReader {
type Item = String;

fn poll_next(
Expand All @@ -132,9 +130,7 @@ impl Stream for BufStream {
let mut inner = self.inner.borrow_mut();

if !inner.buf.is_empty() {
let mut buf = String::new();
std::mem::swap(&mut buf, &mut inner.buf);

let buf = std::mem::take(&mut inner.buf);
return Poll::Ready(Some(buf));
}

Expand All @@ -147,7 +143,7 @@ impl Stream for BufStream {
}
}

impl FusedStream for BufStream {
impl FusedStream for BufReader {
fn is_terminated(&self) -> bool {
let inner = self.inner.borrow();

Expand All @@ -158,35 +154,38 @@ impl FusedStream for BufStream {
}
}

/// A buffered asynchronous string Stream.
///
/// This combines a BufWriter - BufReader pair and a resolving future.
/// The resoloving future will be polled as the stream is polled.
#[pin_project]
pub(crate) struct ResolvedBufStream<F>
pub(crate) struct BufStream<F>
where
F: Future<Output = ()>,
{
#[pin]
resolver: MaybeDone<F>,
inner: BufStream,
inner: BufReader,
}

impl<F> ResolvedBufStream<F>
impl<F> BufStream<F>
where
F: Future<Output = ()>,
{
pub fn new<C>(capacity: usize, f: C) -> ResolvedBufStream<impl Future<Output = ()>>
/// Creates a `BufStream`.
pub fn new<C>(capacity: usize, f: C) -> Self
where
C: FnOnce(BufWriter) -> F,
F: Future<Output = ()>,
{
let (inner, resolver) = BufStream::new(capacity, f);
let (w, r) = buffer(capacity);
let resolver = future::maybe_done(f(w));

ResolvedBufStream {
inner,
resolver: future::maybe_done(resolver),
}
BufStream { inner: r, resolver }
}
}

impl<F> Stream for ResolvedBufStream<F>
impl<F> Stream for BufStream<F>
where
F: Future<Output = ()>,
{
Expand All @@ -204,7 +203,7 @@ where
}
}

impl<F> FusedStream for ResolvedBufStream<F>
impl<F> FusedStream for BufStream<F>
where
F: Future<Output = ()>,
{
Expand Down
4 changes: 2 additions & 2 deletions packages/yew/src/server_renderer.rs
Expand Up @@ -5,7 +5,7 @@ use futures::stream::{Stream, StreamExt};
use tracing::Instrument;

use crate::html::{BaseComponent, Scope};
use crate::platform::fmt::{ResolvedBufStream, DEFAULT_BUF_SIZE};
use crate::platform::fmt::{BufStream, DEFAULT_BUF_SIZE};
use crate::platform::{run_pinned, spawn_local};

/// A Yew Server-side Renderer that renders on the current thread.
Expand Down Expand Up @@ -105,7 +105,7 @@ where
let scope = Scope::<COMP>::new(None);

let outer_span = tracing::Span::current();
ResolvedBufStream::new(self.capacity, move |mut w| async move {
BufStream::new(self.capacity, move |mut w| async move {
let render_span = tracing::debug_span!("render_stream_item");
render_span.follows_from(outer_span);
scope
Expand Down
16 changes: 8 additions & 8 deletions packages/yew/src/virtual_dom/vlist.rs
Expand Up @@ -163,7 +163,7 @@ mod feat_ssr {

use super::*;
use crate::html::AnyScope;
use crate::platform::fmt::{BufStream, BufWriter};
use crate::platform::fmt::{self, BufWriter};

impl VList {
pub(crate) async fn render_into_stream(
Expand All @@ -178,20 +178,20 @@ mod feat_ssr {
child.render_into_stream(w, parent_scope, hydratable).await;
}
[first_child, rest_children @ ..] => {
let capacity = w.capacity();
let buf_capacity = w.capacity();
let mut child_streams = Vec::with_capacity(self.children.len() - 1);

// Concurrently render rest children into a separate buffer.
let rest_child_furs = rest_children.iter().map(|child| {
let (s, resolver) = BufStream::new(capacity, move |mut w| async move {
let (mut w, r) = fmt::buffer(buf_capacity);

child_streams.push(r);

async move {
child
.render_into_stream(&mut w, parent_scope, hydratable)
.await;
});

child_streams.push(s);

resolver
}
});

// Concurrently resolve all child futures.
Expand Down

0 comments on commit 345b745

Please sign in to comment.