diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index b03e10bb2d..def0dd5d45 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -15,6 +15,18 @@ pub struct IterPinMut<'a, Fut> { /// Mutable iterator over all futures in the unordered set. pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>); +#[derive(Debug)] +/// Immutable iterator over all futures in the unordered set. +pub struct IterPinRef<'a, Fut> { + pub(super) task: *const Task, + pub(super) len: usize, + pub(super) _marker: PhantomData<&'a FuturesUnordered> +} + +#[derive(Debug)] +/// Immutable iterator over all the futures in the unordered set. +pub struct Iter<'a, Fut: Unpin> (pub(super) IterPinRef<'a, Fut>); + impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; @@ -51,3 +63,40 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { } impl ExactSizeIterator for IterMut<'_, Fut> {} + +impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { + type Item = Pin<&'a Fut>; + + fn next(&mut self) -> Option> { + if self.task.is_null() { + return None; + } + unsafe { + let future = (*(*self.task).future.get()).as_ref().unwrap(); + let next = *(*self.task).next_all.get(); + self.task = next; + self.len -= 1; + Some(Pin::new_unchecked(future)) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl ExactSizeIterator for IterPinRef<'_, Fut> {} + +impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { + type Item = &'a Fut; + + fn next(&mut self) -> Option<&'a Fut> { + self.0.next().map(Pin::get_ref) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl ExactSizeIterator for Iter<'_, Fut> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 4566618340..6c8d8b1065 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -21,7 +21,7 @@ use alloc::sync::{Arc, Weak}; mod abort; mod iter; -pub use self::iter::{IterMut, IterPinMut}; +pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; @@ -194,6 +194,20 @@ impl FuturesUnordered { self.ready_to_run_queue.enqueue(ptr); } + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin { + Iter(Pin::new(self).iter_pin_ref()) + } + + /// Returns an iterator that allows inspecting each future in the set. + fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { + IterPinRef { + task: self.head_all, + len: self.len(), + _marker: PhantomData, + } + } + /// Returns an iterator that allows modifying each future in the set. pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin { IterMut(Pin::new(self).iter_pin_mut()) diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index 16553665d4..1995a2bc25 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -1,8 +1,12 @@ +use std::marker::Unpin; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; + use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, join, Future, FutureExt}; use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; -use futures::task::Poll; +use futures::task::{Context, Poll}; use futures_test::future::FutureTestExt; use futures_test::task::noop_context; use futures_test::{assert_stream_done, assert_stream_next}; @@ -164,6 +168,72 @@ fn iter_mut_len() { assert!(iter_mut.next().is_none()); } +#[test] +fn iter_cancel() { + struct AtomicCancel { + future: F, + cancel: AtomicBool, + } + + impl Future for AtomicCancel { + type Output = Option<::Output>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.cancel.load(Ordering::Relaxed) { + Poll::Ready(None) + } else { + self.future.poll_unpin(cx).map(Some) + } + } + } + + impl AtomicCancel { + fn new(future: F) -> Self { + Self { future, cancel: AtomicBool::new(false) } + } + } + + let stream = vec![ + AtomicCancel::new(future::pending::<()>()), + AtomicCancel::new(future::pending::<()>()), + AtomicCancel::new(future::pending::<()>()), + ] + .into_iter() + .collect::>(); + + for f in stream.iter() { + f.cancel.store(true, Ordering::Relaxed); + } + + let mut iter = block_on_stream(stream); + + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), None); +} + +#[test] +fn iter_len() { + let stream = vec![ + future::pending::<()>(), + future::pending::<()>(), + future::pending::<()>(), + ] + .into_iter() + .collect::>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + #[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice,