Skip to content

Commit

Permalink
Implement TryStreamExt::try_take_while (#2212)
Browse files Browse the repository at this point in the history
Co-authored-by: Anders Qvist <quest@pilotfish.se>
  • Loading branch information
bittrance and Anders Qvist committed Sep 20, 2020
1 parent cc01570 commit 263dbc6
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 2 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Expand Up @@ -45,7 +45,7 @@ mod try_stream;
pub use self::try_stream::{
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
TrySkipWhile, TryStreamExt, TryUnfold,
TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
};

#[cfg(feature = "io")]
Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/stream/try_stream/mod.rs
Expand Up @@ -103,6 +103,10 @@ mod try_skip_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_skip_while::TrySkipWhile;

mod try_take_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_take_while::TryTakeWhile;

cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
mod try_buffer_unordered;
Expand Down Expand Up @@ -432,6 +436,36 @@ pub trait TryStreamExt: TryStream {
TrySkipWhile::new(self, f)
}

/// Take elements on this stream while the provided asynchronous predicate
/// resolves to `true`.
///
/// This function is similar to
/// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
/// early if an error occurs.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, TryStreamExt};
///
/// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
/// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
///
/// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
/// assert_eq!(output, Ok(vec![1, 2]));
/// # })
/// ```
fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
where
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>,
Self: Sized,
{
TryTakeWhile::new(self, f)
}

/// Attempts to run this stream to completion, executing the provided asynchronous
/// closure for each element on the stream concurrently as elements become
/// available, exiting as soon as an error occurs.
Expand Down
132 changes: 132 additions & 0 deletions futures-util/src/stream/try_stream/try_take_while.rs
@@ -0,0 +1,132 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project::pin_project;

/// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
/// method.
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryTakeWhile<St, Fut, F>
where
St: TryStream,
{
#[pin]
stream: St,
f: F,
#[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Ok>,
done_taking: bool,
}

impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
where
St: TryStream + fmt::Debug,
St::Ok: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryTakeWhile")
.field("stream", &self.stream)
.field("pending_fut", &self.pending_fut)
.field("pending_item", &self.pending_item)
.field("done_taking", &self.done_taking)
.finish()
}
}

impl<St, Fut, F> TryTakeWhile<St, Fut, F>
where
St: TryStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
pub(super) fn new(stream: St, f: F) -> TryTakeWhile<St, Fut, F> {
TryTakeWhile {
stream,
f,
pending_fut: None,
pending_item: None,
done_taking: false,
}
}

delegate_access_inner!(stream, St, ());
}

impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F>
where
St: TryStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
type Item = Result<St::Ok, St::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if *this.done_taking {
return Poll::Ready(None);
}

Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let take = ready!(fut.try_poll(cx)?);
let item = this.pending_item.take();
this.pending_fut.set(None);
if take {
break item.map(Ok);
} else {
*this.done_taking = true;
break None;
}
} else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
this.pending_fut.set(Some((this.f)(&item)));
*this.pending_item = Some(item);
} else {
break None;
}
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.done_taking {
return (0, Some(0));
}

let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
None => None,
};
(0, upper) // can't know a lower bound, due to the predicate
}
}

impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F>
where
St: TryStream + FusedStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
fn is_terminated(&self) -> bool {
self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F>
where
S: TryStream + Sink<Item, Error = E>,
{
type Error = E;

delegate_sink!(stream, Item);
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Expand Up @@ -465,7 +465,7 @@ pub mod stream {
AndThen, ErrInto, MapOk, MapErr, OrElse,
InspectOk, InspectErr,
TryNext, TryForEach, TryFilter, TryFilterMap, TryFlatten,
TryCollect, TryConcat, TryFold, TrySkipWhile,
TryCollect, TryConcat, TryFold, TrySkipWhile, TryTakeWhile,
IntoStream,
};

Expand Down

0 comments on commit 263dbc6

Please sign in to comment.