diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 59ea28efa6..04db5ee753 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -4,20 +4,20 @@ use core::marker::PhantomData; use core::pin::Pin; use core::sync::atomic::Ordering::Relaxed; -#[derive(Debug)] /// Mutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterPinMut<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, } -#[derive(Debug)] /// Mutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); -#[derive(Debug)] /// Immutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, @@ -25,12 +25,12 @@ pub struct IterPinRef<'a, Fut> { pub(super) _marker: PhantomData<&'a FuturesUnordered>, } -#[derive(Debug)] /// Immutable iterator over all the futures in the unordered set. +#[derive(Debug)] pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); -#[derive(Debug)] /// Owned iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IntoIter { pub(super) len: usize, pub(super) inner: FuturesUnordered, @@ -39,7 +39,7 @@ pub struct IntoIter { impl Iterator for IntoIter { type Item = Fut; - fn next(&mut self) -> Option { + fn next(&mut self) -> Option { // `head_all` can be accessed directly and we don't need to spin on // `Task::next_all` since we have exclusive access to the set. let task = self.inner.head_all.get_mut(); @@ -73,7 +73,7 @@ impl ExactSizeIterator for IntoIter {} impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { if self.task.is_null() { return None; } @@ -102,7 +102,7 @@ impl ExactSizeIterator for IterPinMut<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { type Item = &'a mut Fut; - fn next(&mut self) -> Option<&'a mut Fut> { + fn next(&mut self) -> Option { self.0.next().map(Pin::get_mut) } @@ -116,7 +116,7 @@ impl ExactSizeIterator for IterMut<'_, Fut> {} impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { type Item = Pin<&'a Fut>; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { if self.task.is_null() { return None; } @@ -145,7 +145,7 @@ impl ExactSizeIterator for IterPinRef<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { type Item = &'a Fut; - fn next(&mut self) -> Option<&'a Fut> { + fn next(&mut self) -> Option { self.0.next().map(Pin::get_ref) } diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 38f0b34dd7..7fe5c43eb8 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -105,7 +105,7 @@ cfg_target_has_atomic! { pub use self::futures_unordered::FuturesUnordered; #[cfg(feature = "alloc")] - mod select_all; + pub mod select_all; #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index 4a35c39091..6e5683f321 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -11,8 +11,7 @@ use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; use super::assert_stream; -use crate::stream::futures_unordered::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; -use crate::stream::{FuturesUnordered, StreamExt, StreamFuture}; +use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; pin_project! { /// An unbounded set of streams @@ -71,27 +70,17 @@ impl SelectAll { self.inner.push(stream.into_future()); } - /// Returns an iterator that allows inspecting each future in the set. - pub fn iter(&self) -> Iter<'_, StreamFuture> { - self.inner.iter() + /// Returns an iterator that allows inspecting each stream in the set. + pub fn iter(&self) -> Iter<'_, St> { + Iter(self.inner.iter()) } - /// Returns an iterator that allows inspecting each future in the set. - pub fn iter_pin_ref(self: Pin<&'_ Self>) -> IterPinRef<'_, StreamFuture> { - self.project_ref().inner.iter_pin_ref() + /// Returns an iterator that allows modifying each stream in the set. + pub fn iter_mut(&mut self) -> IterMut<'_, St> { + IterMut(self.inner.iter_mut()) } - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> IterMut<'_, StreamFuture> { - self.inner.iter_mut() - } - - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture> { - self.project().inner.iter_pin_mut() - } - - /// Clears the set, removing all futures. + /// Clears the set, removing all streams. pub fn clear(&mut self) { self.inner.clear() } @@ -139,7 +128,7 @@ impl FusedStream for SelectAll { /// streams internally, in the order they become available. /// /// Note that the returned set can also be used to dynamically push more -/// futures into the set as they become available. +/// streams into the set as they become available. /// /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. @@ -172,17 +161,17 @@ impl Extend for SelectAll { } impl IntoIterator for SelectAll { - type Item = StreamFuture; - type IntoIter = IntoIter>; + type Item = St; + type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { - self.inner.into_iter() + IntoIter(self.inner.into_iter()) } } impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll { - type Item = &'a StreamFuture; - type IntoIter = Iter<'a, StreamFuture>; + type Item = &'a St; + type IntoIter = Iter<'a, St>; fn into_iter(self) -> Self::IntoIter { self.iter() @@ -190,10 +179,76 @@ impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll { } impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll { - type Item = &'a mut StreamFuture; - type IntoIter = IterMut<'a, StreamFuture>; + type Item = &'a mut St; + type IntoIter = IterMut<'a, St>; fn into_iter(self) -> Self::IntoIter { self.iter_mut() } } + +/// Immutable iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture>); + +/// Mutable iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture>); + +/// Owned iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct IntoIter(futures_unordered::IntoIter>); + +impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> { + type Item = &'a St; + + fn next(&mut self) -> Option { + let st = self.0.next()?; + let next = st.get_ref(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for Iter<'_, St> {} + +impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> { + type Item = &'a mut St; + + fn next(&mut self) -> Option { + let st = self.0.next()?; + let next = st.get_mut(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for IterMut<'_, St> {} + +impl Iterator for IntoIter { + type Item = St; + + fn next(&mut self) -> Option { + let st = self.0.next()?; + let next = st.into_inner(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for IntoIter {} diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index 70d00a549c..fdeb7b997a 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -99,3 +99,99 @@ fn clear() { tasks.clear(); assert!(!tasks.is_terminated()); } + +#[test] +fn iter_mut() { + let mut stream = + vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::>(); + + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn into_iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::>(); + + let mut iter = stream.into_iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +}