From ae422d8177d99154d23d0f4ef4982cf6d54a5c94 Mon Sep 17 00:00:00 2001 From: Tyler Hawkes Date: Thu, 29 Jul 2021 10:34:23 -0600 Subject: [PATCH] New poll_immediate functions to immediately return from a poll (#2452) --- futures-util/src/future/mod.rs | 3 + futures-util/src/future/poll_immediate.rs | 126 ++++++++++++++++++++++ futures-util/src/stream/mod.rs | 3 + futures-util/src/stream/poll_immediate.rs | 80 ++++++++++++++ 4 files changed, 212 insertions(+) create mode 100644 futures-util/src/future/poll_immediate.rs create mode 100644 futures-util/src/stream/poll_immediate.rs diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index cd082642b7..374e36512f 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -68,6 +68,9 @@ pub use self::option::OptionFuture; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + mod ready; pub use self::ready::{err, ok, ready, Ready}; diff --git a/futures-util/src/future/poll_immediate.rs b/futures-util/src/future/poll_immediate.rs new file mode 100644 index 0000000000..5ae555c73e --- /dev/null +++ b/futures-util/src/future/poll_immediate.rs @@ -0,0 +1,126 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::{FusedFuture, Future, Stream}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`poll_immediate`](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate { + #[pin] + future: Option + } +} + +impl Future for PollImmediate +where + F: Future, +{ + type Output = Option; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + let inner = + this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion"); + match inner.poll(cx) { + Poll::Ready(t) => { + this.future.set(None); + Poll::Ready(Some(t)) + } + Poll::Pending => Poll::Ready(None), + } + } +} + +impl FusedFuture for PollImmediate { + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done. +/// The stream will never return [Poll::Pending](core::task::Poll::Pending) +/// so polling it in a tight loop is worse than using a blocking synchronous function. +/// ``` +/// # futures::executor::block_on(async { +/// use futures::task::Poll; +/// use futures::{StreamExt, future, pin_mut}; +/// use future::FusedFuture; +/// +/// let f = async { 1_u32 }; +/// pin_mut!(f); +/// let mut r = future::poll_immediate(f); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// let mut p = future::poll_immediate(f); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert!(!p.is_terminated()); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert!(p.is_terminated()); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +impl Stream for PollImmediate +where + F: Future, +{ + type Item = Poll; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + match this.future.as_mut().as_pin_mut() { + // inner is gone, so we can signal that the stream is closed. + None => Poll::Ready(None), + Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| { + this.future.set(None); + t + }))), + } + } +} + +/// Creates a future that is immediately ready with an Option of a value. +/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready). +/// +/// # Caution +/// +/// When consuming the future by this function, note the following: +/// +/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value. +/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let r = future::poll_immediate(async { 1_u32 }); +/// assert_eq!(r.await, Some(1)); +/// +/// let p = future::poll_immediate(future::pending::()); +/// assert_eq!(p.await, None); +/// # }); +/// ``` +/// +/// ### Reusing a future +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{future, pin_mut}; +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// assert_eq!(None, future::poll_immediate(&mut f).await); +/// assert_eq!(42, f.await); +/// # }); +/// ``` +pub fn poll_immediate(f: F) -> PollImmediate { + assert_future::, PollImmediate>(PollImmediate { future: Some(f) }) +} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 8cf9f80a9d..a3740325f8 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -88,6 +88,9 @@ pub use self::pending::{pending, Pending}; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + mod select; pub use self::select::{select, Select}; diff --git a/futures-util/src/stream/poll_immediate.rs b/futures-util/src/stream/poll_immediate.rs new file mode 100644 index 0000000000..c7e8a5b3c6 --- /dev/null +++ b/futures-util/src/stream/poll_immediate.rs @@ -0,0 +1,80 @@ +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::Stream; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [poll_immediate](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate { + #[pin] + stream: Option + } +} + +impl Stream for PollImmediate +where + S: Stream, +{ + type Item = Poll; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + let stream = match this.stream.as_mut().as_pin_mut() { + // inner is gone, so we can continue to signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + + match stream.poll_next(cx) { + Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), + Poll::Ready(None) => { + this.stream.set(None); + Poll::Ready(None) + } + Poll::Pending => Poll::Ready(Some(Poll::Pending)), + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint) + } +} + +impl super::FusedStream for PollImmediate { + fn is_terminated(&self) -> bool { + self.stream.is_none() + } +} + +/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it. +/// +/// This is useful when immediacy is more important than waiting for the next item to be ready. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::stream::{self, StreamExt}; +/// use futures::task::Poll; +/// +/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3))); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// assert_eq!(r.next().await, Some(Poll::Ready(2))); +/// assert_eq!(r.next().await, None); +/// +/// let mut p = stream::poll_immediate(Box::pin(stream::once(async { +/// futures::pending!(); +/// 42_u8 +/// }))); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +pub fn poll_immediate(s: S) -> PollImmediate { + super::assert_stream::, PollImmediate>(PollImmediate { stream: Some(s) }) +}