Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add immutable iterators for FuturesUnordered #1922

Merged
merged 1 commit into from Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 49 additions & 0 deletions futures-util/src/stream/futures_unordered/iter.rs
Expand Up @@ -15,6 +15,18 @@ pub struct IterPinMut<'a, Fut> {
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>);

#[derive(Debug)]
/// Immutable iterator over all futures in the unordered set.
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>
}

#[derive(Debug)]
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin> (pub(super) IterPinRef<'a, Fut>);

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;

Expand Down Expand Up @@ -51,3 +63,40 @@ impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
}

impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}

impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
type Item = Pin<&'a Fut>;

fn next(&mut self) -> Option<Pin<&'a Fut>> {
if self.task.is_null() {
return None;
}
unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();
let next = *(*self.task).next_all.get();
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
type Item = &'a Fut;

fn next(&mut self) -> Option<&'a Fut> {
self.0.next().map(Pin::get_ref)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}
16 changes: 15 additions & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -21,7 +21,7 @@ use alloc::sync::{Arc, Weak};
mod abort;

mod iter;
pub use self::iter::{IterMut, IterPinMut};
pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -194,6 +194,20 @@ impl<Fut> FuturesUnordered<Fut> {
self.ready_to_run_queue.enqueue(ptr);
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin {
Iter(Pin::new(self).iter_pin_ref())
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
IterPinRef {
task: self.head_all,
len: self.len(),
_marker: PhantomData,
}
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin {
IterMut(Pin::new(self).iter_pin_mut())
Expand Down
72 changes: 71 additions & 1 deletion futures/tests/futures_unordered.rs
@@ -1,8 +1,12 @@
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};

use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt};
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures::task::{Context, Poll};
use futures_test::future::FutureTestExt;
use futures_test::task::noop_context;
use futures_test::{assert_stream_done, assert_stream_next};
Expand Down Expand Up @@ -164,6 +168,72 @@ fn iter_mut_len() {
assert!(iter_mut.next().is_none());
}

#[test]
fn iter_cancel() {
struct AtomicCancel<F> {
future: F,
cancel: AtomicBool,
}

impl<F: Future + Unpin> Future for AtomicCancel<F> {
type Output = Option<<F as Future>::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.cancel.load(Ordering::Relaxed) {
Poll::Ready(None)
} else {
self.future.poll_unpin(cx).map(Some)
}
}
}

impl<F: Future + Unpin> AtomicCancel<F> {
fn new(future: F) -> Self {
Self { future, cancel: AtomicBool::new(false) }
}
}

let stream = vec![
AtomicCancel::new(future::pending::<()>()),
AtomicCancel::new(future::pending::<()>()),
AtomicCancel::new(future::pending::<()>()),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

for f in stream.iter() {
f.cancel.store(true, Ordering::Relaxed);
}

let mut iter = block_on_stream(stream);

assert_eq!(iter.next(), Some(None));
assert_eq!(iter.next(), Some(None));
assert_eq!(iter.next(), Some(None));
assert_eq!(iter.next(), None);
}

#[test]
fn iter_len() {
let stream = vec![
future::pending::<()>(),
future::pending::<()>(),
future::pending::<()>(),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut iter = stream.iter();
assert_eq!(iter.len(), 3);
assert!(iter.next().is_some());
assert_eq!(iter.len(), 2);
assert!(iter.next().is_some());
assert_eq!(iter.len(), 1);
assert!(iter.next().is_some());
assert_eq!(iter.len(), 0);
assert!(iter.next().is_none());
}

#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
Expand Down