Skip to content

Commit

Permalink
Add immutable iterators for FuturesUnordered
Browse files Browse the repository at this point in the history
  • Loading branch information
rjsberry authored and cramertj committed Oct 30, 2019
1 parent 99d5ebe commit 4227961
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 2 deletions.
49 changes: 49 additions & 0 deletions futures-util/src/stream/futures_unordered/iter.rs
Expand Up @@ -15,6 +15,18 @@ pub struct IterPinMut<'a, Fut> {
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>);

#[derive(Debug)]
/// Immutable iterator over all futures in the unordered set.
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>
}

#[derive(Debug)]
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin> (pub(super) IterPinRef<'a, Fut>);

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;

Expand Down Expand Up @@ -51,3 +63,40 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
}

impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}

impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
type Item = Pin<&'a Fut>;

fn next(&mut self) -> Option<Pin<&'a Fut>> {
if self.task.is_null() {
return None;
}
unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();
let next = *(*self.task).next_all.get();
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
type Item = &'a Fut;

fn next(&mut self) -> Option<&'a Fut> {
self.0.next().map(Pin::get_ref)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}
16 changes: 15 additions & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -21,7 +21,7 @@ use alloc::sync::{Arc, Weak};
mod abort;

mod iter;
pub use self::iter::{IterMut, IterPinMut};
pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -194,6 +194,20 @@ impl<Fut> FuturesUnordered<Fut> {
self.ready_to_run_queue.enqueue(ptr);
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin {
Iter(Pin::new(self).iter_pin_ref())
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
IterPinRef {
task: self.head_all,
len: self.len(),
_marker: PhantomData,
}
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin {
IterMut(Pin::new(self).iter_pin_mut())
Expand Down
72 changes: 71 additions & 1 deletion futures/tests/futures_unordered.rs
@@ -1,8 +1,12 @@
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};

use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt};
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures::task::{Context, Poll};
use futures_test::future::FutureTestExt;
use futures_test::task::noop_context;
use futures_test::{assert_stream_done, assert_stream_next};
Expand Down Expand Up @@ -164,6 +168,72 @@ fn iter_mut_len() {
assert!(iter_mut.next().is_none());
}

#[test]
fn iter_cancel() {
struct AtomicCancel<F> {
future: F,
cancel: AtomicBool,
}

impl<F: Future + Unpin> Future for AtomicCancel<F> {
type Output = Option<<F as Future>::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.cancel.load(Ordering::Relaxed) {
Poll::Ready(None)
} else {
self.future.poll_unpin(cx).map(Some)
}
}
}

impl<F: Future + Unpin> AtomicCancel<F> {
fn new(future: F) -> Self {
Self { future, cancel: AtomicBool::new(false) }
}
}

let stream = vec![
AtomicCancel::new(future::pending::<()>()),
AtomicCancel::new(future::pending::<()>()),
AtomicCancel::new(future::pending::<()>()),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

for f in stream.iter() {
f.cancel.store(true, Ordering::Relaxed);
}

let mut iter = block_on_stream(stream);

assert_eq!(iter.next(), Some(None));
assert_eq!(iter.next(), Some(None));
assert_eq!(iter.next(), Some(None));
assert_eq!(iter.next(), None);
}

#[test]
fn iter_len() {
let stream = vec![
future::pending::<()>(),
future::pending::<()>(),
future::pending::<()>(),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut iter = stream.iter();
assert_eq!(iter.len(), 3);
assert!(iter.next().is_some());
assert_eq!(iter.len(), 2);
assert!(iter.next().is_some());
assert_eq!(iter.len(), 1);
assert!(iter.next().is_some());
assert_eq!(iter.len(), 0);
assert!(iter.next().is_none());
}

#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
Expand Down

0 comments on commit 4227961

Please sign in to comment.