Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iteration policy API for eager polling loops #1961

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7e76c90
Macro poll_loop
mzabaluev Nov 7, 2019
455fbf6
Require a token for breaking out of poll_loop!
mzabaluev Nov 7, 2019
b24c62b
Reexport macro poll_loop from futures
mzabaluev Nov 7, 2019
df4d079
Reexport task::ToYield from futures
mzabaluev Nov 7, 2019
ae5303f
futures-util: Ground work for poll loop guards
mzabaluev Nov 7, 2019
3c0ee91
Busy loop guard for stream::Collect
mzabaluev Nov 7, 2019
c7a6ddd
Busy loop guard for stream::Concat
mzabaluev Nov 7, 2019
60c80a5
Busy loop guard for stream::Flatten
mzabaluev Nov 7, 2019
b14a1f5
Busy loop guard for stream::Fold
mzabaluev Nov 7, 2019
7dd521e
Busy loop guard for stream::ForEach
mzabaluev Nov 7, 2019
a01ef0e
Busy loop guard for stream::ForEachConcurrent
mzabaluev Nov 7, 2019
3aea69c
Busy loop guard for stream::Forward
mzabaluev Nov 7, 2019
78ed967
Busy loop guard for stream::SkipWhile
mzabaluev Nov 7, 2019
46ff20b
Audited stream::Split::poll_ready for busy looping
mzabaluev Nov 7, 2019
58c0791
Audited IntoAsyncRead::poll_read for busy looping
mzabaluev Nov 7, 2019
ac0ee30
Busy loop guard for stream::TryCollect
mzabaluev Nov 7, 2019
ff1b4b7
Busy loop guard for stream::TryConcat
mzabaluev Nov 7, 2019
81de5f5
Busy loop guards for stream::{Filter, FilterMap}
mzabaluev Nov 7, 2019
ae22dc0
Busy loop guard for stream::TryFilterMap
mzabaluev Nov 7, 2019
36382a9
Busy loop guard for stream::TryFilter
mzabaluev Nov 7, 2019
60660e1
Busy loop guard for stream::TryFlatten
mzabaluev Nov 7, 2019
69868ed
Busy loop guard for stream::TryFold
mzabaluev Nov 7, 2019
e6f3d6c
Busy loop guard for stream::TryForEachConcurrent
mzabaluev Nov 7, 2019
808ef7a
Busy loop guard for stream::TryForEach
mzabaluev Nov 7, 2019
3107730
Busy loop guard for stream::TrySkipWhile
mzabaluev Nov 7, 2019
1f1c55b
Optimize poll_loop! using NonNullU32
mzabaluev Nov 8, 2019
7432b5c
Consistently use core_reexport in poll_loop!
mzabaluev Nov 14, 2019
c07e93e
Busy loop guard for sink::SendAll
mzabaluev Nov 15, 2019
4c7da04
Method yield_after_every to tune iteration guards
mzabaluev Nov 15, 2019
65262fd
iteration::Policy trait for poll_loop!
mzabaluev Nov 16, 2019
fefd6e5
Rotate the check in poll_loop!, remove ToYield
mzabaluev Nov 16, 2019
4271013
Rework the loop guard protocol
mzabaluev Nov 16, 2019
3da7d7d
Retouched module-level doc on iteration
mzabaluev Nov 17, 2019
e6f3a06
Improve yield_after macros
mzabaluev Nov 17, 2019
f6d6670
Satisfy clippy about split borrows
mzabaluev Nov 17, 2019
967de9b
Export the iteration API from futures
mzabaluev Nov 17, 2019
d7a4339
iteration: Doc fixups and proofreading
mzabaluev Nov 17, 2019
1fc8a23
Change default iteration limit to a power of two
mzabaluev Nov 17, 2019
ea8ecb0
iteration: Backport to Rust 1.36
mzabaluev Nov 17, 2019
3ffb24a
Simplify iteration::Policy and erase LoopGuard
mzabaluev Nov 18, 2019
d129010
Improve naming in iteration API
mzabaluev Nov 19, 2019
9749923
Make iteration::Unlimited a unit-like struct
mzabaluev Nov 19, 2019
db4b867
Tests for poll_loop! and the LoopPolicy types
mzabaluev Nov 23, 2019
7f24550
futures-util: More descriptive yield_after macros
mzabaluev Nov 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 72 additions & 0 deletions futures-core/src/iteration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//! Utilities for implementing iterative polling in a cooperative way.

use core::num::NonZeroU32;

/// Iteration policy checker for eager polling loops.
pub trait LoopPolicy {
/// State for checking iterations over the lifetime of a polling loop.
type State;

/// Called before entering the loop to initialize state for
/// iteration checks.
fn enter(&mut self) -> Self::State;

/// Called at the end of each iteration to update the check state
/// and decide whether the poll function should yield, that is,
/// wake up the task and return
/// [`Pending`](core::task::poll::Poll::Pending).
fn yield_check(&mut self, state: &mut Self::State) -> bool;
}

/// An unlimited iteration policy.
///
/// The [`LoopPolicy`] implementation for a value of this token type bypasses
/// any checking on iterations of a polling loop, allowing it to continue
/// indefinitely until ended by logic in the loop body (normally when
/// an asynchronous source is pending or the poll resolves with an output
/// value).
#[derive(Debug)]
pub struct Unlimited;

impl LoopPolicy for Unlimited {
type State = ();

#[inline]
fn enter(&mut self) {}

#[inline]
fn yield_check(&mut self, _: &mut ()) -> bool {
false
}
}

/// An iteration policy with a limit on the number of consecutive iterations.
///
/// The [`LoopPolicy`] implementation of this type runs a counter on the number
/// of iterations and, when the given limit is reached, instructs the
/// [`yield_check`](LoopPolicy::yield_check) caller to yield.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Limit(NonZeroU32);

impl Limit {
/// Creates a policy checker instance with the given limit.
#[inline]
pub const fn new(value: NonZeroU32) -> Self {
Limit(value)
}
}

impl LoopPolicy for Limit {
type State = u32;

#[inline]
fn enter(&mut self) -> u32 {
self.0.into()
}

#[inline]
fn yield_check(&mut self, counter: &mut u32) -> bool {
*counter -= 1;
*counter == 0
}
}
2 changes: 2 additions & 0 deletions futures-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub mod stream;
#[macro_use]
pub mod task;

pub mod iteration;

// Not public API.
#[doc(hidden)]
pub mod core_reexport {
Expand Down
51 changes: 51 additions & 0 deletions futures-core/src/task/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,54 @@ macro_rules! ready {
return $crate::core_reexport::task::Poll::Pending,
})
}

/// An eager polling loop with a customizable policy on iterations.
///
/// This macro helps implement eager polling loops in a way that prevents
/// uncooperative polling behavior. It's typical for such a loop to occur in
/// a [`Future`] implementation that repeatedly polls asynchronous event
/// sources, most often a [`Stream`], to perform some internal work without
/// resolving the future and resume polling in the next loop iteration.
/// If the source polls return [`Ready`] for many consecutive iterations,
/// such a loop will not break for the whole duration,
/// potentially starving other asynchronous operations in the same task
/// from being polled.
///
/// To prevent this, `poll_loop!` uses the [`LoopPolicy`] protocol to check
/// at the end of each iteration whether to yield to the task by returning
/// [`Pending`].
/// The first parameter of the macro receives a mutable pointer to the
/// iteration policy checker implementing `LoopPolicy`.
/// The second parameter receives the reference to the
/// [`Context`] passed to the poll function.
/// The body of a loop iteration is given in the third parameter.
///
/// Note that a `continue` expression in the loop body has the effect of
/// bypassing the iteration policy check, so it should be used with caution.
/// `break` can be used as in the body of an unlabeled `loop`, also for
/// producing the value of the macro invocation used as an expression.
/// The loop is immediately terminated by `break` with no hidden effects.
///
/// [`Future`]: core::future::Future
/// [`Stream`]: stream::Stream
/// [`Context`]: core::task::Context
/// [`Pending`]: core::task::Poll::Pending
/// [`Ready`]: core::task::Poll::Ready
/// [`LoopPolicy`]: iteration::LoopPolicy
#[macro_export]
macro_rules! poll_loop {
{$policy:expr, $cx:expr, $body:expr} => {
{
#[allow(clippy::deref_addrof)]
let policy = &mut *$policy;
let mut state = $crate::iteration::LoopPolicy::enter(policy);
loop {
{ $body }
if $crate::iteration::LoopPolicy::yield_check(policy, &mut state) {
$crate::core_reexport::task::Context::waker($cx).wake_by_ref();
return $crate::core_reexport::task::Poll::Pending;
}
}
}
}
}
7 changes: 6 additions & 1 deletion futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ compile_error!("The `read-initializer` feature requires the `unstable` feature a
#[cfg(feature = "alloc")]
extern crate alloc;

#[macro_use(ready)]
#[macro_use(ready, poll_loop)]
extern crate futures_core;

// Macro re-exports
Expand All @@ -48,6 +48,11 @@ pub use self::async_await::*;
#[doc(hidden)]
pub use futures_core::core_reexport;

#[macro_use]
mod yield_after;

use yield_after::DEFAULT_YIELD_AFTER_LIMIT;

macro_rules! cfg_target_has_atomic {
($($item:item)*) => {$(
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
Expand Down
60 changes: 39 additions & 21 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::stream::{StreamExt, TryStreamExt, Fuse};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::iteration;
use futures_core::stream::{TryStream, Stream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
Expand All @@ -17,6 +18,7 @@ where
sink: &'a mut Si,
stream: Fuse<&'a mut St>,
buffered: Option<St::Ok>,
yield_after: iteration::Limit,
}

impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
Expand All @@ -30,6 +32,7 @@ where
.field("sink", &self.sink)
.field("stream", &self.stream)
.field("buffered", &self.buffered)
.field("yield_after", &self.yield_after)
.finish()
}
}
Expand All @@ -46,6 +49,12 @@ where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
{
future_method_yield_after_every! {
#[pollee = "the underlying stream and the sink"]
#[why_busy = "the stream consecutively yields items that the sink
is ready to accept,"]
}

pub(super) fn new(
sink: &'a mut Si,
stream: &'a mut St,
Expand All @@ -54,23 +63,26 @@ where
sink,
stream: stream.fuse(),
buffered: None,
yield_after: crate::DEFAULT_YIELD_AFTER_LIMIT,
}
}
}

fn try_start_send(
&mut self,
cx: &mut Context<'_>,
item: St::Ok,
) -> Poll<Result<(), Si::Error>> {
debug_assert!(self.buffered.is_none());
match Pin::new(&mut self.sink).poll_ready(cx)? {
Poll::Ready(()) => {
Poll::Ready(Pin::new(&mut self.sink).start_send(item))
}
Poll::Pending => {
self.buffered = Some(item);
Poll::Pending
}
fn try_start_send<Si, Ok>(
mut sink: Pin<&mut Si>,
cx: &mut Context<'_>,
item: Ok,
buffered: &mut Option<Ok>,
) -> Poll<Result<(), Si::Error>>
where
Si: Sink<Ok> + ?Sized,
{
debug_assert!(buffered.is_none());
match sink.as_mut().poll_ready(cx)? {
Poll::Ready(()) => Poll::Ready(sink.as_mut().start_send(item)),
Poll::Pending => {
*buffered = Some(item);
Poll::Pending
}
}
}
Expand All @@ -87,23 +99,29 @@ where
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
let (stream, mut sink, buffered) = (
&mut this.stream,
Pin::new(&mut this.sink),
&mut this.buffered,
);

// If we've got an item buffered already, we need to write it to the
// sink before we can do anything else
if let Some(item) = this.buffered.take() {
ready!(this.try_start_send(cx, item))?
if let Some(item) = buffered.take() {
ready!(try_start_send(sink.as_mut(), cx, item, buffered))?
}

loop {
match this.stream.try_poll_next_unpin(cx)? {
poll_loop! { &mut this.yield_after, cx,
match stream.try_poll_next_unpin(cx)? {
Poll::Ready(Some(item)) => {
ready!(this.try_start_send(cx, item))?
ready!(try_start_send(sink.as_mut(), cx, item, buffered))?
}
Poll::Ready(None) => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
ready!(sink.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(()))
}
Poll::Pending => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
ready!(sink.as_mut().poll_flush(cx))?;
return Poll::Pending
}
}
Expand Down
30 changes: 23 additions & 7 deletions futures-util/src/stream/stream/collect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::iteration;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
Expand All @@ -11,22 +12,36 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned};
pub struct Collect<St, C> {
stream: St,
collection: C,
yield_after: iteration::Limit,
}

impl<St: Unpin, C> Unpin for Collect<St, C> {}

impl<St: Stream, C: Default> Collect<St, C> {
unsafe_pinned!(stream: St);
unsafe_unpinned!(collection: C);
unsafe_unpinned!(yield_after: iteration::Limit);

fn finish(mut self: Pin<&mut Self>) -> C {
mem::replace(self.as_mut().collection(), Default::default())
fn split_borrows(
self: Pin<&mut Self>,
) -> (Pin<&mut St>, &mut C, &mut iteration::Limit) {
unsafe {
let this = self.get_unchecked_mut();
(
Pin::new_unchecked(&mut this.stream),
&mut this.collection,
&mut this.yield_after,
)
}
}

future_method_yield_after_every!();

pub(super) fn new(stream: St) -> Collect<St, C> {
Collect {
stream,
collection: Default::default(),
yield_after: crate::DEFAULT_YIELD_AFTER_LIMIT,
}
}
}
Expand All @@ -46,11 +61,12 @@ where St: Stream,
{
type Output = C;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
loop {
match ready!(self.as_mut().stream().poll_next(cx)) {
Some(e) => self.as_mut().collection().extend(Some(e)),
None => return Poll::Ready(self.as_mut().finish()),
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
let (mut stream, collection, yield_after) = self.split_borrows();
poll_loop! { yield_after, cx,
match ready!(stream.as_mut().poll_next(cx)) {
Some(e) => collection.extend(Some(e)),
None => return Poll::Ready(mem::replace(collection, Default::default())),
}
}
}
Expand Down