From 4f24fe8fd47e1a142803bf878e383eaccfb285e5 Mon Sep 17 00:00:00 2001 From: kulst <60887784+kulst@users.noreply.github.com> Date: Wed, 13 Mar 2024 16:11:33 +0100 Subject: [PATCH] Added future::on_poll, future::on_poll_pending and future::on_poll_ready --- futures-util/src/future/mod.rs | 5 + futures-util/src/future/on_poll.rs | 279 +++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+) create mode 100644 futures-util/src/future/on_poll.rs diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 1280ce986..2e1ff989d 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -71,6 +71,11 @@ pub use self::poll_fn::{poll_fn, PollFn}; mod poll_immediate; pub use self::poll_immediate::{poll_immediate, PollImmediate}; +mod on_poll; +pub use self::on_poll::{on_poll, OnPoll}; +pub use self::on_poll::{on_poll_pending, OnPollPending}; +pub use self::on_poll::{on_poll_ready, OnPollReady}; + mod ready; pub use self::ready::{err, ok, ready, Ready}; diff --git a/futures-util/src/future/on_poll.rs b/futures-util/src/future/on_poll.rs new file mode 100644 index 000000000..fb5e9368d --- /dev/null +++ b/futures-util/src/future/on_poll.rs @@ -0,0 +1,279 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`on_poll`] function. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct OnPoll { + #[pin] + future: Option, + function: Option, + } +} + +/// Wrapps a future and a function in an [`OnPoll`]. On each [poll](core::future::Future::poll()) +/// the wrapped future is polled internally and the function is invoked. +/// +/// The function provides mutable access to the future's [`Context`] and to its [`Poll`]. +/// +/// After the internal future has returned [`Poll::Ready`] additional polls to an [`OnPoll`] +/// will lead to a panic. +/// +/// **Warning:** You should only call the future's waker in this function when it is safe to +/// poll the future another time. Otherwise this might lead to a panic. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::on_poll; +/// use futures::pending; +/// use futures::task::{Context, Poll}; +/// let mut x = 0; +/// let future = async { +/// pending!(); // returns Poll::Pending, so x = 1 +/// pending!(); // returns Poll::Pending, so x = 2 +/// pending!(); // returns Poll::Pending, so x = 3 +/// "Some future" // returns Poll::Ready, so x = 4 +/// }; +/// futures::pin_mut!(future); +/// let funct = |cx : &mut Context<'_>, poll : &mut Poll<_>| { +/// x += 1; +/// if let Poll::Pending = poll { +/// cx.waker().wake_by_ref(); // only to let the future make progress +/// } +/// }; +/// let res = on_poll(future, funct).await; +/// assert_eq!(x, 4); +/// assert_eq!(res, "Some future"); +/// # }) +/// ``` +pub fn on_poll(future: T, function: F) -> OnPoll +where + T: Future, + F: FnMut(&mut Context<'_>, &mut Poll), +{ + assert_future::(OnPoll { future: Some(future), function: Some(function) }) +} + +impl Future for OnPoll +where + T: Future, + F: FnMut(&mut Context<'_>, &mut Poll), +{ + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let future = + this.future.as_mut().as_pin_mut().expect("cannot poll OnPoll after it was ready"); + + let mut poll = future.poll(cx); + if let Poll::Ready(_) = poll { + this.future.set(None); + unwrap_option(this.function.take())(cx, &mut poll); + return poll; + } + + unwrap_option(this.function.as_mut())(cx, &mut poll); + poll + } +} + +impl FusedFuture for OnPoll +where + T: Future, + F: FnMut(&mut Context<'_>, &mut Poll), +{ + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +pin_project! { + /// Future for the [`on_poll_pending`] function. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct OnPollPending { + #[pin] + future: Option, + function: Option, + } +} + +/// Wrapps a future and a function in an [`OnPollPending`]. On each [poll](core::future::Future::poll()) +/// the wrapped future is polled internally. The function is invoked each time the internal future returns +/// [`Poll::Pending`]. +/// +/// The function provides mutable access to the future's [`Context`] and to its [`Poll`]. +/// +/// After the internal future has returned [`Poll::Ready`] additional polls to an [`OnPollPending`] +/// will lead to a panic. +/// +/// Can be used to abort a future after a specific number of pending polls by changing its [`Poll`] +/// to [`Poll::Ready`]. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::on_poll_pending; +/// use futures::pending; +/// use futures::task::{Context, Poll}; +/// let mut x = 0; +/// let future = async { +/// pending!(); // returns Poll::Pending ,so x = 1 +/// pending!(); // returns Poll::Pending, so x = 2 +/// pending!(); // returns Poll::Pending, so x = 3 +/// "Some future" // returns Poll::Ready, so x is unchanged +/// }; +/// futures::pin_mut!(future); +/// let funct = |cx : &mut Context<'_>, poll : &mut Poll<_>| { +/// *&mut x += 1; +/// cx.waker().wake_by_ref(); // only to let the future make progress +/// }; +/// let res = on_poll_pending(future, funct).await; +/// assert_eq!(x, 3); +/// assert_eq!(res, "Some future"); +/// # }) +/// ``` +pub fn on_poll_pending(future: T, function: F) -> OnPollPending +where + T: Future, + F: FnMut(&mut Context<'_>, &mut Poll), +{ + assert_future::(OnPollPending { future: Some(future), function: Some(function) }) +} + +impl Future for OnPollPending +where + T: Future, + F: FnMut(&mut Context<'_>, &mut Poll), +{ + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let future = this + .future + .as_mut() + .as_pin_mut() + .expect("cannot poll OnPollPending after it was ready"); + + let mut poll = future.poll(cx); + if let Poll::Ready(_) = poll { + this.future.set(None); + unwrap_option(this.function.take()); + return poll; + } + + unwrap_option(this.function.as_mut())(cx, &mut poll); + // Function was called so we need to check if the poll was manipulated to Poll::Ready + if let Poll::Ready(_) = poll { + this.future.set(None); + unwrap_option(this.function.take()); + } + poll + } +} + +impl FusedFuture for OnPollPending +where + T: Future, + F: FnMut(&mut Context<'_>, &mut Poll), +{ + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +pin_project! { + /// Future for the [`on_poll_ready`] function. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct OnPollReady { + #[pin] + future: Option, + function: Option, + } +} + +/// Wrapps a future and a function in an [`OnPollReady`]. On each [poll](core::future::Future::poll()) +/// the wrapped future is polled internally. The function is invoked when the internal future returns +/// [`Poll::Ready`]. +/// +/// The function provides mutable access to the future's [`Context`] and to its [`Output`](core::future::Future::Output). +/// +/// After the internal future has returned [`Poll::Ready`] additional polls to an [`OnPollReady`] +/// will lead to a panic. +/// +/// **Warning:** You should not call the waker in this function as this might lead to +/// another poll of the future. This will lead to a panic! +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::on_poll_ready; +/// let mut x = 0; +/// let future = async {"Some future"}; +/// futures::pin_mut!(future); +/// +/// let res = on_poll_ready(future, |_, _| *&mut x += 1).await; +/// assert_eq!(x, 1); +/// assert_eq!(res, "Some future"); +/// # }) +/// ``` +pub fn on_poll_ready(future: T, function: F) -> OnPollReady +where + T: Future, + F: FnOnce(&mut Context<'_>, &mut T::Output), +{ + assert_future::(OnPollReady { future: Some(future), function: Some(function) }) +} + +impl Future for OnPollReady +where + T: Future, + F: FnOnce(&mut Context<'_>, &mut T::Output), +{ + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let future = + this.future.as_mut().as_pin_mut().expect("cannot poll OnPollReady after it was ready"); + + if let Poll::Ready(mut val) = future.poll(cx) { + this.future.set(None); + unwrap_option(this.function.take())(cx, &mut val); + return Poll::Ready(val); + } + + Poll::Pending + } +} + +impl FusedFuture for OnPollReady +where + T: Future, + F: FnMut(&mut Context<'_>, &mut T::Output), +{ + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +/// When compiled with `-C opt-level=z`, this function will help the +/// compiler eliminate the `None` branch, where `Option::unwrap` does not. +#[inline(always)] +fn unwrap_option(value: Option) -> T { + match value { + None => unreachable!(), + Some(value) => value, + } +}