diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 17cde4fac4..59ea28efa6 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -29,6 +29,47 @@ pub struct IterPinRef<'a, Fut> { /// Immutable iterator over all the futures in the unordered set. pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); +#[derive(Debug)] +/// Owned iterator over all futures in the unordered set. +pub struct IntoIter { + pub(super) len: usize, + pub(super) inner: FuturesUnordered, +} + +impl Iterator for IntoIter { + type Item = Fut; + + fn next(&mut self) -> Option { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = self.inner.head_all.get_mut(); + + if (*task).is_null() { + return None; + } + + unsafe { + // Moving out of the future is safe because it is `Unpin` + let future = (*(**task).future.get()).take().unwrap(); + + // Mutable access to a previously shared `FuturesUnordered` implies + // that the other threads already released the object before the + // current thread acquired it, so relaxed ordering can be used and + // valid `next_all` checks can be skipped. + let next = (**task).next_all.load(Relaxed); + *task = next; + self.len -= 1; + Some(future) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl ExactSizeIterator for IntoIter {} + impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; @@ -36,6 +77,7 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_mut().unwrap(); @@ -78,6 +120,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_ref().unwrap(); @@ -120,3 +163,6 @@ unsafe impl Sync for IterPinRef<'_, Fut> {} unsafe impl Send for IterPinMut<'_, Fut> {} unsafe impl Sync for IterPinMut<'_, Fut> {} + +unsafe impl Send for IntoIter {} +unsafe impl Sync for IntoIter {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index d1377ff327..89ce113d64 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; @@ -194,10 +194,11 @@ impl FuturesUnordered { } /// Returns an iterator that allows inspecting each future in the set. - fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { + pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); + let pending_next_all = self.pending_next_all(); - IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData } + IterPinRef { task, len, pending_next_all, _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. @@ -581,6 +582,38 @@ impl Drop for FuturesUnordered { } } +impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { + type Item = &'a Fut; + type IntoIter = Iter<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered { + type Item = &'a mut Fut; + type IntoIter = IterMut<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +impl IntoIterator for FuturesUnordered { + type Item = Fut; + type IntoIter = IntoIter; + + fn into_iter(mut self) -> Self::IntoIter { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = *self.head_all.get_mut(); + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; + + IntoIter { len, inner: self } + } +} + impl FromIterator for FuturesUnordered { fn from_iter(iter: I) -> Self where diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index e934595c2a..e0192a118b 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1827,6 +1827,13 @@ pub mod stream { assert_impl!(futures_unordered::IterPinRef<()>: Sync); assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); assert_impl!(futures_unordered::IterPinRef: Unpin); + + assert_impl!(futures_unordered::IntoIter<()>: Send); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); + assert_impl!(futures_unordered::IntoIter<()>: Sync); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync); + // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IntoIter: Unpin); } /// Assert Send/Sync/Unpin for all public types in `futures::task`. diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 93b9e293e3..3a5d41853d 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -214,6 +214,51 @@ fn iter_len() { assert!(iter.next().is_none()); } +#[test] +fn into_iter_cancel() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::>(); + + let stream = stream + .into_iter() + .map(|mut rx| { + rx.close(); + rx + }) + .collect::>(); + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn into_iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::>(); + + let mut into_iter = stream.into_iter(); + assert_eq!(into_iter.len(), 3); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 2); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 0); + assert!(into_iter.next().is_none()); +} + #[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice,