Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stream::size_hint #1853

Merged
merged 1 commit into from Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-core/src/future/mod.rs
Expand Up @@ -57,7 +57,7 @@ mod private_try_future {

/// A convenience for futures that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryFuture: private_try_future::Sealed {
pub trait TryFuture: Future + private_try_future::Sealed {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for consistency with TryStream.

/// The type of successful values yielded by this future
type Ok;

Expand Down
74 changes: 63 additions & 11 deletions futures-core/src/stream.rs
Expand Up @@ -59,6 +59,37 @@ pub trait Stream {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;

/// Returns the bounds on the remaining length of the stream.
///
/// Specifically, `size_hint()` returns a tuple where the first element
/// is the lower bound, and the second element is the upper bound.
///
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
/// A [`None`] here means that either there is no known upper bound, or the
/// upper bound is larger than [`usize`].
///
/// # Implementation notes
///
/// It is not enforced that a stream implementation yields the declared
/// number of elements. A buggy stream may yield less than the lower bound
/// or more than the upper bound of elements.
///
/// `size_hint()` is primarily intended to be used for optimizations such as
/// reserving space for the elements of the stream, but must not be
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
/// implementation of `size_hint()` should not lead to memory safety
/// violations.
///
/// That said, the implementation should provide a correct estimation,
/// because otherwise it would be a violation of the trait's protocol.
///
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
/// stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
Expand All @@ -70,6 +101,10 @@ impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
) -> Poll<Option<Self::Item>> {
S::poll_next(Pin::new(&mut **self), cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

impl<P> Stream for Pin<P>
Expand All @@ -85,6 +120,10 @@ where
) -> Poll<Option<Self::Item>> {
self.get_mut().as_mut().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

/// A stream which tracks whether or not the underlying stream
Expand Down Expand Up @@ -126,7 +165,7 @@ mod private_try_stream {

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryStream: private_try_stream::Sealed {
pub trait TryStream: Stream + private_try_stream::Sealed {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this bound because it would be convenient if stream.size_hint() could be called on St: TryStream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh...

error[E0191]: the value of the associated type `Item` (from the trait `futures_core::stream::Stream`) must be specified
  --> futures/tests/object_safety.rs:20:30
   |
20 |     assert_is_object_safe::<&dyn TryStream<Ok = (), Error = ()>>();
   |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ associated type `Item` must be specified

/// The type of successful values yielded by this future
type Ok;

Expand Down Expand Up @@ -169,10 +208,30 @@ mod if_alloc {
) -> Poll<Option<Self::Item>> {
Pin::new(&mut **self).poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

impl<T: Unpin> Stream for alloc::collections::VecDeque<T> {
type Item = T;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.pop_front())
}

fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}

#[cfg(feature = "std")]
impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> {
impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
type Item = S::Item;

fn poll_next(
Expand All @@ -181,16 +240,9 @@ mod if_alloc {
) -> Poll<Option<S::Item>> {
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
}
}

impl<T: Unpin> Stream for ::alloc::collections::VecDeque<T> {
type Item = T;

fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.pop_front())
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

Expand Down
5 changes: 5 additions & 0 deletions futures-executor/src/local_pool.rs
Expand Up @@ -329,9 +329,14 @@ impl<S: Stream + Unpin> BlockingStream<S> {

impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
type Item = S::Item;

fn next(&mut self) -> Option<Self::Item> {
LocalPool::new().run_until(self.stream.next())
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

impl Spawn for LocalSpawner {
Expand Down
4 changes: 4 additions & 0 deletions futures-test/src/interleave_pending.rs
Expand Up @@ -104,6 +104,10 @@ impl<St: Stream> Stream for InterleavePending<St> {
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

impl<W: AsyncWrite> AsyncWrite for InterleavePending<W> {
Expand Down
21 changes: 11 additions & 10 deletions futures-util/src/future/into_stream.rs
@@ -1,3 +1,4 @@
use crate::stream::{self, Once};
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
Expand All @@ -8,29 +9,29 @@ use pin_utils::unsafe_pinned;
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<Fut: Future> {
future: Option<Fut>
inner: Once<Fut>
}

impl<Fut: Future> IntoStream<Fut> {
unsafe_pinned!(future: Option<Fut>);
unsafe_pinned!(inner: Once<Fut>);

pub(super) fn new(future: Fut) -> IntoStream<Fut> {
IntoStream {
future: Some(future)
inner: stream::once(future)
}
}
}

impl<Fut: Future> Stream for IntoStream<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let v = match self.as_mut().future().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner().poll_next(cx)
}

self.as_mut().future().set(None);
Poll::Ready(Some(v))
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
4 changes: 4 additions & 0 deletions futures-util/src/sink/buffer.rs
Expand Up @@ -76,6 +76,10 @@ impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/sink/err_into.rs
Expand Up @@ -70,6 +70,10 @@ impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/sink/map_err.rs
Expand Up @@ -95,6 +95,10 @@ impl<S: Stream, F> Stream for SinkMapErr<S, F> {
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/sink/with.rs
Expand Up @@ -97,6 +97,10 @@ impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>
Expand Down
5 changes: 5 additions & 0 deletions futures-util/src/sink/with_flat_map.rs
Expand Up @@ -121,12 +121,17 @@ where
St: Stream<Item = Result<Item, S::Error>>,
{
type Item = S::Item;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<S::Item>> {
self.sink().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.sink.size_hint()
}
}

impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F>
Expand Down
11 changes: 11 additions & 0 deletions futures-util/src/stream/buffer_unordered.rs
Expand Up @@ -127,6 +127,17 @@ where
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
}

impl<St> FusedStream for BufferUnordered<St>
Expand Down
11 changes: 11 additions & 0 deletions futures-util/src/stream/buffered.rs
Expand Up @@ -122,6 +122,17 @@ where
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
}

// Forwarding impl of Sink from the underlying stream
Expand Down
8 changes: 8 additions & 0 deletions futures-util/src/stream/catch_unwind.rs
Expand Up @@ -45,6 +45,14 @@ impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.caught_unwind {
(0, Some(0))
} else {
self.stream.size_hint()
}
}
}

impl<St: FusedStream + UnwindSafe> FusedStream for CatchUnwind<St> {
Expand Down
18 changes: 18 additions & 0 deletions futures-util/src/stream/chain.rs
Expand Up @@ -54,4 +54,22 @@ where St1: Stream,
self.as_mut().first().set(None);
self.as_mut().second().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
if let Some(first) = &self.first {
let (first_lower, first_upper) = first.size_hint();
let (second_lower, second_upper) = self.second.size_hint();

let lower = first_lower.saturating_add(second_lower);

let upper = match (first_upper, second_upper) {
(Some(x), Some(y)) => x.checked_add(y),
_ => None
};

(lower, upper)
} else {
self.second.size_hint()
}
}
}
11 changes: 11 additions & 0 deletions futures-util/src/stream/chunks.rs
Expand Up @@ -105,6 +105,17 @@ impl<St: Stream> Stream for Chunks<St> {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
};
(lower, upper)
}
}

impl<St: FusedStream> FusedStream for Chunks<St> {
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/stream/empty.rs
Expand Up @@ -33,4 +33,8 @@ impl<T> Stream for Empty<T> {
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
4 changes: 4 additions & 0 deletions futures-util/src/stream/enumerate.rs
Expand Up @@ -81,6 +81,10 @@ impl<St: Stream> Stream for Enumerate<St> {
None => Poll::Ready(None),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

// Forwarding impl of Sink from the underlying stream
Expand Down
10 changes: 10 additions & 0 deletions futures-util/src/stream/filter.rs
Expand Up @@ -132,6 +132,16 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F>
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
None => None,
};
(0, upper) // can't know a lower bound, due to the predicate
}
}

// Forwarding impl of Sink from the underlying stream
Expand Down