New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Polled SSR Stream #2824
Polled SSR Stream #2824
Changes from 31 commits
5bf1ae8
fb2603e
892b759
29a46bd
20fc4a3
8a41c3e
8688045
3bd9e0a
ed36760
2799a22
02a838d
3de0770
689980e
cbbd765
2045c49
a470862
3ac02fd
345b745
d1a6ab3
f0863a4
7dcaf6e
ced12ac
475f52e
e757d35
0a399ba
6ae5a65
f310575
e18e0fd
67348d3
f491d6c
2a300fe
a78f0b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
use std::cell::UnsafeCell; | ||
use std::fmt::{self, Write}; | ||
use std::marker::PhantomData; | ||
use std::rc::Rc; | ||
use std::task::{Poll, Waker}; | ||
|
||
use futures::stream::{FusedStream, Stream}; | ||
|
||
static BUF_SIZE: usize = 1024; | ||
|
||
enum BufStreamState { | ||
Ready, | ||
Pending(Waker), | ||
Done, | ||
} | ||
|
||
struct Inner { | ||
buf: String, | ||
state: BufStreamState, | ||
|
||
// This type is not send or sync. | ||
_marker: PhantomData<Rc<()>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not do: impl !Send for Inner {}
impl !Sync for Inner {} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is due to that negative impls is not stable. See: https://doc.rust-lang.org/unstable-book/language-features/negative-impls.html |
||
} | ||
|
||
impl Inner { | ||
#[inline] | ||
const fn new() -> Self { | ||
Self { | ||
buf: String::new(), | ||
state: BufStreamState::Ready, | ||
_marker: PhantomData, | ||
} | ||
} | ||
|
||
#[inline] | ||
fn wake(&mut self) { | ||
if let BufStreamState::Pending(ref waker) = self.state { | ||
waker.wake_by_ref(); | ||
self.state = BufStreamState::Ready; | ||
} | ||
} | ||
|
||
#[inline] | ||
fn buf_reserve(&mut self) { | ||
if self.buf.is_empty() { | ||
self.buf.reserve(BUF_SIZE); | ||
} | ||
} | ||
} | ||
|
||
impl Write for Inner { | ||
fn write_str(&mut self, s: &str) -> fmt::Result { | ||
if s.is_empty() { | ||
return Ok(()); | ||
} | ||
|
||
self.wake(); | ||
if s.len() < BUF_SIZE { | ||
self.buf_reserve(); | ||
} | ||
|
||
self.buf.write_str(s) | ||
} | ||
|
||
fn write_char(&mut self, c: char) -> fmt::Result { | ||
self.wake(); | ||
self.buf_reserve(); | ||
|
||
self.buf.write_char(c) | ||
} | ||
|
||
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { | ||
self.wake(); | ||
self.buf_reserve(); | ||
|
||
self.buf.write_fmt(args) | ||
} | ||
} | ||
|
||
/// An asynchronous [`String`] writer. | ||
/// | ||
/// This type implements [`fmt::Write`] and can be used with [`write!`] and [`writeln!`]. | ||
pub(crate) struct BufWriter { | ||
inner: Rc<UnsafeCell<Inner>>, | ||
} | ||
|
||
impl Write for BufWriter { | ||
#[inline] | ||
fn write_str(&mut self, s: &str) -> fmt::Result { | ||
// SAFETY: | ||
// | ||
// We can acquire a mutable reference without checking as: | ||
// | ||
// - This type is !Sync and !Send. | ||
// - This function is not used by any other functions that has access to the inner type. | ||
// - The mutable reference is dropped at the end of this function. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
inner.write_str(s) | ||
} | ||
|
||
#[inline] | ||
fn write_char(&mut self, c: char) -> fmt::Result { | ||
// SAFETY: | ||
// | ||
// We can acquire a mutable reference without checking as: | ||
// | ||
// - This type is !Sync and !Send. | ||
// - This function is not used by any other functions that has access to the inner type. | ||
// - The mutable reference is dropped at the end of this function. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
inner.write_char(c) | ||
} | ||
|
||
#[inline] | ||
fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { | ||
// SAFETY: | ||
// | ||
// We can acquire a mutable reference without checking as: | ||
// | ||
// - This type is !Sync and !Send. | ||
// - This function is not used by any other functions that has access to the inner type. | ||
// - The mutable reference is dropped at the end of this function. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
inner.write_fmt(args) | ||
} | ||
} | ||
|
||
impl Drop for BufWriter { | ||
fn drop(&mut self) { | ||
// SAFETY: | ||
// | ||
// We can acquire a mutable reference without checking as: | ||
// | ||
// - This type is !Sync and !Send. | ||
// - This function is not used by any other functions that has access to the inner type. | ||
// - The mutable reference is dropped at the end of this function. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
inner.wake(); | ||
inner.state = BufStreamState::Done; | ||
} | ||
} | ||
|
||
/// An asynchronous [`String`] reader. | ||
pub(crate) struct BufReader { | ||
inner: Rc<UnsafeCell<Inner>>, | ||
} | ||
|
||
impl Stream for BufReader { | ||
type Item = String; | ||
|
||
fn poll_next( | ||
self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
// SAFETY: | ||
// | ||
// We can acquire a mutable reference without checking as: | ||
// | ||
// - This type is !Sync and !Send. | ||
// - This function is not used by any other functions that has access to the inner type. | ||
// - The mutable reference is dropped at the end of this function. | ||
let inner = unsafe { &mut *self.inner.get() }; | ||
|
||
if !inner.buf.is_empty() { | ||
let buf = std::mem::take(&mut inner.buf); | ||
return Poll::Ready(Some(buf)); | ||
} | ||
|
||
if let BufStreamState::Done = inner.state { | ||
return Poll::Ready(None); | ||
} | ||
|
||
inner.state = BufStreamState::Pending(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
} | ||
|
||
impl FusedStream for BufReader { | ||
fn is_terminated(&self) -> bool { | ||
// SAFETY: | ||
// | ||
// We can acquire a mutable reference without checking as: | ||
// | ||
// - This type is !Sync and !Send. | ||
// - This function is not used by any other functions that has access to the inner type. | ||
// - The mutable reference is dropped at the end of this function. | ||
let inner = unsafe { &*self.inner.get() }; | ||
|
||
matches!( | ||
(&inner.state, inner.buf.is_empty()), | ||
(BufStreamState::Done, true) | ||
) | ||
} | ||
} | ||
|
||
/// Creates an asynchronous buffer that operates over String. | ||
pub(crate) fn buffer() -> (BufWriter, BufReader) { | ||
let inner = Rc::new(UnsafeCell::new(Inner::new())); | ||
|
||
let w = { | ||
let inner = inner.clone(); | ||
BufWriter { inner } | ||
}; | ||
|
||
let r = BufReader { inner }; | ||
|
||
(w, r) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
//! Asynchronous utilities to work with `String`s. | ||
|
||
use std::future::Future; | ||
|
||
use futures::future::{self, MaybeDone}; | ||
use futures::stream::{FusedStream, Stream}; | ||
use futures::StreamExt; | ||
use pin_project::pin_project; | ||
|
||
mod buffer; | ||
|
||
pub(crate) use buffer::{buffer, BufReader, BufWriter}; | ||
|
||
/// A buffered asynchronous [`String`] [`Stream`]. | ||
/// | ||
/// A BufStream combines a BufWriter - BufReader pair and a resolving future that writes to the | ||
/// buffer and polls the future alongside the buffer. | ||
#[pin_project] | ||
pub(crate) struct BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
Comment on lines
+20
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps remove the bound from here? That would mean that everywhere this type is named doesn't require the bound - only the usages do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot remove this as this bound is declared on the MaybeDone type itself. |
||
{ | ||
#[pin] | ||
resolver: MaybeDone<F>, | ||
inner: BufReader, | ||
} | ||
|
||
impl<F> BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
/// Creates a `BufStream`. | ||
pub fn new<C>(f: C) -> Self | ||
where | ||
C: FnOnce(BufWriter) -> F, | ||
{ | ||
let (w, r) = buffer(); | ||
let resolver = future::maybe_done(f(w)); | ||
|
||
BufStream { inner: r, resolver } | ||
} | ||
} | ||
|
||
impl<F> Stream for BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
type Item = String; | ||
|
||
#[inline] | ||
fn poll_next( | ||
self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
let this = self.project(); | ||
let _ = this.resolver.poll(cx); | ||
|
||
this.inner.poll_next_unpin(cx) | ||
} | ||
} | ||
|
||
impl<F> FusedStream for BufStream<F> | ||
where | ||
F: Future<Output = ()>, | ||
{ | ||
#[inline] | ||
fn is_terminated(&self) -> bool { | ||
self.inner.is_terminated() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pin-project-lite
maybe?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See: https://docs.rs/pin-project-lite/0.2.8/pin_project_lite/#different-no-proc-macro-related-dependencies
The primary benefit of
#[pin_project]
is that rustfmt will continue to work for the struct.