From a0c7d3175cf4cafff9acc3dcd737524e9ea7e5ee Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Fri, 7 May 2021 22:02:05 -0400 Subject: [PATCH 1/4] remove FuturesUnordered::iter_pin_ref, add into_iter --- .../src/stream/futures_unordered/iter.rs | 76 +++++++++++++------ .../src/stream/futures_unordered/mod.rs | 44 +++++++++-- futures/tests/auto_traits.rs | 11 +-- futures/tests/stream_futures_unordered.rs | 45 +++++++++++ 4 files changed, 139 insertions(+), 37 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 17cde4fac4..7ac86229c2 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -18,7 +18,7 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); #[derive(Debug)] /// Immutable iterator over all futures in the unordered set. -pub struct IterPinRef<'a, Fut> { +pub struct Iter<'a, Fut: Unpin> { pub(super) task: *const Task, pub(super) len: usize, pub(super) pending_next_all: *mut Task, @@ -26,8 +26,45 @@ pub struct IterPinRef<'a, Fut> { } #[derive(Debug)] -/// Immutable iterator over all the futures in the unordered set. -pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); +/// Owned iterator over all futures in the unordered set. +pub struct IntoIter { + pub(super) len: usize, + pub(super) inner: FuturesUnordered, +} + +impl Iterator for IntoIter { + type Item = Fut; + + fn next(&mut self) -> Option { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = self.inner.head_all.get_mut(); + + if (*task).is_null() { + return None; + } + + unsafe { + // Moving out of the future is safe because it is `Unpin` + let future = (**task).future.get_mut().take().unwrap(); + + // Mutable access to a previously shared `FuturesUnordered` implies + // that the other threads already released the object before the + // current thread acquired it, so relaxed ordering can be used and + // valid `next_all` checks can be skipped. + let next = (**task).next_all.load(Relaxed); + *task = next; + self.len -= 1; + Some(future) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl ExactSizeIterator for IntoIter {} impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; @@ -36,6 +73,7 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_mut().unwrap(); @@ -71,13 +109,14 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { impl ExactSizeIterator for IterMut<'_, Fut> {} -impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { - type Item = Pin<&'a Fut>; +impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { + type Item = &'a Fut; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option<&'a Fut> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_ref().unwrap(); @@ -88,7 +127,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; - Some(Pin::new_unchecked(future)) + Some(future) } } @@ -97,26 +136,15 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { } } -impl ExactSizeIterator for IterPinRef<'_, Fut> {} - -impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { - type Item = &'a Fut; - - fn next(&mut self) -> Option<&'a Fut> { - self.0.next().map(Pin::get_ref) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() - } -} - impl ExactSizeIterator for Iter<'_, Fut> {} // SAFETY: we do nothing thread-local and there is no interior mutability, // so the usual structural `Send`/`Sync` apply. -unsafe impl Send for IterPinRef<'_, Fut> {} -unsafe impl Sync for IterPinRef<'_, Fut> {} - unsafe impl Send for IterPinMut<'_, Fut> {} unsafe impl Sync for IterPinMut<'_, Fut> {} + +unsafe impl Send for IntoIter {} +unsafe impl Sync for IntoIter {} + +unsafe impl Send for Iter<'_, Fut> {} +unsafe impl Sync for Iter<'_, Fut> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index d1377ff327..7a404cc202 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut}; mod task; use self::task::Task; @@ -190,14 +190,10 @@ impl FuturesUnordered { where Fut: Unpin, { - Iter(Pin::new(self).iter_pin_ref()) - } - - /// Returns an iterator that allows inspecting each future in the set. - fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); + let pending_next_all = self.pending_next_all(); - IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData } + Iter { task, len, pending_next_all, _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. @@ -220,7 +216,7 @@ impl FuturesUnordered { /// Returns the current head node and number of futures in the list of all /// futures within a context where access is shared with other threads - /// (mostly for use with the `len` and `iter_pin_ref` methods). + /// (mostly for use with the `len` and `iter` methods). fn atomic_load_head_and_len_all(&self) -> (*const Task, usize) { let task = self.head_all.load(Acquire); let len = if task.is_null() { @@ -581,6 +577,38 @@ impl Drop for FuturesUnordered { } } +impl IntoIterator for FuturesUnordered { + type Item = Fut; + type IntoIter = IntoIter; + + fn into_iter(mut self) -> Self::IntoIter { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = *self.head_all.get_mut(); + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; + + IntoIter { len, inner: self } + } +} + +impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered { + type Item = &'a mut Fut; + type IntoIter = IterMut<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { + type Item = &'a Fut; + type IntoIter = Iter<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + impl FromIterator for FuturesUnordered { fn from_iter(iter: I) -> Self where diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index ab0c85acd7..2f302012bb 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1822,11 +1822,12 @@ pub mod stream { assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut: Unpin); - assert_impl!(futures_unordered::IterPinRef<()>: Send); - assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); - assert_impl!(futures_unordered::IterPinRef<()>: Sync); - assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); - assert_impl!(futures_unordered::IterPinRef: Unpin); + assert_impl!(futures_unordered::IntoIter<()>: Send); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); + assert_impl!(futures_unordered::IntoIter<()>: Sync); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync); + // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IntoIter: Unpin); } /// Assert Send/Sync/Unpin for all public types in `futures::task`. diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index bcafa243bd..3c6f7ac8fe 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -214,6 +214,51 @@ fn iter_len() { assert!(iter.next().is_none()); } +#[test] +fn into_iter_cancel() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::>(); + + let stream = stream + .into_iter() + .map(|mut rx| { + rx.close(); + rx + }) + .collect::>(); + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn into_iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::>(); + + let mut into_iter = stream.into_iter(); + assert_eq!(into_iter.len(), 3); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 2); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 0); + assert!(into_iter.next().is_none()); +} + #[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, From 984c17329bfe426b1e3a44f2a520ed64f271c7f4 Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Fri, 7 May 2021 22:25:12 -0400 Subject: [PATCH 2/4] remove usage of UnsafeCell::get_mut for compat --- futures-util/src/stream/futures_unordered/iter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 7ac86229c2..f8d7321c15 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -46,7 +46,7 @@ impl Iterator for IntoIter { unsafe { // Moving out of the future is safe because it is `Unpin` - let future = (**task).future.get_mut().take().unwrap(); + let future = (*(**task).future.get()).take().unwrap(); // Mutable access to a previously shared `FuturesUnordered` implies // that the other threads already released the object before the From 0aadc4bd3a3f67251eeded0f34b3f29104c2871c Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Sat, 8 May 2021 04:30:39 -0400 Subject: [PATCH 3/4] re-add iter_pin_ref as public --- .../src/stream/futures_unordered/iter.rs | 34 ++++++++++++---- .../src/stream/futures_unordered/mod.rs | 39 +++++++++++-------- futures/tests/auto_traits.rs | 6 +++ 3 files changed, 54 insertions(+), 25 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index f8d7321c15..59ea28efa6 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -18,13 +18,17 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); #[derive(Debug)] /// Immutable iterator over all futures in the unordered set. -pub struct Iter<'a, Fut: Unpin> { +pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) pending_next_all: *mut Task, pub(super) _marker: PhantomData<&'a FuturesUnordered>, } +#[derive(Debug)] +/// Immutable iterator over all the futures in the unordered set. +pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); + #[derive(Debug)] /// Owned iterator over all futures in the unordered set. pub struct IntoIter { @@ -109,10 +113,10 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { impl ExactSizeIterator for IterMut<'_, Fut> {} -impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { - type Item = &'a Fut; +impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { + type Item = Pin<&'a Fut>; - fn next(&mut self) -> Option<&'a Fut> { + fn next(&mut self) -> Option> { if self.task.is_null() { return None; } @@ -127,7 +131,7 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; - Some(future) + Some(Pin::new_unchecked(future)) } } @@ -136,15 +140,29 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { } } +impl ExactSizeIterator for IterPinRef<'_, Fut> {} + +impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { + type Item = &'a Fut; + + fn next(&mut self) -> Option<&'a Fut> { + self.0.next().map(Pin::get_ref) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + impl ExactSizeIterator for Iter<'_, Fut> {} // SAFETY: we do nothing thread-local and there is no interior mutability, // so the usual structural `Send`/`Sync` apply. +unsafe impl Send for IterPinRef<'_, Fut> {} +unsafe impl Sync for IterPinRef<'_, Fut> {} + unsafe impl Send for IterPinMut<'_, Fut> {} unsafe impl Sync for IterPinMut<'_, Fut> {} unsafe impl Send for IntoIter {} unsafe impl Sync for IntoIter {} - -unsafe impl Send for Iter<'_, Fut> {} -unsafe impl Sync for Iter<'_, Fut> {} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 7a404cc202..dac2ee4921 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; @@ -190,10 +190,15 @@ impl FuturesUnordered { where Fut: Unpin, { + Iter(Pin::new(self).iter_pin_ref()) + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); let pending_next_all = self.pending_next_all(); - Iter { task, len, pending_next_all, _marker: PhantomData } + IterPinRef { task, len, pending_next_all, _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. @@ -577,17 +582,12 @@ impl Drop for FuturesUnordered { } } -impl IntoIterator for FuturesUnordered { - type Item = Fut; - type IntoIter = IntoIter; - - fn into_iter(mut self) -> Self::IntoIter { - // `head_all` can be accessed directly and we don't need to spin on - // `Task::next_all` since we have exclusive access to the set. - let task = *self.head_all.get_mut(); - let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; +impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { + type Item = &'a Fut; + type IntoIter = Iter<'a, Fut>; - IntoIter { len, inner: self } + fn into_iter(self) -> Self::IntoIter { + self.iter() } } @@ -600,12 +600,17 @@ impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered { } } -impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered { - type Item = &'a Fut; - type IntoIter = Iter<'a, Fut>; +impl IntoIterator for FuturesUnordered { + type Item = Fut; + type IntoIter = IntoIter; - fn into_iter(self) -> Self::IntoIter { - self.iter() + fn into_iter(mut self) -> Self::IntoIter { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = *self.head_all.get_mut(); + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; + + IntoIter { len, inner: self } } } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 2f302012bb..881f6b3620 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1822,6 +1822,12 @@ pub mod stream { assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut: Unpin); + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); + assert_impl!(futures_unordered::IterPinRef: Unpin); + assert_impl!(futures_unordered::IntoIter<()>: Send); assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); assert_impl!(futures_unordered::IntoIter<()>: Sync); From 145f81edae5ab37d4d861e91d38e940da200fcb0 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Sat, 8 May 2021 17:17:20 -0400 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Taiki Endo --- futures-util/src/stream/futures_unordered/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index dac2ee4921..89ce113d64 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -221,7 +221,7 @@ impl FuturesUnordered { /// Returns the current head node and number of futures in the list of all /// futures within a context where access is shared with other threads - /// (mostly for use with the `len` and `iter` methods). + /// (mostly for use with the `len` and `iter_pin_ref` methods). fn atomic_load_head_and_len_all(&self) -> (*const Task, usize) { let task = self.head_all.load(Acquire); let len = if task.is_null() {