Skip to content

Commit

Permalink
Fix Sync issues with FuturesUnordered
Browse files Browse the repository at this point in the history
This updates non-mutable FuturesUnordered methods to be thread-safe,
ensuring that Sync trait requirements are met without breaking existing
functionality.

The FuturesUnordered::link method used by FuturesUnordered::push has
been modified to use atomic operations to allow for lock-free insertion
of new futures while still allowing FuturesUnordered::iter and
IterPinRef itself to operate simultaneously. FuturesUnordered::len has
been removed, with each Task now storing a snapshot of the list length
at the time when the future was added, so a synchronized list length can
always be read after an atomic load of FuturesUnordered::head_all. This
increases the memory overhead of each task slightly, but it allows us to
retain the original iterator behavior without dramatically increasing
the performance cost.

Operations that require mutable FuturesUnordered access have not been
made lock-free, with direct mutable access to otherwise atomic values
used when possible to avoid unnecessary overhead.
  • Loading branch information
okready committed Jan 29, 2020
1 parent e767803 commit 6421819
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 58 deletions.
19 changes: 17 additions & 2 deletions futures-util/src/stream/futures_unordered/iter.rs
Expand Up @@ -2,6 +2,7 @@ use super::FuturesUnordered;
use super::task::Task;
use core::marker::PhantomData;
use core::pin::Pin;
use core::sync::atomic::Ordering::Relaxed;

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
Expand All @@ -20,6 +21,7 @@ pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>);
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) pending_next_all: *mut Task<Fut>,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>
}

Expand All @@ -36,7 +38,12 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
}
unsafe {
let future = (*(*self.task).future.get()).as_mut().unwrap();
let next = *(*self.task).next_all.get();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (*self.task).next_all.load(Relaxed);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
Expand Down Expand Up @@ -73,7 +80,15 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
}
unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();
let next = *(*self.task).next_all.get();

// Relaxed ordering can be used since acquire ordering when
// `head_all` was initially read for this iterator implies acquire
// ordering for all previously inserted nodes (and we don't need to
// read `len_all` again for any other nodes).
let next = (*self.task).spin_next_all(
self.pending_next_all,
Relaxed,
);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
Expand Down

0 comments on commit 6421819

Please sign in to comment.