diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index f8d7321c15..59ea28efa6 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -18,13 +18,17 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); #[derive(Debug)] /// Immutable iterator over all futures in the unordered set. -pub struct Iter<'a, Fut: Unpin> { +pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) pending_next_all: *mut Task, pub(super) _marker: PhantomData<&'a FuturesUnordered>, } +#[derive(Debug)] +/// Immutable iterator over all the futures in the unordered set. +pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); + #[derive(Debug)] /// Owned iterator over all futures in the unordered set. pub struct IntoIter { @@ -109,10 +113,10 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { impl ExactSizeIterator for IterMut<'_, Fut> {} -impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { - type Item = &'a Fut; +impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { + type Item = Pin<&'a Fut>; - fn next(&mut self) -> Option<&'a Fut> { + fn next(&mut self) -> Option> { if self.task.is_null() { return None; } @@ -127,7 +131,7 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; - Some(future) + Some(Pin::new_unchecked(future)) } } @@ -136,15 +140,29 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { } } +impl ExactSizeIterator for IterPinRef<'_, Fut> {} + +impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { + type Item = &'a Fut; + + fn next(&mut self) -> Option<&'a Fut> { + self.0.next().map(Pin::get_ref) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + impl ExactSizeIterator for Iter<'_, Fut> {} // SAFETY: we do nothing thread-local and there is no interior mutability, // so the usual structural `Send`/`Sync` apply. +unsafe impl Send for IterPinRef<'_, Fut> {} +unsafe impl Sync for IterPinRef<'_, Fut> {} + unsafe impl Send for IterPinMut<'_, Fut> {} unsafe impl Sync for IterPinMut<'_, Fut> {} unsafe impl Send for IntoIter {} unsafe impl Sync for IntoIter {} - -unsafe impl Send for Iter<'_, Fut> {} -unsafe impl Sync for Iter<'_, Fut> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 7a404cc202..dac2ee4921 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; @@ -190,10 +190,15 @@ impl FuturesUnordered { where Fut: Unpin, { + Iter(Pin::new(self).iter_pin_ref()) + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); let pending_next_all = self.pending_next_all(); - Iter { task, len, pending_next_all, _marker: PhantomData } + IterPinRef { task, len, pending_next_all, _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. @@ -577,17 +582,12 @@ impl Drop for FuturesUnordered { } } -impl IntoIterator for FuturesUnordered { - type Item = Fut; - type IntoIter = IntoIter; - - fn into_iter(mut self) -> Self::IntoIter { - // `head_all` can be accessed directly and we don't need to spin on - // `Task::next_all` since we have exclusive access to the set. - let task = *self.head_all.get_mut(); - let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; +impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { + type Item = &'a Fut; + type IntoIter = Iter<'a, Fut>; - IntoIter { len, inner: self } + fn into_iter(self) -> Self::IntoIter { + self.iter() } } @@ -600,12 +600,17 @@ impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered { } } -impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { - type Item = &'a Fut; - type IntoIter = Iter<'a, Fut>; +impl IntoIterator for FuturesUnordered { + type Item = Fut; + type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - self.iter() + fn into_iter(mut self) -> Self::IntoIter { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = *self.head_all.get_mut(); + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; + + IntoIter { len, inner: self } } } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 2f302012bb..881f6b3620 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1822,6 +1822,12 @@ pub mod stream { assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut: Unpin); + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); + assert_impl!(futures_unordered::IterPinRef: Unpin); + assert_impl!(futures_unordered::IntoIter<()>: Send); assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); assert_impl!(futures_unordered::IntoIter<()>: Sync);