diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 2e52ac17f4..7dc159ba07 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { +pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind +enum JoinAllKind where F: Future, { @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; + assert_future::::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::>().collect() }, }; + assert_future::::Output>, _>(JoinAll { kind }) } } diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 29244af837..25fcfcb6c2 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,14 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState { Pending, @@ -31,7 +28,20 @@ pub struct TryJoinAll where F: TryFuture, { - elems: Pin]>>, + kind: TryJoinAllKind, +} + +enum TryJoinAllKind +where + F: TryFuture, +{ + Small { + elems: Pin>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect>, Vec>, + }, } impl fmt::Debug for TryJoinAll @@ -39,9 +49,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -83,15 +100,37 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all(i: I) -> TryJoinAll +pub fn try_join_all(iter: I) -> TryJoinAll where I: IntoIterator, I::Item: TryFuture, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::::Ok>, ::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }; + + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, + }; + + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } } impl Future for TryJoinAll @@ -101,36 +140,46 @@ where type Output = Result, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut state = FinalState::AllDone; - - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl FromIterator for TryJoinAll { +impl FromIterator for TryJoinAll +where + F: TryFuture, +{ fn from_iter>(iter: T) -> Self { try_join_all(iter) } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 079d83bb65..6fa7adef6e 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -576,10 +576,10 @@ pub mod future { // TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin - assert_impl!(TryJoinAll>: Send); + assert_impl!(TryJoinAll>: Send); assert_not_impl!(TryJoinAll: Send); assert_not_impl!(TryJoinAll: Send); - assert_impl!(TryJoinAll>: Sync); + assert_impl!(TryJoinAll>: Sync); assert_not_impl!(TryJoinAll: Sync); assert_not_impl!(TryJoinAll: Sync); assert_impl!(TryJoinAll: Unpin);