diff --git a/futures-test/src/future/pending_once.rs b/futures-test/src/future/pending_once.rs index d9932a1fd7..ae07b2fab7 100644 --- a/futures-test/src/future/pending_once.rs +++ b/futures-test/src/future/pending_once.rs @@ -45,7 +45,7 @@ impl Future for PendingOnce { } } -impl FusedFuture for PendingOnce { +impl FusedFuture for PendingOnce { fn is_terminated(&self) -> bool { self.polled_before && self.future.is_terminated() } diff --git a/futures-test/src/interleave_pending.rs b/futures-test/src/interleave_pending.rs index 993bee562e..35c599dcdb 100644 --- a/futures-test/src/interleave_pending.rs +++ b/futures-test/src/interleave_pending.rs @@ -1,5 +1,5 @@ -use futures_core::future::Future; -use futures_core::stream::Stream; +use futures_core::future::{Future, FusedFuture}; +use futures_core::stream::{Stream, FusedStream}; use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ @@ -85,6 +85,12 @@ impl Future for InterleavePending { } } +impl FusedFuture for InterleavePending { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + impl Stream for InterleavePending { type Item = St::Item; @@ -110,6 +116,12 @@ impl Stream for InterleavePending { } } +impl FusedStream for InterleavePending { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + impl AsyncWrite for InterleavePending { fn poll_write( self: Pin<&mut Self>, diff --git a/futures-util/src/future/abortable.rs b/futures-util/src/future/abortable.rs index 281cf6b481..632337a34c 100644 --- a/futures-util/src/future/abortable.rs +++ b/futures-util/src/future/abortable.rs @@ -1,5 +1,5 @@ use crate::task::AtomicWaker; -use futures_core::future::Future; +use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; use pin_utils::unsafe_pinned; use core::fmt; @@ -45,6 +45,10 @@ impl Abortable where Fut: Future { inner: reg.inner, } } + + fn is_aborted(&self) -> bool { + self.inner.cancel.load(Ordering::Relaxed) + } } /// A registration handle for a `Abortable` future. @@ -139,7 +143,7 @@ impl Future for Abortable where Fut: Future { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Check if the future has been aborted - if self.inner.cancel.load(Ordering::Relaxed) { + if self.is_aborted() { return Poll::Ready(Err(Aborted)) } @@ -155,7 +159,7 @@ impl Future for Abortable where Fut: Future { // registration. // Checking with `Relaxed` is sufficient because `register` introduces an // `AcqRel` barrier. - if self.inner.cancel.load(Ordering::Relaxed) { + if self.is_aborted() { return Poll::Ready(Err(Aborted)) } @@ -163,6 +167,12 @@ impl Future for Abortable where Fut: Future { } } +impl FusedFuture for Abortable where Fut: FusedFuture { + fn is_terminated(&self) -> bool { + self.is_aborted() || self.future.is_terminated() + } +} + impl AbortHandle { /// Abort the `Abortable` future associated with this handle. /// diff --git a/futures-util/src/future/into_stream.rs b/futures-util/src/future/into_stream.rs index 9f8a391280..8cefda04f3 100644 --- a/futures-util/src/future/into_stream.rs +++ b/futures-util/src/future/into_stream.rs @@ -1,7 +1,7 @@ use crate::stream::{self, Once}; use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use pin_utils::unsafe_pinned; @@ -35,3 +35,9 @@ impl Stream for IntoStream { self.inner.size_hint() } } + +impl FusedStream for IntoStream { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} diff --git a/futures-util/src/future/join.rs b/futures-util/src/future/join.rs index 5e984b8439..5af5b408e9 100644 --- a/futures-util/src/future/join.rs +++ b/futures-util/src/future/join.rs @@ -3,7 +3,7 @@ use crate::future::{MaybeDone, maybe_done}; use core::fmt; use core::pin::Pin; -use futures_core::future::Future; +use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; use pin_utils::unsafe_pinned; use super::assert_future; @@ -62,6 +62,14 @@ macro_rules! generate { } } } + + impl<$($Fut: FusedFuture),*> FusedFuture for $Join<$($Fut),*> { + fn is_terminated(&self) -> bool { + $( + self.$Fut.is_terminated() + ) && * + } + } )*) } diff --git a/futures-util/src/future/option.rs b/futures-util/src/future/option.rs index e14745e9d4..21413525d0 100644 --- a/futures-util/src/future/option.rs +++ b/futures-util/src/future/option.rs @@ -1,7 +1,7 @@ //! Definition of the `Option` (optional step) combinator use core::pin::Pin; -use futures_core::future::Future; +use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; use pin_utils::unsafe_pinned; @@ -46,6 +46,15 @@ impl Future for OptionFuture { } } +impl FusedFuture for OptionFuture { + fn is_terminated(&self) -> bool { + match &self.option { + Some(x) => x.is_terminated(), + None => true, + } + } +} + impl From> for OptionFuture { fn from(option: Option) -> Self { OptionFuture { option } diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index 8f6c1030f2..91b467dca6 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -1,12 +1,12 @@ use core::pin::Pin; -use futures_core::future::Future; +use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; use crate::future::{Either, FutureExt}; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] -pub struct Select { +pub struct Select { inner: Option<(A, B)>, } @@ -50,7 +50,11 @@ pub fn select(future1: A, future2: B) -> Select Select { inner: Some((future1, future2)) } } -impl Future for Select where A: Future, B: Future { +impl Future for Select +where + A: Future + Unpin, + B: Future + Unpin, +{ type Output = Either<(A::Output, B), (B::Output, A)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -67,3 +71,13 @@ impl Future for Select where A: Future, B: Future { } } } + +impl FusedFuture for Select +where + A: Future + Unpin, + B: Future + Unpin, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} diff --git a/futures-util/src/stream/concat.rs b/futures-util/src/stream/concat.rs index 0aa04dbb23..704efc79fd 100644 --- a/futures-util/src/stream/concat.rs +++ b/futures-util/src/stream/concat.rs @@ -1,6 +1,6 @@ use core::pin::Pin; -use futures_core::future::Future; -use futures_core::stream::Stream; +use futures_core::future::{Future, FusedFuture}; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; @@ -57,3 +57,13 @@ where St: Stream, } } } + +impl FusedFuture for Concat +where St: FusedStream, + St::Item: Extend<::Item> + + IntoIterator + Default, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.stream.is_terminated() + } +}