diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index 8303e340cc..d3bb7f33e4 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -8,24 +8,30 @@ use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + use super::assert_stream; +use crate::stream::futures_unordered::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; use crate::stream::{FuturesUnordered, StreamExt, StreamFuture}; -/// An unbounded set of streams -/// -/// This "combinator" provides the ability to maintain a set of streams -/// and drive them all to completion. -/// -/// Streams are pushed into this set and their realized values are -/// yielded as they become ready. Streams will only be polled when they -/// generate notifications. This allows to coordinate a large number of streams. -/// -/// Note that you can create a ready-made `SelectAll` via the -/// `select_all` function in the `stream` module, or you can start with an -/// empty set with the `SelectAll::new` constructor. -#[must_use = "streams do nothing unless polled"] -pub struct SelectAll { - inner: FuturesUnordered>, +pin_project! { + /// An unbounded set of streams + /// + /// This "combinator" provides the ability to maintain a set of streams + /// and drive them all to completion. + /// + /// Streams are pushed into this set and their realized values are + /// yielded as they become ready. Streams will only be polled when they + /// generate notifications. This allows to coordinate a large number of streams. + /// + /// Note that you can create a ready-made `SelectAll` via the + /// `select_all` function in the `stream` module, or you can start with an + /// empty set with the `SelectAll::new` constructor. + #[must_use = "streams do nothing unless polled"] + pub struct SelectAll { + #[pin] + inner: FuturesUnordered>, + } } impl Debug for SelectAll { @@ -64,6 +70,26 @@ impl SelectAll { pub fn push(&self, stream: St) { self.inner.push(stream.into_future()); } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter(&self) -> Iter<'_, StreamFuture> { + self.inner.iter() + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&'_ Self>) -> IterPinRef<'_, StreamFuture> { + self.project_ref().inner.iter_pin_ref() + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> IterMut<'_, StreamFuture> { + self.inner.iter_mut() + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture> { + self.project().inner.iter_pin_mut() + } } impl Default for SelectAll { @@ -139,3 +165,30 @@ impl Extend for SelectAll { } } } + +impl IntoIterator for SelectAll { + type Item = StreamFuture; + type IntoIter = IntoIter>; + + fn into_iter(self) -> Self::IntoIter { + self.inner.into_iter() + } +} + +impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll { + type Item = &'a StreamFuture; + type IntoIter = Iter<'a, StreamFuture>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll { + type Item = &'a mut StreamFuture; + type IntoIter = IterMut<'a, StreamFuture>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +}