Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Sync issues with FuturesUnordered #2054

Merged
merged 1 commit into from Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 17 additions & 2 deletions futures-util/src/stream/futures_unordered/iter.rs
Expand Up @@ -2,6 +2,7 @@ use super::FuturesUnordered;
use super::task::Task;
use core::marker::PhantomData;
use core::pin::Pin;
use core::sync::atomic::Ordering::Relaxed;

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
Expand All @@ -20,6 +21,7 @@ pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>);
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) pending_next_all: *mut Task<Fut>,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>
}

Expand All @@ -36,7 +38,12 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
}
unsafe {
let future = (*(*self.task).future.get()).as_mut().unwrap();
let next = *(*self.task).next_all.get();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (*self.task).next_all.load(Relaxed);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
Expand Down Expand Up @@ -73,7 +80,15 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
}
unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();
let next = *(*self.task).next_all.get();

// Relaxed ordering can be used since acquire ordering when
// `head_all` was initially read for this iterator implies acquire
// ordering for all previously inserted nodes (and we don't need to
// read `len_all` again for any other nodes).
let next = (*self.task).spin_next_all(
self.pending_next_all,
Relaxed,
);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
Expand Down