Skip to content

Commit

Permalink
Add FusedFuture implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and cramertj committed Sep 25, 2019
1 parent ad501de commit 5d17474
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 11 deletions.
2 changes: 1 addition & 1 deletion futures-test/src/future/pending_once.rs
Expand Up @@ -45,7 +45,7 @@ impl<Fut: Future> Future for PendingOnce<Fut> {
}
}

impl<Fut: Future + FusedFuture> FusedFuture for PendingOnce<Fut> {
impl<Fut: FusedFuture> FusedFuture for PendingOnce<Fut> {
fn is_terminated(&self) -> bool {
self.polled_before && self.future.is_terminated()
}
Expand Down
16 changes: 14 additions & 2 deletions futures-test/src/interleave_pending.rs
@@ -1,5 +1,5 @@
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::future::{Future, FusedFuture};
use futures_core::stream::{Stream, FusedStream};
use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
Expand Down Expand Up @@ -85,6 +85,12 @@ impl<Fut: Future> Future for InterleavePending<Fut> {
}
}

impl<Fut: FusedFuture> FusedFuture for InterleavePending<Fut> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<St: Stream> Stream for InterleavePending<St> {
type Item = St::Item;

Expand All @@ -110,6 +116,12 @@ impl<St: Stream> Stream for InterleavePending<St> {
}
}

impl<Fut: FusedStream> FusedStream for InterleavePending<Fut> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
fn poll_write(
self: Pin<&mut Self>,
Expand Down
8 changes: 7 additions & 1 deletion futures-util/src/future/into_stream.rs
@@ -1,7 +1,7 @@
use crate::stream::{self, Once};
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

Expand Down Expand Up @@ -35,3 +35,9 @@ impl<Fut: Future> Stream for IntoStream<Fut> {
self.inner.size_hint()
}
}

impl<Fut: Future> FusedStream for IntoStream<Fut> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
10 changes: 9 additions & 1 deletion futures-util/src/future/join.rs
Expand Up @@ -3,7 +3,7 @@
use crate::future::{MaybeDone, maybe_done};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use super::assert_future;
Expand Down Expand Up @@ -62,6 +62,14 @@ macro_rules! generate {
}
}
}

impl<$($Fut: FusedFuture),*> FusedFuture for $Join<$($Fut),*> {
fn is_terminated(&self) -> bool {
$(
self.$Fut.is_terminated()
) && *
}
}
)*)
}

Expand Down
11 changes: 10 additions & 1 deletion futures-util/src/future/option.rs
@@ -1,7 +1,7 @@
//! Definition of the `Option` (optional step) combinator

use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

Expand Down Expand Up @@ -46,6 +46,15 @@ impl<F: Future> Future for OptionFuture<F> {
}
}

impl<F: FusedFuture> FusedFuture for OptionFuture<F> {
fn is_terminated(&self) -> bool {
match &self.option {
Some(x) => x.is_terminated(),
None => true,
}
}
}

impl<T> From<Option<T>> for OptionFuture<T> {
fn from(option: Option<T>) -> Self {
OptionFuture { option }
Expand Down
20 changes: 17 additions & 3 deletions futures-util/src/future/select.rs
@@ -1,12 +1,12 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use crate::future::{Either, FutureExt};

/// Future for the [`select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Select<A: Unpin, B: Unpin> {
pub struct Select<A, B> {
inner: Option<(A, B)>,
}

Expand Down Expand Up @@ -50,7 +50,11 @@ pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
Select { inner: Some((future1, future2)) }
}

impl<A: Unpin, B: Unpin> Future for Select<A, B> where A: Future, B: Future {
impl<A, B> Future for Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
type Output = Either<(A::Output, B), (B::Output, A)>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -67,3 +71,13 @@ impl<A: Unpin, B: Unpin> Future for Select<A, B> where A: Future, B: Future {
}
}
}

impl<A, B> FusedFuture for Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}
14 changes: 12 additions & 2 deletions futures-util/src/stream/concat.rs
@@ -1,6 +1,6 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::future::{Future, FusedFuture};
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};

Expand Down Expand Up @@ -57,3 +57,13 @@ where St: Stream,
}
}
}

impl<St> FusedFuture for Concat<St>
where St: FusedStream,
St::Item: Extend<<St::Item as IntoIterator>::Item> +
IntoIterator + Default,
{
fn is_terminated(&self) -> bool {
self.accum.is_none() && self.stream.is_terminated()
}
}

0 comments on commit 5d17474

Please sign in to comment.