diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index f8d7321c15..3adb7ea154 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -18,13 +18,17 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); #[derive(Debug)] /// Immutable iterator over all futures in the unordered set. -pub struct Iter<'a, Fut: Unpin> { +pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) pending_next_all: *mut Task, pub(super) _marker: PhantomData<&'a FuturesUnordered>, } +#[derive(Debug)] +/// Immutable iterator over all the futures in the unordered set. +pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); + #[derive(Debug)] /// Owned iterator over all futures in the unordered set. pub struct IntoIter { @@ -109,10 +113,10 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { impl ExactSizeIterator for IterMut<'_, Fut> {} -impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { - type Item = &'a Fut; +impl<'a, Fut: Unpin> Iterator for IterPinRef<'a, Fut> { + type Item = Pin<&'a Fut>; - fn next(&mut self) -> Option<&'a Fut> { + fn next(&mut self) -> Option> { if self.task.is_null() { return None; } @@ -127,7 +131,7 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; - Some(future) + Some(Pin::new_unchecked(future)) } } @@ -136,6 +140,20 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { } } +impl ExactSizeIterator for IterPinRef<'_, Fut> {} + +impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { + type Item = &'a Fut; + + fn next(&mut self) -> Option<&'a Fut> { + self.0.next().map(Pin::get_ref) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + impl ExactSizeIterator for Iter<'_, Fut> {} // SAFETY: we do nothing thread-local and there is no interior mutability, @@ -146,5 +164,5 @@ unsafe impl Sync for IterPinMut<'_, Fut> {} unsafe impl Send for IntoIter {} unsafe impl Sync for IntoIter {} -unsafe impl Send for Iter<'_, Fut> {} -unsafe impl Sync for Iter<'_, Fut> {} +unsafe impl Send for IterPinRef<'_, Fut> {} +unsafe impl Sync for IterPinRef<'_, Fut> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 7a404cc202..4534eff408 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; @@ -190,10 +190,7 @@ impl FuturesUnordered { where Fut: Unpin, { - let (task, len) = self.atomic_load_head_and_len_all(); - let pending_next_all = self.pending_next_all(); - - Iter { task, len, pending_next_all, _marker: PhantomData } + Iter(Pin::new(self).iter_pin_ref()) } /// Returns an iterator that allows modifying each future in the set. @@ -204,6 +201,14 @@ impl FuturesUnordered { IterMut(Pin::new(self).iter_pin_mut()) } + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { + let (task, len) = self.atomic_load_head_and_len_all(); + let pending_next_all = self.pending_next_all(); + + IterPinRef { task, len, pending_next_all, _marker: PhantomData } + } + /// Returns an iterator that allows modifying each future in the set. pub fn iter_pin_mut(mut self: Pin<&mut Self>) -> IterPinMut<'_, Fut> { // `head_all` can be accessed directly and we don't need to spin on diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 2f302012bb..881f6b3620 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1822,6 +1822,12 @@ pub mod stream { assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut: Unpin); + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); + assert_impl!(futures_unordered::IterPinRef: Unpin); + assert_impl!(futures_unordered::IntoIter<()>: Send); assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); assert_impl!(futures_unordered::IntoIter<()>: Sync);