New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Polled SSR Stream #2824
Merged
Merged
Polled SSR Stream #2824
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
5bf1ae8
Switch to pinned channels.
futursolo fb2603e
Fix ServerRenderer so it's not blocked until the result is resolved.
futursolo 892b759
Fix tests.
futursolo 29a46bd
Remove unused SendError.
futursolo 20fc4a3
Merge branch 'master' into pinned-channels
futursolo 8a41c3e
Implement a stream to be polled alongside rendering.
futursolo 8688045
Update Buffer Size.
futursolo 3bd9e0a
Make Send renderer work.
futursolo ed36760
Remove pinned channels.
futursolo 2799a22
Unified Naming.
futursolo 02a838d
Optimise code.
futursolo 3de0770
Restore capacity.
futursolo 689980e
merge 'master' into polled-stream
futursolo cbbd765
Remove unused profile.
futursolo 2045c49
Merge branch 'master' into polled-stream
futursolo a470862
Default to separate resolver.
futursolo 3ac02fd
Reduce allocations on string.
futursolo 345b745
Adjust API.
futursolo d1a6ab3
Remove duplicate trait bound.
futursolo f0863a4
Update docs.
futursolo 7dcaf6e
Remove capacity setting.
futursolo ced12ac
Merge branch 'master' into polled-stream
futursolo 475f52e
Merge branch 'master' into polled-stream
futursolo e757d35
Unsafe?
futursolo 0a399ba
Separate files.
futursolo 6ae5a65
Adjust inlining.
futursolo f310575
Merge branch 'master' into polled-stream
futursolo e18e0fd
Fix test.
futursolo 67348d3
Update notice.
futursolo f491d6c
Update documentation.
futursolo 2a300fe
Merge branch 'master' into polled-stream
futursolo a78f0b3
Fix tests.
futursolo File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,272 @@ | ||
//! This module contains types for I/O functionality. | ||
|
||
use std::cell::RefCell; | ||
use std::fmt::{self, Write}; | ||
use std::future::Future; | ||
use std::rc::Rc; | ||
use std::task::{Poll, Waker}; | ||
|
||
use futures::future::{self, FusedFuture, MaybeDone}; | ||
use futures::stream::{FusedStream, Stream}; | ||
use pin_project::pin_project; | ||
|
||
pub(crate) static DEFAULT_BUF_SIZE: usize = 1024; | ||
|
||
enum BufStreamInner { | ||
Combined { | ||
buf: String, | ||
}, | ||
Detached { | ||
buf: String, | ||
waker: Option<Waker>, | ||
done: bool, | ||
}, | ||
} | ||
|
||
impl BufStreamInner { | ||
#[inline] | ||
const fn new_detached() -> Self { | ||
Self::Detached { | ||
buf: String::new(), | ||
waker: None, | ||
done: false, | ||
} | ||
} | ||
|
||
#[inline] | ||
const fn new_combined() -> Self { | ||
Self::Combined { buf: String::new() } | ||
} | ||
|
||
#[inline] | ||
fn buf(&self) -> &String { | ||
match self { | ||
Self::Combined { ref buf } => buf, | ||
Self::Detached { ref buf, .. } => buf, | ||
} | ||
} | ||
|
||
#[inline] | ||
fn buf_mut(&mut self) -> &mut String { | ||
match self { | ||
Self::Combined { ref mut buf } => buf, | ||
Self::Detached { ref mut buf, .. } => buf, | ||
} | ||
} | ||
|
||
fn wake(&self) { | ||
match self { | ||
Self::Combined { .. } => {} | ||
Self::Detached { ref waker, .. } => { | ||
if let Some(m) = waker { | ||
m.wake_by_ref(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn set_waker(&mut self, waker: Waker) { | ||
match self { | ||
Self::Combined { .. } => {} | ||
Self::Detached { | ||
waker: ref mut current_waker, | ||
.. | ||
} => { | ||
*current_waker = Some(waker); | ||
} | ||
} | ||
} | ||
|
||
fn finish(&mut self) { | ||
match self { | ||
Self::Combined { .. } => {} | ||
Self::Detached { | ||
ref waker, | ||
ref mut done, | ||
.. | ||
} => { | ||
*done = true; | ||
if let Some(m) = waker { | ||
m.wake_by_ref(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn is_finished(&self) -> Option<bool> { | ||
match self { | ||
Self::Combined { .. } => None, | ||
Self::Detached { ref buf, done, .. } => Some(buf.is_empty() && *done), | ||
} | ||
} | ||
} | ||
|
||
pub(crate) struct BufWriter { | ||
inner: Rc<RefCell<BufStreamInner>>, | ||
capacity: usize, | ||
} | ||
|
||
impl BufWriter { | ||
pub fn capacity(&self) -> usize { | ||
self.capacity | ||
} | ||
} | ||
|
||
impl Write for BufWriter { | ||
fn write_str(&mut self, s: &str) -> fmt::Result { | ||
if s.is_empty() { | ||
return Ok(()); | ||
} | ||
|
||
let mut inner = self.inner.borrow_mut(); | ||
inner.wake(); | ||
|
||
let buf = inner.buf_mut(); | ||
if buf.is_empty() { | ||
buf.reserve(self.capacity); | ||
} | ||
|
||
buf.write_str(s) | ||
} | ||
|
||
fn write_char(&mut self, c: char) -> fmt::Result { | ||
let mut inner = self.inner.borrow_mut(); | ||
inner.wake(); | ||
|
||
let buf = inner.buf_mut(); | ||
if buf.is_empty() { | ||
buf.reserve(self.capacity); | ||
} | ||
|
||
buf.write_char(c) | ||
} | ||
|
||
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { | ||
let mut inner = self.inner.borrow_mut(); | ||
|
||
let buf = inner.buf_mut(); | ||
if buf.is_empty() { | ||
buf.reserve(self.capacity); | ||
} | ||
|
||
buf.write_fmt(args) | ||
} | ||
} | ||
|
||
#[pin_project] | ||
pub(crate) struct BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
#[pin] | ||
resolver: Option<MaybeDone<F>>, | ||
inner: Rc<RefCell<BufStreamInner>>, | ||
} | ||
|
||
impl<F> BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
pub fn new<C>(capacity: usize, f: C) -> Self | ||
where | ||
C: FnOnce(BufWriter) -> F, | ||
{ | ||
let inner = Rc::new(RefCell::new(BufStreamInner::new_combined())); | ||
|
||
let resolver = { | ||
let inner = inner.clone(); | ||
let w = BufWriter { inner, capacity }; | ||
|
||
f(w) | ||
}; | ||
|
||
Self { | ||
resolver: Some(future::maybe_done(resolver)), | ||
inner, | ||
} | ||
} | ||
|
||
pub fn new_with_resolver<C>(capacity: usize, f: C) -> (BufStream<F>, impl Future<Output = ()>) | ||
where | ||
C: FnOnce(BufWriter) -> F, | ||
{ | ||
let inner = Rc::new(RefCell::new(BufStreamInner::new_detached())); | ||
|
||
let resolver = { | ||
let inner = inner.clone(); | ||
let w = { | ||
let inner = inner.clone(); | ||
BufWriter { inner, capacity } | ||
}; | ||
|
||
async move { | ||
f(w).await; | ||
inner.borrow_mut().finish(); | ||
} | ||
}; | ||
|
||
( | ||
Self { | ||
resolver: None, | ||
inner, | ||
}, | ||
resolver, | ||
) | ||
} | ||
} | ||
|
||
impl<F> Stream for BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
type Item = String; | ||
|
||
fn poll_next( | ||
self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
let this = self.project(); | ||
|
||
match this.resolver.as_pin_mut() { | ||
Some(mut resolver) => { | ||
let _ = resolver.as_mut().poll(cx); | ||
|
||
let mut inner = this.inner.borrow_mut(); | ||
|
||
match (inner.buf().is_empty(), resolver.is_terminated()) { | ||
(true, true) => Poll::Ready(None), | ||
(true, false) => Poll::Pending, | ||
(false, _) => Poll::Ready(Some(inner.buf_mut().split_off(0))), | ||
} | ||
} | ||
None => { | ||
let mut inner = this.inner.borrow_mut(); | ||
|
||
if !inner.buf().is_empty() { | ||
return Poll::Ready(Some(inner.buf_mut().split_off(0))); | ||
} | ||
|
||
if Some(true) == inner.is_finished() { | ||
return Poll::Ready(None); | ||
} | ||
|
||
inner.set_waker(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl<F> FusedStream for BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
fn is_terminated(&self) -> bool { | ||
let inner = self.inner.borrow(); | ||
|
||
match self.resolver.as_ref() { | ||
Some(resolver) => inner.buf().is_empty() && resolver.is_terminated(), | ||
None => inner.is_finished().unwrap_or_default(), | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pin-project-lite
maybe?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See: https://docs.rs/pin-project-lite/0.2.8/pin_project_lite/#different-no-proc-macro-related-dependencies
The primary benefit of
#[pin_project]
is that rustfmt will continue to work for the struct.