Skip to content

Commit

Permalink
Default to separate resolver.
Browse files Browse the repository at this point in the history
  • Loading branch information
futursolo committed Aug 12, 2022
1 parent 2045c49 commit a470862
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 167 deletions.
257 changes: 98 additions & 159 deletions packages/yew/src/platform/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,98 +6,41 @@ use std::future::Future;
use std::rc::Rc;
use std::task::{Poll, Waker};

use futures::future::{self, FusedFuture, MaybeDone};
use futures::future::{self, MaybeDone};
use futures::stream::{FusedStream, Stream};
use futures::StreamExt;
use pin_project::pin_project;

pub(crate) static DEFAULT_BUF_SIZE: usize = 1024;

enum BufStreamInner {
Combined {
buf: String,
},
Detached {
buf: String,
waker: Option<Waker>,
done: bool,
},
enum BufStreamState {
Ready,
Pending(Waker),
Done,
}

struct BufStreamInner {
buf: String,
state: BufStreamState,
}

impl BufStreamInner {
#[inline]
const fn new_detached() -> Self {
Self::Detached {
const fn new() -> Self {
Self {
buf: String::new(),
waker: None,
done: false,
}
}

#[inline]
const fn new_combined() -> Self {
Self::Combined { buf: String::new() }
}

#[inline]
fn buf(&self) -> &String {
match self {
Self::Combined { ref buf } => buf,
Self::Detached { ref buf, .. } => buf,
}
}

#[inline]
fn buf_mut(&mut self) -> &mut String {
match self {
Self::Combined { ref mut buf } => buf,
Self::Detached { ref mut buf, .. } => buf,
state: BufStreamState::Ready,
}
}

fn wake(&self) {
match self {
Self::Combined { .. } => {}
Self::Detached { ref waker, .. } => {
if let Some(m) = waker {
m.wake_by_ref();
}
}
}
}

fn set_waker(&mut self, waker: Waker) {
match self {
Self::Combined { .. } => {}
Self::Detached {
waker: ref mut current_waker,
..
} => {
*current_waker = Some(waker);
}
if let BufStreamState::Pending(ref waker) = self.state {
waker.wake_by_ref();
}
}

fn finish(&mut self) {
match self {
Self::Combined { .. } => {}
Self::Detached {
ref waker,
ref mut done,
..
} => {
*done = true;
if let Some(m) = waker {
m.wake_by_ref();
}
}
}
}

fn is_finished(&self) -> Option<bool> {
match self {
Self::Combined { .. } => None,
Self::Detached { ref buf, done, .. } => Some(buf.is_empty() && *done),
}
self.wake();
self.state = BufStreamState::Done;
}
}

Expand All @@ -121,76 +64,46 @@ impl Write for BufWriter {
let mut inner = self.inner.borrow_mut();
inner.wake();

let buf = inner.buf_mut();
if buf.is_empty() {
buf.reserve(self.capacity);
if inner.buf.is_empty() {
inner.buf.reserve(self.capacity);
}

buf.write_str(s)
inner.buf.write_str(s)
}

fn write_char(&mut self, c: char) -> fmt::Result {
let mut inner = self.inner.borrow_mut();
inner.wake();

let buf = inner.buf_mut();
if buf.is_empty() {
buf.reserve(self.capacity);
if inner.buf.is_empty() {
inner.buf.reserve(self.capacity);
}

buf.write_char(c)
inner.buf.write_char(c)
}

fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
let mut inner = self.inner.borrow_mut();

let buf = inner.buf_mut();
if buf.is_empty() {
buf.reserve(self.capacity);
if inner.buf.is_empty() {
inner.buf.reserve(self.capacity);
}

buf.write_fmt(args)
inner.buf.write_fmt(args)
}
}

#[pin_project]
pub(crate) struct BufStream<F>
where
F: Future<Output = ()>,
{
#[pin]
resolver: Option<MaybeDone<F>>,
pub(crate) struct BufStream {
inner: Rc<RefCell<BufStreamInner>>,
}

impl<F> BufStream<F>
where
F: Future<Output = ()>,
{
pub fn new<C>(capacity: usize, f: C) -> Self
where
C: FnOnce(BufWriter) -> F,
{
let inner = Rc::new(RefCell::new(BufStreamInner::new_combined()));

let resolver = {
let inner = inner.clone();
let w = BufWriter { inner, capacity };

f(w)
};

Self {
resolver: Some(future::maybe_done(resolver)),
inner,
}
}

pub fn new_with_resolver<C>(capacity: usize, f: C) -> (BufStream<F>, impl Future<Output = ()>)
impl BufStream {
pub fn new<C, F>(capacity: usize, f: C) -> (BufStream, impl Future<Output = ()>)
where
C: FnOnce(BufWriter) -> F,
F: Future<Output = ()>,
{
let inner = Rc::new(RefCell::new(BufStreamInner::new_detached()));
let inner = Rc::new(RefCell::new(BufStreamInner::new()));

let resolver = {
let inner = inner.clone();
Expand All @@ -205,68 +118,94 @@ where
}
};

(
Self {
resolver: None,
inner,
},
resolver,
)
(Self { inner }, resolver)
}
}

impl<F> Stream for BufStream<F>
where
F: Future<Output = ()>,
{
impl Stream for BufStream {
type Item = String;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let mut inner = self.inner.borrow_mut();

if !inner.buf.is_empty() {
return Poll::Ready(Some(inner.buf.split_off(0)));
}

match this.resolver.as_pin_mut() {
Some(mut resolver) => {
let _ = resolver.as_mut().poll(cx);
if let BufStreamState::Done = inner.state {
return Poll::Ready(None);
}

let mut inner = this.inner.borrow_mut();
inner.state = BufStreamState::Pending(cx.waker().clone());
Poll::Pending
}
}

match (inner.buf().is_empty(), resolver.is_terminated()) {
(true, true) => Poll::Ready(None),
(true, false) => Poll::Pending,
(false, _) => Poll::Ready(Some(inner.buf_mut().split_off(0))),
}
}
None => {
let mut inner = this.inner.borrow_mut();
impl FusedStream for BufStream {
fn is_terminated(&self) -> bool {
let inner = self.inner.borrow();

if !inner.buf().is_empty() {
return Poll::Ready(Some(inner.buf_mut().split_off(0)));
}
matches!(
(&inner.state, inner.buf.is_empty()),
(BufStreamState::Done, true)
)
}
}

if Some(true) == inner.is_finished() {
return Poll::Ready(None);
}
#[pin_project]
pub(crate) struct ResolvedBufStream<F>
where
F: Future<Output = ()>,
{
#[pin]
resolver: MaybeDone<F>,
inner: BufStream,
}

inner.set_waker(cx.waker().clone());
Poll::Pending
}
impl<F> ResolvedBufStream<F>
where
F: Future<Output = ()>,
{
pub fn new<C>(capacity: usize, f: C) -> ResolvedBufStream<impl Future<Output = ()>>
where
C: FnOnce(BufWriter) -> F,
F: Future<Output = ()>,
{
let (inner, resolver) = BufStream::new(capacity, f);

ResolvedBufStream {
inner,
resolver: future::maybe_done(resolver),
}
}
}

impl<F> FusedStream for BufStream<F>
impl<F> Stream for ResolvedBufStream<F>
where
F: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
let inner = self.inner.borrow();
type Item = String;

match self.resolver.as_ref() {
Some(resolver) => inner.buf().is_empty() && resolver.is_terminated(),
None => inner.is_finished().unwrap_or_default(),
}
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let _ = this.resolver.poll(cx);

this.inner.poll_next_unpin(cx)
}
}

impl<F> FusedStream for ResolvedBufStream<F>
where
F: Future<Output = ()>,
{
#[inline]
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
4 changes: 2 additions & 2 deletions packages/yew/src/server_renderer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::stream::{Stream, StreamExt};
use tracing::Instrument;

use crate::html::{BaseComponent, Scope};
use crate::platform::fmt::{BufStream, DEFAULT_BUF_SIZE};
use crate::platform::fmt::{ResolvedBufStream, DEFAULT_BUF_SIZE};
use crate::platform::{run_pinned, spawn_local};

/// A Yew Server-side Renderer that renders on the current thread.
Expand Down Expand Up @@ -105,7 +105,7 @@ where
let scope = Scope::<COMP>::new(None);

let outer_span = tracing::Span::current();
BufStream::new(self.capacity, move |mut w| async move {
ResolvedBufStream::new(self.capacity, move |mut w| async move {
let render_span = tracing::debug_span!("render_stream_item");
render_span.follows_from(outer_span);
scope
Expand Down
11 changes: 5 additions & 6 deletions packages/yew/src/virtual_dom/vlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,11 @@ mod feat_ssr {

// Concurrently render rest children into a separate buffer.
let rest_child_furs = rest_children.iter().map(|child| {
let (s, resolver) =
BufStream::new_with_resolver(capacity, move |mut w| async move {
child
.render_into_stream(&mut w, parent_scope, hydratable)
.await;
});
let (s, resolver) = BufStream::new(capacity, move |mut w| async move {
child
.render_into_stream(&mut w, parent_scope, hydratable)
.await;
});

child_streams.push(s);

Expand Down

0 comments on commit a470862

Please sign in to comment.