Skip to content

Commit

Permalink
Add Stream::size_hint
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Sep 1, 2019
1 parent 90c83b8 commit 3415a86
Show file tree
Hide file tree
Showing 47 changed files with 425 additions and 34 deletions.
2 changes: 1 addition & 1 deletion futures-core/src/future/mod.rs
Expand Up @@ -57,7 +57,7 @@ mod private_try_future {

/// A convenience for futures that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryFuture: private_try_future::Sealed {
pub trait TryFuture: Future + private_try_future::Sealed {
/// The type of successful values yielded by this future
type Ok;

Expand Down
74 changes: 63 additions & 11 deletions futures-core/src/stream.rs
Expand Up @@ -59,6 +59,37 @@ pub trait Stream {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;

/// Returns the bounds on the remaining length of the stream.
///
/// Specifically, `size_hint()` returns a tuple where the first element
/// is the lower bound, and the second element is the upper bound.
///
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
/// A [`None`] here means that either there is no known upper bound, or the
/// upper bound is larger than [`usize`].
///
/// # Implementation notes
///
/// It is not enforced that a stream implementation yields the declared
/// number of elements. A buggy stream may yield less than the lower bound
/// or more than the upper bound of elements.
///
/// `size_hint()` is primarily intended to be used for optimizations such as
/// reserving space for the elements of the stream, but must not be
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
/// implementation of `size_hint()` should not lead to memory safety
/// violations.
///
/// That said, the implementation should provide a correct estimation,
/// because otherwise it would be a violation of the trait's protocol.
///
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
/// stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
Expand All @@ -70,6 +101,10 @@ impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
) -> Poll<Option<Self::Item>> {
S::poll_next(Pin::new(&mut **self), cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

impl<P> Stream for Pin<P>
Expand All @@ -85,6 +120,10 @@ where
) -> Poll<Option<Self::Item>> {
self.get_mut().as_mut().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

/// A stream which tracks whether or not the underlying stream
Expand Down Expand Up @@ -126,7 +165,7 @@ mod private_try_stream {

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryStream: private_try_stream::Sealed {
pub trait TryStream: Stream + private_try_stream::Sealed {
/// The type of successful values yielded by this future
type Ok;

Expand Down Expand Up @@ -169,10 +208,30 @@ mod if_alloc {
) -> Poll<Option<Self::Item>> {
Pin::new(&mut **self).poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

impl<T: Unpin> Stream for alloc::collections::VecDeque<T> {
type Item = T;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.pop_front())
}

fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}

#[cfg(feature = "std")]
impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> {
impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
type Item = S::Item;

fn poll_next(
Expand All @@ -181,16 +240,9 @@ mod if_alloc {
) -> Poll<Option<S::Item>> {
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
}
}

impl<T: Unpin> Stream for ::alloc::collections::VecDeque<T> {
type Item = T;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.pop_front())
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

Expand Down
5 changes: 5 additions & 0 deletions futures-executor/src/local_pool.rs
Expand Up @@ -329,9 +329,14 @@ impl<S: Stream + Unpin> BlockingStream<S> {

impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
type Item = S::Item;

fn next(&mut self) -> Option<Self::Item> {
LocalPool::new().run_until(self.stream.next())
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

impl Spawn for LocalSpawner {
Expand Down
4 changes: 4 additions & 0 deletions futures-test/src/interleave_pending.rs
Expand Up @@ -104,6 +104,10 @@ impl<St: Stream> Stream for InterleavePending<St> {
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
Expand Down
21 changes: 11 additions & 10 deletions futures-util/src/future/into_stream.rs
@@ -1,3 +1,4 @@
use crate::stream::{self, Once};
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
Expand All @@ -8,29 +9,29 @@ use pin_utils::unsafe_pinned;
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<Fut: Future> {
future: Option<Fut>
inner: Once<Fut>
}

impl<Fut: Future> IntoStream<Fut> {
unsafe_pinned!(future: Option<Fut>);
unsafe_pinned!(inner: Once<Fut>);

pub(super) fn new(future: Fut) -> IntoStream<Fut> {
IntoStream {
future: Some(future)
inner: stream::once(future)
}
}
}

impl<Fut: Future> Stream for IntoStream<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let v = match self.as_mut().future().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner().poll_next(cx)
}

self.as_mut().future().set(None);
Poll::Ready(Some(v))
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
4 changes: 4 additions & 0 deletions futures-util/src/sink/buffer.rs
Expand Up @@ -76,6 +76,10 @@ impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/sink/err_into.rs
Expand Up @@ -70,6 +70,10 @@ impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/sink/map_err.rs
Expand Up @@ -95,6 +95,10 @@ impl<S: Stream, F> Stream for SinkMapErr<S, F> {
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/sink/with.rs
Expand Up @@ -97,6 +97,10 @@ impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>
Expand Down
5 changes: 5 additions & 0 deletions futures-util/src/sink/with_flat_map.rs
Expand Up @@ -121,12 +121,17 @@ where
St: Stream<Item = Result<Item, S::Error>>,
{
type Item = S::Item;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F>
Expand Down
11 changes: 11 additions & 0 deletions futures-util/src/stream/buffer_unordered.rs
Expand Up @@ -127,6 +127,17 @@ where
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
}

impl<St> FusedStream for BufferUnordered<St>
Expand Down
11 changes: 11 additions & 0 deletions futures-util/src/stream/buffered.rs
Expand Up @@ -122,6 +122,17 @@ where
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
}

// Forwarding impl of Sink from the underlying stream
Expand Down
8 changes: 8 additions & 0 deletions futures-util/src/stream/catch_unwind.rs
Expand Up @@ -45,6 +45,14 @@ impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.caught_unwind {
(0, Some(0))
} else {
self.stream.size_hint()
}
}
}

impl<St: FusedStream + UnwindSafe> FusedStream for CatchUnwind<St> {
Expand Down
18 changes: 18 additions & 0 deletions futures-util/src/stream/chain.rs
Expand Up @@ -54,4 +54,22 @@ where St1: Stream,
self.as_mut().first().set(None);
self.as_mut().second().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
if let Some(first) = &self.first {
let (first_lower, first_upper) = first.size_hint();
let (second_lower, second_upper) = self.second.size_hint();

let lower = first_lower.saturating_add(second_lower);

let upper = match (first_upper, second_upper) {
(Some(x), Some(y)) => x.checked_add(y),
_ => None
};

(lower, upper)
} else {
self.second.size_hint()
}
}
}
11 changes: 11 additions & 0 deletions futures-util/src/stream/chunks.rs
Expand Up @@ -105,6 +105,17 @@ impl<St: Stream> Stream for Chunks<St> {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
};
(lower, upper)
}
}

impl<St: FusedStream> FusedStream for Chunks<St> {
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/stream/empty.rs
Expand Up @@ -33,4 +33,8 @@ impl<T> Stream for Empty<T> {
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
4 changes: 4 additions & 0 deletions futures-util/src/stream/enumerate.rs
Expand Up @@ -81,6 +81,10 @@ impl<St: Stream> Stream for Enumerate<St> {
None => Poll::Ready(None),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

// Forwarding impl of Sink from the underlying stream
Expand Down
10 changes: 10 additions & 0 deletions futures-util/src/stream/filter.rs
Expand Up @@ -132,6 +132,16 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F>
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
None => None,
};
(0, upper) // can't know a lower bound, due to the predicate
}
}

// Forwarding impl of Sink from the underlying stream
Expand Down

0 comments on commit 3415a86

Please sign in to comment.