Skip to content

Commit

Permalink
Method yield_after_every to tune iteration guards
Browse files Browse the repository at this point in the history
Add combinator method .yield_after_every(n) to stream and future
combinator types that have been provided with iteration guards.

This gives the user means to fine tune the yielding behavior.
  • Loading branch information
mzabaluev committed Nov 15, 2019
1 parent d4aa251 commit 5fa2ae7
Show file tree
Hide file tree
Showing 22 changed files with 200 additions and 6 deletions.
11 changes: 5 additions & 6 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub use self::async_await::*;
#[doc(hidden)]
pub use futures_core::core_reexport;

#[macro_use]
mod yield_after;

use yield_after::DEFAULT_YIELD_AFTER_LIMIT;

macro_rules! cfg_target_has_atomic {
($($item:item)*) => {$(
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
Expand Down Expand Up @@ -117,9 +122,3 @@ cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
pub mod lock;
}

use core::num::NonZeroU32;

// Default for repetition limits on eager polling loops, to prevent
// stream-consuming combinators like ForEach from starving other tasks.
const DEFAULT_YIELD_AFTER_LIMIT: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(100) };
6 changes: 6 additions & 0 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
{
future_method_yield_after_every! {
#[doc = "the underlying stream and the sink"]
#[doc = "the stream consecutively yields items that the sink
is ready to accept,"]
}

pub(super) fn new(
sink: &'a mut Si,
stream: &'a mut St,
Expand Down
2 changes: 2 additions & 0 deletions futures-util/src/stream/stream/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ impl<St: Stream, C: Default> Collect<St, C> {
mem::replace(self.as_mut().collection(), Default::default())
}

future_method_yield_after_every!();

pub(super) fn new(stream: St) -> Collect<St, C> {
Collect {
stream,
Expand Down
2 changes: 2 additions & 0 deletions futures-util/src/stream/stream/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ where St: Stream,
unsafe_unpinned!(accum: Option<St::Item>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every!();

pub(super) fn new(stream: St) -> Concat<St> {
Concat {
stream,
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/stream/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ where St: Stream,
pub fn into_inner(self) -> St {
self.stream
}

stream_method_yield_after_every! {
#[doc = "the underlying stream and, when pending, a future returned by
the predicate closure,"]
#[doc = "items are consecutively yielded by the stream,
but get immediately filtered out,"]
}
}

impl<St, Fut, F> FusedStream for Filter<St, Fut, F>
Expand Down
5 changes: 5 additions & 0 deletions futures-util/src/stream/stream/filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ impl<St, Fut, F> FilterMap<St, Fut, F>
pub fn into_inner(self) -> St {
self.stream
}

stream_method_yield_after_every! {
#[doc = "the underlying stream and, when pending, a future returned by the map closure,"]
#[doc = "items are consecutively yielded by the stream, but get immediately filtered out,"]
}
}

impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F>
Expand Down
5 changes: 5 additions & 0 deletions futures-util/src/stream/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ where
pub fn into_inner(self) -> St {
self.stream
}

stream_method_yield_after_every! {
#[doc = "the underlying stream or a yet unconsumed stream yielded by it"]
#[doc = "the underlying stream keeps yielding streams that immediately poll empty,"]
}
}

impl<St> FusedStream for Flatten<St>
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/stream/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ where St: Stream,
unsafe_pinned!(future: Option<Fut>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and, if pending, a future returned by
the accumulation closure,"]
#[doc = "the underlying stream consecutively yields items and
the accumulation futures immediately resolve as ready,"]
}

pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> {
Fold {
stream,
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ where St: Stream,
unsafe_pinned!(future: Option<Fut>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and, if pending, a future returned by
the processing closure,"]
#[doc = "the underlying stream consecutively yields items and
the processing futures immediately resolve as ready,"]
}

pub(super) fn new(stream: St, f: F) -> ForEach<St, Fut, F> {
ForEach {
stream,
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/stream/for_each_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ where St: Stream,
unsafe_unpinned!(limit: Option<NonZeroUsize>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and a pool of pending futures returned by
the processing closure,"]
#[doc = "the underlying stream consecutively yields items and some of
the processing futures turn out ready,"]
}

pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> {
ForEachConcurrent {
stream: Some(stream),
Expand Down
6 changes: 6 additions & 0 deletions futures-util/src/stream/stream/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ where
unsafe_unpinned!(buffered_item: Option<St::Ok>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and the sink"]
#[doc = "the stream consecutively yields items that the sink
is ready to accept,"]
}

pub(super) fn new(stream: St, sink: Si) -> Self {
Forward {
sink: Some(sink),
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/stream/skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ impl<St, Fut, F> SkipWhile<St, Fut, F>
pub fn into_inner(self) -> St {
self.stream
}

stream_method_yield_after_every! {
#[doc = "the underlying stream and, when pending, a future returned by
the predicate closure,"]
#[doc = "items are consecutively yielded by the stream,
but the predicate immediately resolves to skip them,"]
}
}

impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
Expand Down
2 changes: 2 additions & 0 deletions futures-util/src/stream/try_stream/try_collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ impl<St: TryStream, C: Default> TryCollect<St, C> {
unsafe_unpinned!(items: C);
unsafe_unpinned!(yield_after: NonZeroU32);

try_future_method_yield_after_every!();

pub(super) fn new(s: St) -> TryCollect<St, C> {
TryCollect {
stream: s,
Expand Down
2 changes: 2 additions & 0 deletions futures-util/src/stream/try_stream/try_concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ where
unsafe_unpinned!(accum: Option<St::Ok>);
unsafe_unpinned!(yield_after: NonZeroU32);

try_future_method_yield_after_every!();

pub(super) fn new(stream: St) -> TryConcat<St> {
TryConcat {
stream,
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/try_stream/try_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ impl<St, Fut, F> TryFilter<St, Fut, F>
unsafe_unpinned!(pending_item: Option<St::Ok>);
unsafe_unpinned!(yield_after: NonZeroU32);

stream_method_yield_after_every! {
#[doc = "the underlying stream and, when pending, a future returned by
the predicate closure,"]
#[doc = "`Ok` items are consecutively yielded by the stream,
but get immediately filtered out,"]
}

pub(super) fn new(stream: St, f: F) -> Self {
TryFilter {
stream,
Expand Down
6 changes: 6 additions & 0 deletions futures-util/src/stream/try_stream/try_filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ impl<St, Fut, F> TryFilterMap<St, Fut, F> {
unsafe_pinned!(pending: Option<Fut>);
unsafe_unpinned!(yield_after: NonZeroU32);

stream_method_yield_after_every! {
#[doc = "the underlying stream and, when pending, a future returned by the map closure,"]
#[doc = "`Ok` items are consecutively yielded by the stream,
but get immediately filtered out,"]
}

pub(super) fn new(stream: St, f: F) -> Self {
TryFilterMap {
stream,
Expand Down
5 changes: 5 additions & 0 deletions futures-util/src/stream/try_stream/try_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ where
pub fn into_inner(self) -> St {
self.stream
}

stream_method_yield_after_every! {
#[doc = "the underlying stream or a yet unconsumed stream yielded by it"]
#[doc = "the underlying stream keeps yielding streams that immediately poll empty,"]
}
}

impl<St> FusedStream for TryFlatten<St>
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/try_stream/try_fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ where St: TryStream,
unsafe_pinned!(future: Option<Fut>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and, if pending, a future returned by
the accumulation closure,"]
#[doc = "the underlying stream consecutively yields `Ok` items and
the accumulation futures immediately resolve with `Ok`,"]
}

pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> {
TryFold {
stream,
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/try_stream/try_for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ where St: TryStream,
unsafe_pinned!(future: Option<Fut>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and, if pending, a future returned by
the processing closure,"]
#[doc = "the underlying stream consecutively yields `Ok` items and
the processing futures immediately resolve with `Ok`,"]
}

pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> {
TryForEach {
stream,
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/try_stream/try_for_each_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ where St: TryStream,
unsafe_unpinned!(limit: Option<NonZeroUsize>);
unsafe_unpinned!(yield_after: NonZeroU32);

future_method_yield_after_every! {
#[doc = "the underlying stream and a pool of pending futures returned by
the processing closure,"]
#[doc = "the underlying stream consecutively yields `Ok` items and some of
the processing futures resolve, all of them with `Ok`,"]
}

pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> {
TryForEachConcurrent {
stream: Some(stream),
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/stream/try_stream/try_skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F>
pub fn into_inner(self) -> St {
self.stream
}

stream_method_yield_after_every! {
#[doc = "the underlying stream and, when pending, a future returned by
the predicate closure,"]
#[doc = "`Ok` items are consecutively yielded by the stream,
but the predicate immediately resolves to skip them,"]
}
}

impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
Expand Down
84 changes: 84 additions & 0 deletions futures-util/src/yield_after.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use core::num::NonZeroU32;

// Default for repetition limits on eager polling loops, to prevent
// stream-consuming combinators like ForEach from starving other tasks.
pub(crate) const DEFAULT_YIELD_AFTER_LIMIT: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(100) };

macro_rules! future_method_yield_after_every {
() => {
future_method_yield_after_every! {
#[doc = "the underlying stream"]
#[doc = "the stream consecutively yields items,"]
}
};
(#[$pollee:meta] #[$why_busy:meta]) => {
/// Changes the maximum number of iterations before `poll` yields.
///
/// The implementation of [`poll`] on this future
/** polls */#[$pollee]/** in a loop. */
/// To prevent blocking in the call to `poll` for too long while
#[$why_busy]
/// the number of iterations is capped to an internal limit,
/// after which the `poll` function wakes up the task
/// and returns [`Pending`]. The `yield_after_every` combinator
/// can be used to tune the iteration limit, returning a future
/// with the limit updated to the provided value.
///
/// [`poll`]: core::future::Future::poll
/// [`Pending`]: core::task::poll::Poll::Pending
///
/// # Panics
///
/// If called with 0 as the number of iterations, this method panics.
///
pub fn yield_after_every(mut self, iterations: u32) -> Self {
self.yield_after = $crate::core_reexport::num::NonZeroU32::new(iterations)
.expect("iteration limit can't be 0");
self
}
};
}

macro_rules! try_future_method_yield_after_every {
() => {
future_method_yield_after_every! {
#[doc = "the underlying stream"]
#[doc = "the stream consecutively yields `Ok` items,"]
}
}
}

macro_rules! stream_method_yield_after_every {
() => {
stream_method_yield_after_every! {
#[doc = "the underlying stream"]
#[doc = "the stream consecutively yields items,"]
}
};
(#[$pollee:meta] #[$why_busy:meta]) => {
/// Changes the maximum number of iterations before `poll_next` yields.
///
/// The implementation of [`poll_next`] on this stream
/** polls */#[$pollee]/** in a loop. */
/// To prevent blocking in the call to `poll_next` for too long while
#[$why_busy]
/// the number of iterations is capped to an internal limit,
/// after which the `poll_next` function wakes up the task
/// and returns [`Pending`]. The `yield_after_every` combinator
/// can be used to tune the iteration limit, returning a stream
/// with the limit updated to the provided value.
///
/// [`poll_next`]: crate::stream::Stream::poll_next
/// [`Pending`]: core::task::poll::Poll::Pending
///
/// # Panics
///
/// If called with 0 as the number of iterations, this method panics.
///
pub fn yield_after_every(mut self, iterations: u32) -> Self {
self.yield_after = $crate::core_reexport::num::NonZeroU32::new(iterations)
.expect("iteration limit can't be 0");
self
}
};
}

0 comments on commit 5fa2ae7

Please sign in to comment.