Skip to content

Commit

Permalink
Added future::on_poll, future::on_poll_pending and future::on_poll_ready
Browse files Browse the repository at this point in the history
  • Loading branch information
kulst committed Mar 16, 2024
1 parent 570c4e7 commit 4f24fe8
Show file tree
Hide file tree
Showing 2 changed files with 284 additions and 0 deletions.
5 changes: 5 additions & 0 deletions futures-util/src/future/mod.rs
Expand Up @@ -71,6 +71,11 @@ pub use self::poll_fn::{poll_fn, PollFn};
mod poll_immediate;
pub use self::poll_immediate::{poll_immediate, PollImmediate};

mod on_poll;
pub use self::on_poll::{on_poll, OnPoll};
pub use self::on_poll::{on_poll_pending, OnPollPending};
pub use self::on_poll::{on_poll_ready, OnPollReady};

mod ready;
pub use self::ready::{err, ok, ready, Ready};

Expand Down
279 changes: 279 additions & 0 deletions futures-util/src/future/on_poll.rs
@@ -0,0 +1,279 @@
use super::assert_future;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`on_poll`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnPoll<T, F> {
#[pin]
future: Option<T>,
function: Option<F>,
}
}

/// Wrapps a future and a function in an [`OnPoll`]. On each [poll](core::future::Future::poll())
/// the wrapped future is polled internally and the function is invoked.
///
/// The function provides mutable access to the future's [`Context`] and to its [`Poll`].
///
/// After the internal future has returned [`Poll::Ready`] additional polls to an [`OnPoll`]
/// will lead to a panic.
///
/// **Warning:** You should only call the future's waker in this function when it is safe to
/// poll the future another time. Otherwise this might lead to a panic.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::on_poll;
/// use futures::pending;
/// use futures::task::{Context, Poll};
/// let mut x = 0;
/// let future = async {
/// pending!(); // returns Poll::Pending, so x = 1
/// pending!(); // returns Poll::Pending, so x = 2
/// pending!(); // returns Poll::Pending, so x = 3
/// "Some future" // returns Poll::Ready, so x = 4
/// };
/// futures::pin_mut!(future);
/// let funct = |cx : &mut Context<'_>, poll : &mut Poll<_>| {
/// x += 1;
/// if let Poll::Pending = poll {
/// cx.waker().wake_by_ref(); // only to let the future make progress
/// }
/// };
/// let res = on_poll(future, funct).await;
/// assert_eq!(x, 4);
/// assert_eq!(res, "Some future");
/// # })
/// ```
pub fn on_poll<T, F>(future: T, function: F) -> OnPoll<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut Poll<T::Output>),
{
assert_future::<T::Output, _>(OnPoll { future: Some(future), function: Some(function) })
}

impl<T, F> Future for OnPoll<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut Poll<T::Output>),
{
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let future =
this.future.as_mut().as_pin_mut().expect("cannot poll OnPoll after it was ready");

let mut poll = future.poll(cx);
if let Poll::Ready(_) = poll {
this.future.set(None);
unwrap_option(this.function.take())(cx, &mut poll);
return poll;
}

unwrap_option(this.function.as_mut())(cx, &mut poll);
poll
}
}

impl<T, F> FusedFuture for OnPoll<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut Poll<T::Output>),
{
fn is_terminated(&self) -> bool {
self.future.is_none()
}
}

pin_project! {
/// Future for the [`on_poll_pending`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnPollPending<T, F> {
#[pin]
future: Option<T>,
function: Option<F>,
}
}

/// Wrapps a future and a function in an [`OnPollPending`]. On each [poll](core::future::Future::poll())
/// the wrapped future is polled internally. The function is invoked each time the internal future returns
/// [`Poll::Pending`].
///
/// The function provides mutable access to the future's [`Context`] and to its [`Poll`].
///
/// After the internal future has returned [`Poll::Ready`] additional polls to an [`OnPollPending`]
/// will lead to a panic.
///
/// Can be used to abort a future after a specific number of pending polls by changing its [`Poll`]
/// to [`Poll::Ready`].
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::on_poll_pending;
/// use futures::pending;
/// use futures::task::{Context, Poll};
/// let mut x = 0;
/// let future = async {
/// pending!(); // returns Poll::Pending ,so x = 1
/// pending!(); // returns Poll::Pending, so x = 2
/// pending!(); // returns Poll::Pending, so x = 3
/// "Some future" // returns Poll::Ready, so x is unchanged
/// };
/// futures::pin_mut!(future);
/// let funct = |cx : &mut Context<'_>, poll : &mut Poll<_>| {
/// *&mut x += 1;
/// cx.waker().wake_by_ref(); // only to let the future make progress
/// };
/// let res = on_poll_pending(future, funct).await;
/// assert_eq!(x, 3);
/// assert_eq!(res, "Some future");
/// # })
/// ```
pub fn on_poll_pending<T, F>(future: T, function: F) -> OnPollPending<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut Poll<T::Output>),
{
assert_future::<T::Output, _>(OnPollPending { future: Some(future), function: Some(function) })
}

impl<T, F> Future for OnPollPending<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut Poll<T::Output>),
{
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let future = this
.future
.as_mut()
.as_pin_mut()
.expect("cannot poll OnPollPending after it was ready");

let mut poll = future.poll(cx);
if let Poll::Ready(_) = poll {
this.future.set(None);
unwrap_option(this.function.take());
return poll;
}

unwrap_option(this.function.as_mut())(cx, &mut poll);
// Function was called so we need to check if the poll was manipulated to Poll::Ready
if let Poll::Ready(_) = poll {
this.future.set(None);
unwrap_option(this.function.take());
}
poll
}
}

impl<T, F> FusedFuture for OnPollPending<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut Poll<T::Output>),
{
fn is_terminated(&self) -> bool {
self.future.is_none()
}
}

pin_project! {
/// Future for the [`on_poll_ready`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnPollReady<T, F> {
#[pin]
future: Option<T>,
function: Option<F>,
}
}

/// Wrapps a future and a function in an [`OnPollReady`]. On each [poll](core::future::Future::poll())
/// the wrapped future is polled internally. The function is invoked when the internal future returns
/// [`Poll::Ready`].
///
/// The function provides mutable access to the future's [`Context`] and to its [`Output`](core::future::Future::Output).
///
/// After the internal future has returned [`Poll::Ready`] additional polls to an [`OnPollReady`]
/// will lead to a panic.
///
/// **Warning:** You should not call the waker in this function as this might lead to
/// another poll of the future. This will lead to a panic!
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::on_poll_ready;
/// let mut x = 0;
/// let future = async {"Some future"};
/// futures::pin_mut!(future);
///
/// let res = on_poll_ready(future, |_, _| *&mut x += 1).await;
/// assert_eq!(x, 1);
/// assert_eq!(res, "Some future");
/// # })
/// ```
pub fn on_poll_ready<T, F>(future: T, function: F) -> OnPollReady<T, F>
where
T: Future,
F: FnOnce(&mut Context<'_>, &mut T::Output),
{
assert_future::<T::Output, _>(OnPollReady { future: Some(future), function: Some(function) })
}

impl<T, F> Future for OnPollReady<T, F>
where
T: Future,
F: FnOnce(&mut Context<'_>, &mut T::Output),
{
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let future =
this.future.as_mut().as_pin_mut().expect("cannot poll OnPollReady after it was ready");

if let Poll::Ready(mut val) = future.poll(cx) {
this.future.set(None);
unwrap_option(this.function.take())(cx, &mut val);
return Poll::Ready(val);
}

Poll::Pending
}
}

impl<T, F> FusedFuture for OnPollReady<T, F>
where
T: Future,
F: FnMut(&mut Context<'_>, &mut T::Output),
{
fn is_terminated(&self) -> bool {
self.future.is_none()
}
}

/// When compiled with `-C opt-level=z`, this function will help the
/// compiler eliminate the `None` branch, where `Option::unwrap` does not.
#[inline(always)]
fn unwrap_option<T>(value: Option<T>) -> T {
match value {
None => unreachable!(),
Some(value) => value,
}
}

0 comments on commit 4f24fe8

Please sign in to comment.