Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuturesUnordered::clear #2415

Merged
merged 8 commits into from May 10, 2021
2 changes: 1 addition & 1 deletion futures-util/src/sink/mod.rs
Expand Up @@ -243,7 +243,7 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This future will drive the stream to keep producing items until it is
/// exhausted, sending each item to the sink. It will complete once both the
/// stream is exhausted, the sink has received all items, and the sink has
/// been flushed. Note that the sink is **not** closed. If the stream produces
/// been flushed. Note that the sink is **not** closed. If the stream produces
/// an error, that error will be returned by this future without flushing the sink.
///
/// Doing `sink.send_all(stream)` is roughly equivalent to
Expand Down
111 changes: 52 additions & 59 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -3,11 +3,8 @@
//! This module is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.

use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
Expand All @@ -16,8 +13,11 @@ use core::mem;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use core::sync::atomic::{AtomicPtr, AtomicBool};
use alloc::sync::{Arc, Weak};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};

mod abort;

Expand All @@ -28,8 +28,7 @@ mod task;
use self::task::Task;

mod ready_to_run_queue;
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};

use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};

/// A set of futures which may complete in any order.
///
Expand Down Expand Up @@ -63,18 +62,14 @@ unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}
impl<Fut> Unpin for FuturesUnordered<Fut> {}

impl Spawn for FuturesUnordered<FutureObj<'_, ()>> {
fn spawn_obj(&self, future_obj: FutureObj<'static, ()>)
-> Result<(), SpawnError>
{
fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.push(future_obj);
Ok(())
}
}

impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>)
-> Result<(), SpawnError>
{
fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
self.push(future_obj);
Ok(())
}
Expand Down Expand Up @@ -191,24 +186,25 @@ impl<Fut> FuturesUnordered<Fut> {
}

/// Returns an iterator that allows inspecting each future in the set.
pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin {
pub fn iter(&self) -> Iter<'_, Fut>
where
Fut: Unpin,
{
Iter(Pin::new(self).iter_pin_ref())
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
let (task, len) = self.atomic_load_head_and_len_all();

IterPinRef {
task,
len,
pending_next_all: self.pending_next_all(),
_marker: PhantomData,
}
IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData }
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin {
pub fn iter_mut(&mut self) -> IterMut<'_, Fut>
where
Fut: Unpin,
{
IterMut(Pin::new(self).iter_pin_mut())
}

Expand All @@ -217,19 +213,9 @@ impl<Fut> FuturesUnordered<Fut> {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() {
0
} else {
unsafe {
*(*task).len_all.get()
}
};
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };

IterPinMut {
task,
len,
_marker: PhantomData
}
IterPinMut { task, len, _marker: PhantomData }
}

/// Returns the current head node and number of futures in the list of all
Expand All @@ -249,7 +235,7 @@ impl<Fut> FuturesUnordered<Fut> {
(task, len)
}

/// Releases the task. It destorys the future inside and either drops
/// Releases the task. It destroys the future inside and either drops
/// the `Arc<Task>` or transfers ownership to the ready to run queue.
/// The task this method is called on must have been unlinked before.
fn release_task(&mut self, task: Arc<Task<Fut>>) {
Expand Down Expand Up @@ -395,9 +381,7 @@ impl<Fut> FuturesUnordered<Fut> {
impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
Expand Down Expand Up @@ -469,14 +453,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {

// Double check that the call to `release_task` really
// happened. Calling it required the task to be unlinked.
debug_assert_eq!(
task.next_all.load(Relaxed),
self.pending_next_all()
);
debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all());
unsafe {
debug_assert!((*task.prev_all.get()).is_null());
}
continue
continue;
}
};

Expand Down Expand Up @@ -516,10 +497,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
}
}

let mut bomb = Bomb {
task: Some(task),
queue: &mut *self,
};
let mut bomb = Bomb { task: Some(task), queue: &mut *self };

// Poll the underlying future with the appropriate waker
// implementation. This is where a large bit of the unsafety
Expand Down Expand Up @@ -555,11 +533,9 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
cx.waker().wake_by_ref();
return Poll::Pending;
}
continue
}
Poll::Ready(output) => {
return Poll::Ready(Some(output))
continue;
}
Poll::Ready(output) => return Poll::Ready(Some(output)),
}
}
}
Expand All @@ -576,19 +552,33 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
}
}

impl<Fut> FuturesUnordered<Fut> {
/// Clears the set, removing all futures.
pub fn clear(&mut self) {
self.clear_head_all();

// we just cleared all the tasks, and we have &mut self, so this is safe.
unsafe { self.ready_to_run_queue.clear() };

self.is_terminated.store(false, Relaxed);
}

fn clear_head_all(&mut self) {
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
}
}
}

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
unsafe {
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = self.unlink(head);
self.release_task(task);
}
}
self.clear_head_all();

// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand All @@ -611,7 +601,10 @@ impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
I: IntoIterator<Item = Fut>,
{
let acc = Self::new();
iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc })
iter.into_iter().fold(acc, |acc, item| {
acc.push(item);
acc
})
}
}

Expand Down
41 changes: 27 additions & 14 deletions futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -1,9 +1,9 @@
use crate::task::AtomicWaker;
use alloc::sync::Arc;
use core::cell::UnsafeCell;
use core::ptr;
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel};
use alloc::sync::Arc;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

use super::abort::abort;
use super::task::Task;
Expand Down Expand Up @@ -85,25 +85,38 @@ impl<Fut> ReadyToRunQueue<Fut> {
pub(super) fn stub(&self) -> *const Task<Fut> {
&*self.stub
}

// Clear the queue of tasks.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. This method just pulls out
// tasks and drops their refcounts.
//
// # Safety
//
// - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
// - The caller **must** guarantee unique access to `self`
pub(crate) unsafe fn clear(&self) {
loop {
// SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
}
}

impl<Fut> Drop for ReadyToRunQueue<Fut> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner<Fut>` we need to clear out
// the ready to run queue of tasks if there's anything left in there.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. All tasks should have had
// their futures dropped already by the `FuturesUnordered` destructor
// above, so we're just pulling out tasks and dropping their refcounts.

// All tasks have had their futures dropped already by the `FuturesUnordered`
// destructor above, and we have &mut self, so this is safe.
unsafe {
loop {
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
self.clear();
}
}
}
22 changes: 22 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Expand Up @@ -300,3 +300,25 @@ fn polled_only_once_at_most_per_iteration() {
let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
}

#[test]
fn clear() {
let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);

assert_eq!(block_on(tasks.next()), Some(1));
assert!(!tasks.is_empty());

tasks.clear();
assert!(tasks.is_empty());
ibraheemdev marked this conversation as resolved.
Show resolved Hide resolved

tasks.push(future::ready(3));
assert!(!tasks.is_empty());

tasks.clear();
assert!(tasks.is_empty());

assert_eq!(block_on(tasks.next()), None);
assert!(tasks.is_terminated());
tasks.clear();
assert!(!tasks.is_terminated());
}