Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose iterators from SelectAll #2428

Merged
merged 8 commits into from May 10, 2021
Merged
83 changes: 68 additions & 15 deletions futures-util/src/stream/select_all.rs
Expand Up @@ -8,24 +8,30 @@ use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};

use pin_project_lite::pin_project;

use super::assert_stream;
use crate::stream::futures_unordered::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};
use crate::stream::{FuturesUnordered, StreamExt, StreamFuture};

/// An unbounded set of streams
///
/// This "combinator" provides the ability to maintain a set of streams
/// and drive them all to completion.
///
/// Streams are pushed into this set and their realized values are
/// yielded as they become ready. Streams will only be polled when they
/// generate notifications. This allows to coordinate a large number of streams.
///
/// Note that you can create a ready-made `SelectAll` via the
/// `select_all` function in the `stream` module, or you can start with an
/// empty set with the `SelectAll::new` constructor.
#[must_use = "streams do nothing unless polled"]
pub struct SelectAll<St> {
inner: FuturesUnordered<StreamFuture<St>>,
pin_project! {
/// An unbounded set of streams
///
/// This "combinator" provides the ability to maintain a set of streams
/// and drive them all to completion.
///
/// Streams are pushed into this set and their realized values are
/// yielded as they become ready. Streams will only be polled when they
/// generate notifications. This allows to coordinate a large number of streams.
///
/// Note that you can create a ready-made `SelectAll` via the
/// `select_all` function in the `stream` module, or you can start with an
/// empty set with the `SelectAll::new` constructor.
#[must_use = "streams do nothing unless polled"]
pub struct SelectAll<St> {
#[pin]
inner: FuturesUnordered<StreamFuture<St>>,
}
}

impl<St: Debug> Debug for SelectAll<St> {
Expand Down Expand Up @@ -64,6 +70,26 @@ impl<St: Stream + Unpin> SelectAll<St> {
pub fn push(&self, stream: St) {
self.inner.push(stream.into_future());
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter(&self) -> Iter<'_, StreamFuture<St>> {
self.inner.iter()
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter_pin_ref(self: Pin<&'_ Self>) -> IterPinRef<'_, StreamFuture<St>> {
self.project_ref().inner.iter_pin_ref()
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<'_, StreamFuture<St>> {
self.inner.iter_mut()
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture<St>> {
self.project().inner.iter_pin_mut()
}
}

impl<St: Stream + Unpin> Default for SelectAll<St> {
Expand Down Expand Up @@ -139,3 +165,30 @@ impl<St: Stream + Unpin> Extend<St> for SelectAll<St> {
}
}
}

impl<St: Stream + Unpin> IntoIterator for SelectAll<St> {
type Item = StreamFuture<St>;
type IntoIter = IntoIter<StreamFuture<St>>;

fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}

impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll<St> {
type Item = &'a StreamFuture<St>;
type IntoIter = Iter<'a, StreamFuture<St>>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll<St> {
type Item = &'a mut StreamFuture<St>;
type IntoIter = IterMut<'a, StreamFuture<St>>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}