Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuturesUnordered::clear #2415

Merged
merged 8 commits into from May 10, 2021
100 changes: 43 additions & 57 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -3,11 +3,8 @@
//! This module is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.

use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
Expand All @@ -16,8 +13,11 @@ use core::mem;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use core::sync::atomic::{AtomicPtr, AtomicBool};
use alloc::sync::{Arc, Weak};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};

mod abort;

Expand All @@ -28,8 +28,7 @@ mod task;
use self::task::Task;

mod ready_to_run_queue;
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};

use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};

/// A set of futures which may complete in any order.
///
Expand Down Expand Up @@ -63,18 +62,14 @@ unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
impl<Fut> Unpin for FuturesUnordered<Fut> {}

impl Spawn for FuturesUnordered<FutureObj<'_, ()>> {
fn spawn_obj(&self, future_obj: FutureObj<'static, ()>)
-> Result<(), SpawnError>
{
fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.push(future_obj);
Ok(())
}
}

impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>)
-> Result<(), SpawnError>
{
fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
self.push(future_obj);
Ok(())
}
Expand Down Expand Up @@ -191,24 +186,25 @@ impl<Fut> FuturesUnordered<Fut> {
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin {
pub fn iter(&self) -> Iter<'_, Fut>
where
Fut: Unpin,
{
Iter(Pin::new(self).iter_pin_ref())
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
let (task, len) = self.atomic_load_head_and_len_all();

IterPinRef {
task,
len,
pending_next_all: self.pending_next_all(),
_marker: PhantomData,
}
IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData }
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin {
pub fn iter_mut(&mut self) -> IterMut<'_, Fut>
where
Fut: Unpin,
{
IterMut(Pin::new(self).iter_pin_mut())
}

Expand All @@ -217,19 +213,9 @@ impl<Fut> FuturesUnordered<Fut> {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() {
0
} else {
unsafe {
*(*task).len_all.get()
}
};
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };

IterPinMut {
task,
len,
_marker: PhantomData
}
IterPinMut { task, len, _marker: PhantomData }
}

/// Returns the current head node and number of futures in the list of all
Expand Down Expand Up @@ -395,9 +381,7 @@ impl<Fut> FuturesUnordered<Fut> {
impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
Expand Down Expand Up @@ -469,14 +453,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {

// Double check that the call to `release_task` really
// happened. Calling it required the task to be unlinked.
debug_assert_eq!(
task.next_all.load(Relaxed),
self.pending_next_all()
);
debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all());
unsafe {
debug_assert!((*task.prev_all.get()).is_null());
}
continue
continue;
}
};

Expand Down Expand Up @@ -516,10 +497,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
}
}

let mut bomb = Bomb {
task: Some(task),
queue: &mut *self,
};
let mut bomb = Bomb { task: Some(task), queue: &mut *self };

// Poll the underlying future with the appropriate waker
// implementation. This is where a large bit of the unsafety
Expand Down Expand Up @@ -555,11 +533,9 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
cx.waker().wake_by_ref();
return Poll::Pending;
}
continue
}
Poll::Ready(output) => {
return Poll::Ready(Some(output))
continue;
}
Poll::Ready(output) => return Poll::Ready(Some(output)),
}
}
}
Expand All @@ -576,19 +552,26 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
}
}

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
impl<Fut> FuturesUnordered<Fut> {
/// Clear all the futures associated with this task.
pub fn clear(&mut self) {
unsafe {
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = self.unlink(head);
self.release_task(task);
}
}
taiki-e marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
self.clear();

// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand All @@ -611,7 +594,10 @@ impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
I: IntoIterator<Item = Fut>,
{
let acc = Self::new();
iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc })
iter.into_iter().fold(acc, |acc, item| {
acc.push(item);
acc
})
}
}

Expand Down
11 changes: 11 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Expand Up @@ -300,3 +300,14 @@ fn polled_only_once_at_most_per_iteration() {
let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
}

#[test]
fn clear() {
let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);

assert_eq!(block_on(tasks.next()), Some(1));
assert!(!tasks.is_empty());

tasks.clear();
assert!(tasks.is_empty());
ibraheemdev marked this conversation as resolved.
Show resolved Hide resolved
}