Skip to content

Commit

Permalink
remove FuturesUnordered::iter_pin_ref, add into_iter
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed May 8, 2021
1 parent 6699d7c commit a0c7d31
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 37 deletions.
76 changes: 52 additions & 24 deletions futures-util/src/stream/futures_unordered/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,53 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);

#[derive(Debug)]
/// Immutable iterator over all futures in the unordered set.
pub struct IterPinRef<'a, Fut> {
pub struct Iter<'a, Fut: Unpin> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) pending_next_all: *mut Task<Fut>,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}

#[derive(Debug)]
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
/// Owned iterator over all futures in the unordered set.
pub struct IntoIter<Fut: Unpin> {
pub(super) len: usize,
pub(super) inner: FuturesUnordered<Fut>,
}

impl<Fut: Unpin> Iterator for IntoIter<Fut> {
type Item = Fut;

fn next(&mut self) -> Option<Fut> {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = self.inner.head_all.get_mut();

if (*task).is_null() {
return None;
}

unsafe {
// Moving out of the future is safe because it is `Unpin`
let future = (**task).future.get_mut().take().unwrap();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
self.len -= 1;
Some(future)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;
Expand All @@ -36,6 +73,7 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_mut().unwrap();

Expand Down Expand Up @@ -71,13 +109,14 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {

impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}

impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
type Item = Pin<&'a Fut>;
impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
type Item = &'a Fut;

fn next(&mut self) -> Option<Pin<&'a Fut>> {
fn next(&mut self) -> Option<&'a Fut> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();

Expand All @@ -88,7 +127,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
Some(future)
}
}

Expand All @@ -97,26 +136,15 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
}
}

impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
type Item = &'a Fut;

fn next(&mut self) -> Option<&'a Fut> {
self.0.next().map(Pin::get_ref)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}

unsafe impl<Fut: Send + Unpin> Send for Iter<'_, Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for Iter<'_, Fut> {}
44 changes: 36 additions & 8 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;

mod iter;
pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef};
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -190,14 +190,10 @@ impl<Fut> FuturesUnordered<Fut> {
where
Fut: Unpin,
{
Iter(Pin::new(self).iter_pin_ref())
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
let (task, len) = self.atomic_load_head_and_len_all();
let pending_next_all = self.pending_next_all();

IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData }
Iter { task, len, pending_next_all, _marker: PhantomData }
}

/// Returns an iterator that allows modifying each future in the set.
Expand All @@ -220,7 +216,7 @@ impl<Fut> FuturesUnordered<Fut> {

/// Returns the current head node and number of futures in the list of all
/// futures within a context where access is shared with other threads
/// (mostly for use with the `len` and `iter_pin_ref` methods).
/// (mostly for use with the `len` and `iter` methods).
fn atomic_load_head_and_len_all(&self) -> (*const Task<Fut>, usize) {
let task = self.head_all.load(Acquire);
let len = if task.is_null() {
Expand Down Expand Up @@ -581,6 +577,38 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
}
}

impl<Fut: Unpin> IntoIterator for FuturesUnordered<Fut> {
type Item = Fut;
type IntoIter = IntoIter<Fut>;

fn into_iter(mut self) -> Self::IntoIter {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };

IntoIter { len, inner: self }
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered<Fut> {
type Item = &'a mut Fut;
type IntoIter = IterMut<'a, Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered<Fut> {
type Item = &'a Fut;
type IntoIter = Iter<'a, Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
fn from_iter<I>(iter: I) -> Self
where
Expand Down
11 changes: 6 additions & 5 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1822,11 +1822,12 @@ pub mod stream {
assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin);

assert_impl!(futures_unordered::IterPinRef<()>: Send);
assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send);
assert_impl!(futures_unordered::IterPinRef<()>: Sync);
assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);
assert_impl!(futures_unordered::IntoIter<()>: Send);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send);
assert_impl!(futures_unordered::IntoIter<()>: Sync);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync);
// The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds.
// assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin);
}

/// Assert Send/Sync/Unpin for all public types in `futures::task`.
Expand Down
45 changes: 45 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,51 @@ fn iter_len() {
assert!(iter.next().is_none());
}

#[test]
fn into_iter_cancel() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();

let stream = stream
.into_iter()
.map(|mut rx| {
rx.close();
rx
})
.collect::<FuturesUnordered<_>>();

let mut iter = block_on_stream(stream);

assert!(a_tx.is_canceled());
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), None);
}

#[test]
fn into_iter_len() {
let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut into_iter = stream.into_iter();
assert_eq!(into_iter.len(), 3);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 2);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 1);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 0);
assert!(into_iter.next().is_none());
}

#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
Expand Down

0 comments on commit a0c7d31

Please sign in to comment.