Skip to content

Commit

Permalink
re-add iter_pin_ref as public
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed May 8, 2021
1 parent 984c173 commit 0aadc4b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 25 deletions.
34 changes: 26 additions & 8 deletions futures-util/src/stream/futures_unordered/iter.rs
Expand Up @@ -18,13 +18,17 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);

#[derive(Debug)]
/// Immutable iterator over all futures in the unordered set.
pub struct Iter<'a, Fut: Unpin> {
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) pending_next_all: *mut Task<Fut>,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}

#[derive(Debug)]
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);

#[derive(Debug)]
/// Owned iterator over all futures in the unordered set.
pub struct IntoIter<Fut: Unpin> {
Expand Down Expand Up @@ -109,10 +113,10 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {

impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
type Item = &'a Fut;
impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
type Item = Pin<&'a Fut>;

fn next(&mut self) -> Option<&'a Fut> {
fn next(&mut self) -> Option<Pin<&'a Fut>> {
if self.task.is_null() {
return None;
}
Expand All @@ -127,7 +131,7 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
self.task = next;
self.len -= 1;
Some(future)
Some(Pin::new_unchecked(future))
}
}

Expand All @@ -136,15 +140,29 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
}
}

impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
type Item = &'a Fut;

fn next(&mut self) -> Option<&'a Fut> {
self.0.next().map(Pin::get_ref)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}

unsafe impl<Fut: Send + Unpin> Send for Iter<'_, Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for Iter<'_, Fut> {}
39 changes: 22 additions & 17 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;

mod iter;
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut};
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -190,10 +190,15 @@ impl<Fut> FuturesUnordered<Fut> {
where
Fut: Unpin,
{
Iter(Pin::new(self).iter_pin_ref())
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
let (task, len) = self.atomic_load_head_and_len_all();
let pending_next_all = self.pending_next_all();

Iter { task, len, pending_next_all, _marker: PhantomData }
IterPinRef { task, len, pending_next_all, _marker: PhantomData }
}

/// Returns an iterator that allows modifying each future in the set.
Expand Down Expand Up @@ -577,17 +582,12 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
}
}

impl<Fut: Unpin> IntoIterator for FuturesUnordered<Fut> {
type Item = Fut;
type IntoIter = IntoIter<Fut>;

fn into_iter(mut self) -> Self::IntoIter {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };
impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered<Fut> {
type Item = &'a Fut;
type IntoIter = Iter<'a, Fut>;

IntoIter { len, inner: self }
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

Expand All @@ -600,12 +600,17 @@ impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered<Fut> {
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered<Fut> {
type Item = &'a Fut;
type IntoIter = Iter<'a, Fut>;
impl<Fut: Unpin> IntoIterator for FuturesUnordered<Fut> {
type Item = Fut;
type IntoIter = IntoIter<Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
fn into_iter(mut self) -> Self::IntoIter {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };

IntoIter { len, inner: self }
}
}

Expand Down
6 changes: 6 additions & 0 deletions futures/tests/auto_traits.rs
Expand Up @@ -1822,6 +1822,12 @@ pub mod stream {
assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin);

assert_impl!(futures_unordered::IterPinRef<()>: Send);
assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send);
assert_impl!(futures_unordered::IterPinRef<()>: Sync);
assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);

assert_impl!(futures_unordered::IntoIter<()>: Send);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send);
assert_impl!(futures_unordered::IntoIter<()>: Sync);
Expand Down

0 comments on commit 0aadc4b

Please sign in to comment.