Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use futuresordered in join_all #2412

Merged
merged 9 commits into from Jul 29, 2021
95 changes: 73 additions & 22 deletions futures-util/src/future/join_all.rs
Expand Up @@ -12,20 +12,39 @@ use core::task::{Context, Poll};

use super::{assert_future, MaybeDone};

#[cfg(not(futures_no_atomic_cas))]
use crate::stream::{Collect, FuturesOrdered, StreamExt};

fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
}

/// Future for the [`join_all`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
/// Future for the [`join_all`] function.
pub struct JoinAll<F>
where
F: Future,
{
elems: Pin<Box<[MaybeDone<F>]>>,
kind: JoinAllKind<F>,
}

#[cfg(not(futures_no_atomic_cas))]
const SMALL: usize = 30;

pub(crate) enum JoinAllKind<F>
where
F: Future,
{
Small {
elems: Pin<Box<[MaybeDone<F>]>>,
},
#[cfg(not(futures_no_atomic_cas))]
Big {
fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
},
}

impl<F> fmt::Debug for JoinAll<F>
Expand All @@ -34,7 +53,13 @@ where
F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinAll").field("elems", &self.elems).finish()
match self.kind {
JoinAllKind::Small { ref elems } => {
f.debug_struct("JoinAll").field("elems", elems).finish()
}
#[cfg(not(futures_no_atomic_cas))]
JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
}
}
}

Expand All @@ -50,10 +75,9 @@ where
///
/// # See Also
///
/// This is purposefully a very simple API for basic use-cases. In a lot of
/// cases you will want to use the more powerful
/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
/// reasons if the number of futures is large. You may want to look into using it or
/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
///
/// Some examples for additional functionality provided by these are:
///
Expand All @@ -75,13 +99,33 @@ where
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
/// # });
/// ```
pub fn join_all<I>(i: I) -> JoinAll<I::Item>
pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
#[cfg(futures_no_atomic_cas)]
{
let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
let kind = JoinAllKind::Small { elems };
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
#[cfg(not(futures_no_atomic_cas))]
{
let iter = iter.into_iter();
let kind = match iter.size_hint().1 {
None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
Some(max) => {
if max <= SMALL {
let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
JoinAllKind::Small { elems }
} else {
JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
}
}
};
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
}

impl<F> Future for JoinAll<F>
Expand All @@ -91,20 +135,27 @@ where
type Output = Vec<F::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut all_done = true;
match &mut self.kind {
JoinAllKind::Small { elems } => {
let mut all_done = true;

for elem in iter_pin_mut(self.elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}
for elem in iter_pin_mut(elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}

if all_done {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(result)
} else {
Poll::Pending
if all_done {
let mut elems = mem::replace(elems, Box::pin([]));
let result =
iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(result)
} else {
Poll::Pending
}
}
#[cfg(not(futures_no_atomic_cas))]
JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/sink/mod.rs
Expand Up @@ -243,7 +243,7 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This future will drive the stream to keep producing items until it is
/// exhausted, sending each item to the sink. It will complete once both the
/// stream is exhausted, the sink has received all items, and the sink has
/// been flushed. Note that the sink is **not** closed. If the stream produces
/// been flushed. Note that the sink is **not** closed. If the stream produces
/// an error, that error will be returned by this future without flushing the sink.
///
/// Doing `sink.send_all(stream)` is roughly equivalent to
Expand Down