From 3415a86fe917c2e6931bfbd1930f92917c594b2d Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 1 Sep 2019 22:01:06 +0900 Subject: [PATCH] Add Stream::size_hint --- futures-core/src/future/mod.rs | 2 +- futures-core/src/stream.rs | 74 ++++++++++++++++--- futures-executor/src/local_pool.rs | 5 ++ futures-test/src/interleave_pending.rs | 4 + futures-util/src/future/into_stream.rs | 21 +++--- futures-util/src/sink/buffer.rs | 4 + futures-util/src/sink/err_into.rs | 4 + futures-util/src/sink/map_err.rs | 4 + futures-util/src/sink/with.rs | 4 + futures-util/src/sink/with_flat_map.rs | 5 ++ futures-util/src/stream/buffer_unordered.rs | 11 +++ futures-util/src/stream/buffered.rs | 11 +++ futures-util/src/stream/catch_unwind.rs | 8 ++ futures-util/src/stream/chain.rs | 18 +++++ futures-util/src/stream/chunks.rs | 11 +++ futures-util/src/stream/empty.rs | 4 + futures-util/src/stream/enumerate.rs | 4 + futures-util/src/stream/filter.rs | 10 +++ futures-util/src/stream/filter_map.rs | 10 +++ futures-util/src/stream/fuse.rs | 8 ++ futures-util/src/stream/futures_ordered.rs | 5 ++ .../src/stream/futures_unordered/mod.rs | 5 ++ futures-util/src/stream/inspect.rs | 4 + futures-util/src/stream/iter.rs | 4 + futures-util/src/stream/map.rs | 4 + futures-util/src/stream/once.rs | 25 ++++--- futures-util/src/stream/peek.rs | 11 +++ futures-util/src/stream/pending.rs | 4 + futures-util/src/stream/repeat.rs | 4 + futures-util/src/stream/skip.rs | 12 +++ futures-util/src/stream/skip_while.rs | 10 +++ futures-util/src/stream/take.rs | 18 +++++ futures-util/src/stream/take_while.rs | 14 ++++ futures-util/src/stream/then.rs | 11 +++ futures-util/src/stream/zip.rs | 26 +++++++ futures-util/src/try_stream/and_then.rs | 11 +++ futures-util/src/try_stream/err_into.rs | 4 + futures-util/src/try_stream/inspect_err.rs | 4 + futures-util/src/try_stream/inspect_ok.rs | 4 + futures-util/src/try_stream/into_stream.rs | 4 + futures-util/src/try_stream/map_err.rs | 4 + futures-util/src/try_stream/map_ok.rs | 4 + futures-util/src/try_stream/or_else.rs | 11 +++ futures-util/src/try_stream/try_filter.rs | 10 +++ futures-util/src/try_stream/try_filter_map.rs | 10 +++ futures-util/src/try_stream/try_skip_while.rs | 10 +++ futures/tests/object_safety.rs | 4 +- 47 files changed, 425 insertions(+), 34 deletions(-) diff --git a/futures-core/src/future/mod.rs b/futures-core/src/future/mod.rs index c627311996..ba9398eef6 100644 --- a/futures-core/src/future/mod.rs +++ b/futures-core/src/future/mod.rs @@ -57,7 +57,7 @@ mod private_try_future { /// A convenience for futures that return `Result` values that includes /// a variety of adapters tailored to such futures. -pub trait TryFuture: private_try_future::Sealed { +pub trait TryFuture: Future + private_try_future::Sealed { /// The type of successful values yielded by this future type Ok; diff --git a/futures-core/src/stream.rs b/futures-core/src/stream.rs index 12bb01f300..37c60c9c6b 100644 --- a/futures-core/src/stream.rs +++ b/futures-core/src/stream.rs @@ -59,6 +59,37 @@ pub trait Stream { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>; + + /// Returns the bounds on the remaining length of the stream. + /// + /// Specifically, `size_hint()` returns a tuple where the first element + /// is the lower bound, and the second element is the upper bound. + /// + /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`. + /// A [`None`] here means that either there is no known upper bound, or the + /// upper bound is larger than [`usize`]. + /// + /// # Implementation notes + /// + /// It is not enforced that a stream implementation yields the declared + /// number of elements. A buggy stream may yield less than the lower bound + /// or more than the upper bound of elements. + /// + /// `size_hint()` is primarily intended to be used for optimizations such as + /// reserving space for the elements of the stream, but must not be + /// trusted to e.g., omit bounds checks in unsafe code. An incorrect + /// implementation of `size_hint()` should not lead to memory safety + /// violations. + /// + /// That said, the implementation should provide a correct estimation, + /// because otherwise it would be a violation of the trait's protocol. + /// + /// The default implementation returns `(0, `[`None`]`)` which is correct for any + /// stream. + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, None) + } } impl Stream for &mut S { @@ -70,6 +101,10 @@ impl Stream for &mut S { ) -> Poll> { S::poll_next(Pin::new(&mut **self), cx) } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } } impl

Stream for Pin

@@ -85,6 +120,10 @@ where ) -> Poll> { self.get_mut().as_mut().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } } /// A stream which tracks whether or not the underlying stream @@ -126,7 +165,7 @@ mod private_try_stream { /// A convenience for streams that return `Result` values that includes /// a variety of adapters tailored to such futures. -pub trait TryStream: private_try_stream::Sealed { +pub trait TryStream: Stream + private_try_stream::Sealed { /// The type of successful values yielded by this future type Ok; @@ -169,10 +208,30 @@ mod if_alloc { ) -> Poll> { Pin::new(&mut **self).poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } + } + + impl Stream for alloc::collections::VecDeque { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(self.pop_front()) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } } #[cfg(feature = "std")] - impl Stream for ::std::panic::AssertUnwindSafe { + impl Stream for std::panic::AssertUnwindSafe { type Item = S::Item; fn poll_next( @@ -181,16 +240,9 @@ mod if_alloc { ) -> Poll> { unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx) } - } - - impl Stream for ::alloc::collections::VecDeque { - type Item = T; - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(self.pop_front()) + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() } } diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 78ba2c3493..9985dca5b9 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -329,9 +329,14 @@ impl BlockingStream { impl Iterator for BlockingStream { type Item = S::Item; + fn next(&mut self) -> Option { LocalPool::new().run_until(self.stream.next()) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } impl Spawn for LocalSpawner { diff --git a/futures-test/src/interleave_pending.rs b/futures-test/src/interleave_pending.rs index 302075a646..993bee562e 100644 --- a/futures-test/src/interleave_pending.rs +++ b/futures-test/src/interleave_pending.rs @@ -104,6 +104,10 @@ impl Stream for InterleavePending { Poll::Pending } } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } } impl AsyncWrite for InterleavePending { diff --git a/futures-util/src/future/into_stream.rs b/futures-util/src/future/into_stream.rs index adc987e6a4..9f8a391280 100644 --- a/futures-util/src/future/into_stream.rs +++ b/futures-util/src/future/into_stream.rs @@ -1,3 +1,4 @@ +use crate::stream::{self, Once}; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::Stream; @@ -8,15 +9,15 @@ use pin_utils::unsafe_pinned; #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct IntoStream { - future: Option + inner: Once } impl IntoStream { - unsafe_pinned!(future: Option); + unsafe_pinned!(inner: Once); pub(super) fn new(future: Fut) -> IntoStream { IntoStream { - future: Some(future) + inner: stream::once(future) } } } @@ -24,13 +25,13 @@ impl IntoStream { impl Stream for IntoStream { type Item = Fut::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let v = match self.as_mut().future().as_pin_mut() { - Some(fut) => ready!(fut.poll(cx)), - None => return Poll::Ready(None), - }; + #[inline] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_next(cx) + } - self.as_mut().future().set(None); - Poll::Ready(Some(v)) + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() } } diff --git a/futures-util/src/sink/buffer.rs b/futures-util/src/sink/buffer.rs index 046efcaa4b..90480bd5bb 100644 --- a/futures-util/src/sink/buffer.rs +++ b/futures-util/src/sink/buffer.rs @@ -76,6 +76,10 @@ impl Stream for Buffer where S: Sink + Stream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.sink().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.sink.size_hint() + } } impl FusedStream for Buffer where S: Sink + FusedStream { diff --git a/futures-util/src/sink/err_into.rs b/futures-util/src/sink/err_into.rs index d93c8aa393..af198feaa0 100644 --- a/futures-util/src/sink/err_into.rs +++ b/futures-util/src/sink/err_into.rs @@ -70,6 +70,10 @@ impl Stream for SinkErrInto ) -> Poll> { self.sink().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.sink.size_hint() + } } impl FusedStream for SinkErrInto diff --git a/futures-util/src/sink/map_err.rs b/futures-util/src/sink/map_err.rs index bd2ed7ae83..7333d20460 100644 --- a/futures-util/src/sink/map_err.rs +++ b/futures-util/src/sink/map_err.rs @@ -95,6 +95,10 @@ impl Stream for SinkMapErr { ) -> Poll> { self.sink().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.sink.size_hint() + } } impl FusedStream for SinkMapErr { diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index 9a4afe4cbd..9ea47a73cb 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -97,6 +97,10 @@ impl Stream for With ) -> Poll> { self.sink().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.sink.size_hint() + } } impl With diff --git a/futures-util/src/sink/with_flat_map.rs b/futures-util/src/sink/with_flat_map.rs index 0b2726e07d..baa7536f10 100644 --- a/futures-util/src/sink/with_flat_map.rs +++ b/futures-util/src/sink/with_flat_map.rs @@ -121,12 +121,17 @@ where St: Stream>, { type Item = S::Item; + fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { self.sink().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.sink.size_hint() + } } impl FusedStream for WithFlatMap diff --git a/futures-util/src/stream/buffer_unordered.rs b/futures-util/src/stream/buffer_unordered.rs index 4ade282332..7a750b3175 100644 --- a/futures-util/src/stream/buffer_unordered.rs +++ b/futures-util/src/stream/buffer_unordered.rs @@ -127,6 +127,17 @@ where Poll::Pending } } + + fn size_hint(&self) -> (usize, Option) { + let queue_len = self.in_progress_queue.len(); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(queue_len); + let upper = match upper { + Some(x) => x.checked_add(queue_len), + None => None, + }; + (lower, upper) + } } impl FusedStream for BufferUnordered diff --git a/futures-util/src/stream/buffered.rs b/futures-util/src/stream/buffered.rs index 7b451c5ea2..1a31b63914 100644 --- a/futures-util/src/stream/buffered.rs +++ b/futures-util/src/stream/buffered.rs @@ -122,6 +122,17 @@ where Poll::Pending } } + + fn size_hint(&self) -> (usize, Option) { + let queue_len = self.in_progress_queue.len(); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(queue_len); + let upper = match upper { + Some(x) => x.checked_add(queue_len), + None => None, + }; + (lower, upper) + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/catch_unwind.rs b/futures-util/src/stream/catch_unwind.rs index 0486c51c3a..309625b728 100644 --- a/futures-util/src/stream/catch_unwind.rs +++ b/futures-util/src/stream/catch_unwind.rs @@ -45,6 +45,14 @@ impl Stream for CatchUnwind { } } } + + fn size_hint(&self) -> (usize, Option) { + if self.caught_unwind { + (0, Some(0)) + } else { + self.stream.size_hint() + } + } } impl FusedStream for CatchUnwind { diff --git a/futures-util/src/stream/chain.rs b/futures-util/src/stream/chain.rs index 22f88cb0d4..b2ada69c11 100644 --- a/futures-util/src/stream/chain.rs +++ b/futures-util/src/stream/chain.rs @@ -54,4 +54,22 @@ where St1: Stream, self.as_mut().first().set(None); self.as_mut().second().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + if let Some(first) = &self.first { + let (first_lower, first_upper) = first.size_hint(); + let (second_lower, second_upper) = self.second.size_hint(); + + let lower = first_lower.saturating_add(second_lower); + + let upper = match (first_upper, second_upper) { + (Some(x), Some(y)) => x.checked_add(y), + _ => None + }; + + (lower, upper) + } else { + self.second.size_hint() + } + } } diff --git a/futures-util/src/stream/chunks.rs b/futures-util/src/stream/chunks.rs index 1c8b518df3..87e9fbb843 100644 --- a/futures-util/src/stream/chunks.rs +++ b/futures-util/src/stream/chunks.rs @@ -105,6 +105,17 @@ impl Stream for Chunks { } } } + + fn size_hint(&self) -> (usize, Option) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(chunk_len); + let upper = match upper { + Some(x) => x.checked_add(chunk_len), + None => None, + }; + (lower, upper) + } } impl FusedStream for Chunks { diff --git a/futures-util/src/stream/empty.rs b/futures-util/src/stream/empty.rs index e176bc1794..903af6851d 100644 --- a/futures-util/src/stream/empty.rs +++ b/futures-util/src/stream/empty.rs @@ -33,4 +33,8 @@ impl Stream for Empty { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(None) } + + fn size_hint(&self) -> (usize, Option) { + (0, Some(0)) + } } diff --git a/futures-util/src/stream/enumerate.rs b/futures-util/src/stream/enumerate.rs index c5fbdf34fe..a688459a98 100644 --- a/futures-util/src/stream/enumerate.rs +++ b/futures-util/src/stream/enumerate.rs @@ -81,6 +81,10 @@ impl Stream for Enumerate { None => Poll::Ready(None), } } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/filter.rs b/futures-util/src/stream/filter.rs index ad807da7cf..06335f1ee0 100644 --- a/futures-util/src/stream/filter.rs +++ b/futures-util/src/stream/filter.rs @@ -132,6 +132,16 @@ impl Stream for Filter } } } + + fn size_hint(&self) -> (usize, Option) { + let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/filter_map.rs b/futures-util/src/stream/filter_map.rs index d22cd8bd61..532e6cad96 100644 --- a/futures-util/src/stream/filter_map.rs +++ b/futures-util/src/stream/filter_map.rs @@ -118,6 +118,16 @@ impl Stream for FilterMap } } } + + fn size_hint(&self) -> (usize, Option) { + let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/fuse.rs b/futures-util/src/stream/fuse.rs index d4ac3e4ea1..9085dc553c 100644 --- a/futures-util/src/stream/fuse.rs +++ b/futures-util/src/stream/fuse.rs @@ -88,6 +88,14 @@ impl Stream for Fuse { } Poll::Ready(item) } + + fn size_hint(&self) -> (usize, Option) { + if self.done { + (0, Some(0)) + } else { + self.stream.size_hint() + } + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index 7f5d6cc01f..a30cbaa40f 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -181,6 +181,11 @@ impl Stream for FuturesOrdered { } } } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } } impl Debug for FuturesOrdered { diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index dd930ad23f..4566618340 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -427,6 +427,11 @@ impl Stream for FuturesUnordered { } } } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } } impl Debug for FuturesUnordered { diff --git a/futures-util/src/stream/inspect.rs b/futures-util/src/stream/inspect.rs index 75ca236115..e34970ae65 100644 --- a/futures-util/src/stream/inspect.rs +++ b/futures-util/src/stream/inspect.rs @@ -101,6 +101,10 @@ impl Stream for Inspect .poll_next(cx) .map(|opt| opt.map(|e| inspect(e, self.as_mut().f()))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/iter.rs b/futures-util/src/stream/iter.rs index 9701d750f8..e9df81cef0 100644 --- a/futures-util/src/stream/iter.rs +++ b/futures-util/src/stream/iter.rs @@ -41,4 +41,8 @@ impl Stream for Iter fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.iter.next()) } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } } diff --git a/futures-util/src/stream/map.rs b/futures-util/src/stream/map.rs index e681cc92ad..81194342c4 100644 --- a/futures-util/src/stream/map.rs +++ b/futures-util/src/stream/map.rs @@ -94,6 +94,10 @@ impl Stream for Map .poll_next(cx) .map(|opt| opt.map(|x| self.as_mut().f()(x))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/once.rs b/futures-util/src/stream/once.rs index 8dc6ed6ab7..4f68b0cedd 100644 --- a/futures-util/src/stream/once.rs +++ b/futures-util/src/stream/once.rs @@ -37,17 +37,22 @@ impl Once { impl Stream for Once { type Item = Fut::Output; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let val = if let Some(f) = self.as_mut().future().as_pin_mut() { - ready!(f.poll(cx)) - } else { - return Poll::Ready(None) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let v = match self.as_mut().future().as_pin_mut() { + Some(fut) => ready!(fut.poll(cx)), + None => return Poll::Ready(None), }; - self.future().set(None); - Poll::Ready(Some(val)) + + self.as_mut().future().set(None); + Poll::Ready(Some(v)) + } + + fn size_hint(&self) -> (usize, Option) { + if self.future.is_some() { + (1, Some(1)) + } else { + (0, Some(0)) + } } } diff --git a/futures-util/src/stream/peek.rs b/futures-util/src/stream/peek.rs index 636f45752d..ef9a219cd0 100644 --- a/futures-util/src/stream/peek.rs +++ b/futures-util/src/stream/peek.rs @@ -104,6 +104,17 @@ impl Stream for Peekable { } self.as_mut().stream().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + let peek_len = if self.peeked.is_some() { 1 } else { 0 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(peek_len); + let upper = match upper { + Some(x) => x.checked_add(peek_len), + None => None, + }; + (lower, upper) + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/pending.rs b/futures-util/src/stream/pending.rs index 14730b38f1..bbe07504b7 100644 --- a/futures-util/src/stream/pending.rs +++ b/futures-util/src/stream/pending.rs @@ -31,4 +31,8 @@ impl Stream for Pending { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Pending } + + fn size_hint(&self) -> (usize, Option) { + (0, Some(0)) + } } diff --git a/futures-util/src/stream/repeat.rs b/futures-util/src/stream/repeat.rs index d869b5dffb..21749eb71d 100644 --- a/futures-util/src/stream/repeat.rs +++ b/futures-util/src/stream/repeat.rs @@ -39,6 +39,10 @@ impl Stream for Repeat fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Some(self.item.clone())) } + + fn size_hint(&self) -> (usize, Option) { + (usize::max_value(), None) + } } impl FusedStream for Repeat diff --git a/futures-util/src/stream/skip.rs b/futures-util/src/stream/skip.rs index 4dd9ade020..f9edf2ae9e 100644 --- a/futures-util/src/stream/skip.rs +++ b/futures-util/src/stream/skip.rs @@ -81,6 +81,18 @@ impl Stream for Skip { self.as_mut().stream().poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_sub(self.remaining as usize); + let upper = match upper { + Some(x) => Some(x.saturating_sub(self.remaining as usize)), + None => None, + }; + + (lower, upper) + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/skip_while.rs b/futures-util/src/stream/skip_while.rs index f18f6c422b..666d9deabc 100644 --- a/futures-util/src/stream/skip_while.rs +++ b/futures-util/src/stream/skip_while.rs @@ -135,6 +135,16 @@ impl Stream for SkipWhile } } } + + fn size_hint(&self) -> (usize, Option) { + let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/take.rs b/futures-util/src/stream/take.rs index cd9113e531..4d0dbd31d2 100644 --- a/futures-util/src/stream/take.rs +++ b/futures-util/src/stream/take.rs @@ -1,3 +1,4 @@ +use core::cmp; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -79,6 +80,23 @@ impl Stream for Take Poll::Ready(next) } } + + fn size_hint(&self) -> (usize, Option) { + if self.remaining == 0 { + return (0, Some(0)); + } + + let (lower, upper) = self.stream.size_hint(); + + let lower = cmp::min(lower, self.remaining as usize); + + let upper = match upper { + Some(x) if x < self.remaining as usize => Some(x), + _ => Some(self.remaining as usize) + }; + + (lower, upper) + } } impl FusedStream for Take diff --git a/futures-util/src/stream/take_while.rs b/futures-util/src/stream/take_while.rs index e6225ca3b3..68606ec263 100644 --- a/futures-util/src/stream/take_while.rs +++ b/futures-util/src/stream/take_while.rs @@ -127,6 +127,20 @@ impl Stream for TakeWhile Poll::Ready(None) } } + + fn size_hint(&self) -> (usize, Option) { + if self.done_taking { + return (0, Some(0)); + } + + let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } impl FusedStream for TakeWhile diff --git a/futures-util/src/stream/then.rs b/futures-util/src/stream/then.rs index 7e16e6d61b..39843b2504 100644 --- a/futures-util/src/stream/then.rs +++ b/futures-util/src/stream/then.rs @@ -115,6 +115,17 @@ impl Stream for Then self.as_mut().future().set(None); Poll::Ready(Some(e)) } + + fn size_hint(&self) -> (usize, Option) { + let future_len = if self.future.is_some() { 1 } else { 0 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(future_len); + let upper = match upper { + Some(x) => x.checked_add(future_len), + None => None, + }; + (lower, upper) + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/stream/zip.rs b/futures-util/src/stream/zip.rs index c8d7f3df0b..f97ac17d35 100644 --- a/futures-util/src/stream/zip.rs +++ b/futures-util/src/stream/zip.rs @@ -1,4 +1,5 @@ use crate::stream::{StreamExt, Fuse}; +use core::cmp; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -114,4 +115,29 @@ impl Stream for Zip Poll::Pending } } + + fn size_hint(&self) -> (usize, Option) { + let queued1_len = if self.queued1.is_some() { 1 } else { 0 }; + let queued2_len = if self.queued2.is_some() { 1 } else { 0 }; + let (stream1_lower, stream1_upper) = self.stream1.size_hint(); + let (stream2_lower, stream2_upper) = self.stream2.size_hint(); + + let stream1_lower = stream1_lower.saturating_add(queued1_len); + let stream2_lower = stream2_lower.saturating_add(queued2_len); + + let lower = cmp::min(stream1_lower, stream2_lower); + + let upper = match (stream1_upper, stream2_upper) { + (Some(x), Some(y)) => { + let x = x.saturating_add(queued1_len); + let y = y.saturating_add(queued2_len); + Some(cmp::min(x, y)) + } + (Some(x), None) => x.checked_add(queued1_len), + (None, Some(y)) => y.checked_add(queued2_len), + (None, None) => None + }; + + (lower, upper) + } } diff --git a/futures-util/src/try_stream/and_then.rs b/futures-util/src/try_stream/and_then.rs index d92b9a93e9..809c32a94b 100644 --- a/futures-util/src/try_stream/and_then.rs +++ b/futures-util/src/try_stream/and_then.rs @@ -102,6 +102,17 @@ impl Stream for AndThen self.as_mut().future().set(None); Poll::Ready(Some(e)) } + + fn size_hint(&self) -> (usize, Option) { + let future_len = if self.future.is_some() { 1 } else { 0 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(future_len); + let upper = match upper { + Some(x) => x.checked_add(future_len), + None => None, + }; + (lower, upper) + } } impl FusedStream for AndThen diff --git a/futures-util/src/try_stream/err_into.rs b/futures-util/src/try_stream/err_into.rs index 462c31a202..f5d92945f3 100644 --- a/futures-util/src/try_stream/err_into.rs +++ b/futures-util/src/try_stream/err_into.rs @@ -80,6 +80,10 @@ where self.stream().try_poll_next(cx) .map(|res| res.map(|some| some.map_err(Into::into))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/inspect_err.rs b/futures-util/src/try_stream/inspect_err.rs index 588e660d9f..36c21f05e7 100644 --- a/futures-util/src/try_stream/inspect_err.rs +++ b/futures-util/src/try_stream/inspect_err.rs @@ -100,6 +100,10 @@ where .try_poll_next(cx) .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f())))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/inspect_ok.rs b/futures-util/src/try_stream/inspect_ok.rs index 1674d91b52..29bb9544a1 100644 --- a/futures-util/src/try_stream/inspect_ok.rs +++ b/futures-util/src/try_stream/inspect_ok.rs @@ -100,6 +100,10 @@ where .try_poll_next(cx) .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f())))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/into_stream.rs b/futures-util/src/try_stream/into_stream.rs index f265617406..b0fa07aa79 100644 --- a/futures-util/src/try_stream/into_stream.rs +++ b/futures-util/src/try_stream/into_stream.rs @@ -69,6 +69,10 @@ impl Stream for IntoStream { ) -> Poll> { self.stream().try_poll_next(cx) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/map_err.rs b/futures-util/src/try_stream/map_err.rs index 68d5d71a3b..1b98d6b4bc 100644 --- a/futures-util/src/try_stream/map_err.rs +++ b/futures-util/src/try_stream/map_err.rs @@ -94,6 +94,10 @@ where .try_poll_next(cx) .map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e)))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/map_ok.rs b/futures-util/src/try_stream/map_ok.rs index 1738c6e730..19d01be459 100644 --- a/futures-util/src/try_stream/map_ok.rs +++ b/futures-util/src/try_stream/map_ok.rs @@ -94,6 +94,10 @@ where .try_poll_next(cx) .map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x)))) } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/or_else.rs b/futures-util/src/try_stream/or_else.rs index 4a18fe967e..33310d1ce3 100644 --- a/futures-util/src/try_stream/or_else.rs +++ b/futures-util/src/try_stream/or_else.rs @@ -103,6 +103,17 @@ impl Stream for OrElse self.as_mut().future().set(None); Poll::Ready(Some(e)) } + + fn size_hint(&self) -> (usize, Option) { + let future_len = if self.future.is_some() { 1 } else { 0 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(future_len); + let upper = match upper { + Some(x) => x.checked_add(future_len), + None => None, + }; + (lower, upper) + } } impl FusedStream for OrElse diff --git a/futures-util/src/try_stream/try_filter.rs b/futures-util/src/try_stream/try_filter.rs index f5035c13b1..24a9c3275a 100644 --- a/futures-util/src/try_stream/try_filter.rs +++ b/futures-util/src/try_stream/try_filter.rs @@ -129,6 +129,16 @@ impl Stream for TryFilter } } } + + fn size_hint(&self) -> (usize, Option) { + let pending_len = if self.pending_fut.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/try_filter_map.rs b/futures-util/src/try_stream/try_filter_map.rs index 08b7aa7ce8..ed7eeb227e 100644 --- a/futures-util/src/try_stream/try_filter_map.rs +++ b/futures-util/src/try_stream/try_filter_map.rs @@ -113,6 +113,16 @@ impl Stream for TryFilterMap } } } + + fn size_hint(&self) -> (usize, Option) { + let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } // Forwarding impl of Sink from the underlying stream diff --git a/futures-util/src/try_stream/try_skip_while.rs b/futures-util/src/try_stream/try_skip_while.rs index 0c0355a92b..a3d6803a1b 100644 --- a/futures-util/src/try_stream/try_skip_while.rs +++ b/futures-util/src/try_stream/try_skip_while.rs @@ -131,6 +131,16 @@ impl Stream for TrySkipWhile } } } + + fn size_hint(&self) -> (usize, Option) { + let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } } impl FusedStream for TrySkipWhile diff --git a/futures/tests/object_safety.rs b/futures/tests/object_safety.rs index b8d2bb00d7..30c892f5e6 100644 --- a/futures/tests/object_safety.rs +++ b/futures/tests/object_safety.rs @@ -7,7 +7,7 @@ fn future() { assert_is_object_safe::<&dyn Future>(); assert_is_object_safe::<&dyn FusedFuture>(); - assert_is_object_safe::<&dyn TryFuture>(); + assert_is_object_safe::<&dyn TryFuture>>(); } #[test] @@ -17,7 +17,7 @@ fn stream() { assert_is_object_safe::<&dyn Stream>(); assert_is_object_safe::<&dyn FusedStream>(); - assert_is_object_safe::<&dyn TryStream>(); + assert_is_object_safe::<&dyn TryStream>>(); } #[test]