From 28b27d178de85904c46cd80263fba736475bc47c Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Sun, 30 Jan 2022 18:17:11 -0500 Subject: [PATCH 1/4] switch to `FuturesOrdered` dynamically in `try_join_all` --- futures-util/src/future/join_all.rs | 6 +- futures-util/src/future/try_join_all.rs | 125 +++++++++++++++++------- 2 files changed, 90 insertions(+), 41 deletions(-) diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 2e52ac17f4..b61111b706 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { +pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind +enum JoinAllKind where F: Future, { diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 29244af837..57a0752826 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,14 +10,10 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, TryFuture, TryMaybeDone}; -fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; enum FinalState { Pending, @@ -31,7 +27,20 @@ pub struct TryJoinAll where F: TryFuture, { - elems: Pin]>>, + kind: TryJoinAllKind, +} + +enum TryJoinAllKind +where + F: TryFuture, +{ + Small { + elems: Pin]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect, Vec>, + }, } impl fmt::Debug for TryJoinAll @@ -39,9 +48,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -83,54 +99,87 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all(i: I) -> TryJoinAll +pub fn try_join_all(iter: I) -> TryJoinAll where I: IntoIterator, - I::Item: TryFuture, + I::Item: TryFuture + + Future::Ok, ::Error>>, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::::Ok>, ::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + #[cfg(futures_no_atomic_cas)] + { + let elems = iter.into_iter().map(TryMaybeDone::Future).try_collect::>().into(); + let kind = TryJoinAllKind::Small { elems }; + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } + #[cfg(not(futures_no_atomic_cas))] + { + let iter = iter.into_iter(); + let kind = match iter.size_hint().1 { + None => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, + Some(max) => { + if max <= join_all::SMALL { + let elems = iter.map(TryMaybeDone::Future).collect::>().into(); + TryJoinAllKind::Small { elems } + } else { + TryJoinAllKind::Big { fut: iter.collect::>().try_collect() } + } + } + }; + assert_future::::Ok>, ::Error>, _>( + TryJoinAll { kind }, + ) + } } impl Future for TryJoinAll where - F: TryFuture, + F: TryFuture + Future>, { type Output = Result, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut state = FinalState::AllDone; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl FromIterator for TryJoinAll { +impl FromIterator for TryJoinAll +where + F: TryFuture + Future>, +{ fn from_iter>(iter: T) -> Self { try_join_all(iter) } From 50bfb055f3e2643bfc5058ebb09c79da137b315a Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Mon, 30 May 2022 17:55:04 -0400 Subject: [PATCH 2/4] remove extra bounds from `try_join_all` --- futures-util/src/future/try_join_all.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 57a0752826..491af4bbec 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,10 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, join_all, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState { Pending, @@ -35,11 +36,11 @@ where F: TryFuture, { Small { - elems: Pin]>>, + elems: Pin>]>>, }, #[cfg(not(futures_no_atomic_cas))] Big { - fut: TryCollect, Vec>, + fut: TryCollect>, Vec>, }, } @@ -102,20 +103,21 @@ where pub fn try_join_all(iter: I) -> TryJoinAll where I: IntoIterator, - I::Item: TryFuture - + Future::Ok, ::Error>>, + I::Item: TryFuture, { + let iter = iter.into_iter().map(TryFutureExt::into_future); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(TryMaybeDone::Future).try_collect::>().into(); + let elems = iter.map(TryMaybeDone::Future).collect::>().into(); let kind = TryJoinAllKind::Small { elems }; assert_future::::Ok>, ::Error>, _>( TryJoinAll { kind }, ) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { None => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, Some(max) => { @@ -135,7 +137,7 @@ where impl Future for TryJoinAll where - F: TryFuture + Future>, + F: TryFuture, { type Output = Result, F::Error>; @@ -178,7 +180,7 @@ where impl FromIterator for TryJoinAll where - F: TryFuture + Future>, + F: TryFuture, { fn from_iter>(iter: T) -> Self { try_join_all(iter) From 666ac4a953d61ace9fa60f2cc4e4d87316161a00 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Sat, 4 Jun 2022 16:28:42 -0400 Subject: [PATCH 3/4] update `TryJoinAll` auto trait tests --- futures/tests/auto_traits.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 558e0bf1d6..c3a45705e3 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -576,10 +576,10 @@ pub mod future { // TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin - assert_impl!(TryJoinAll>: Send); + assert_impl!(TryJoinAll>: Send); assert_not_impl!(TryJoinAll: Send); assert_not_impl!(TryJoinAll: Send); - assert_impl!(TryJoinAll>: Sync); + assert_impl!(TryJoinAll>: Sync); assert_not_impl!(TryJoinAll: Sync); assert_not_impl!(TryJoinAll: Sync); assert_impl!(TryJoinAll: Unpin); From 24e4e31103e6b748ccb43ae75f1fb2502cd2b129 Mon Sep 17 00:00:00 2001 From: Ibraheem Ahmed Date: Sat, 4 Jun 2022 16:37:11 -0400 Subject: [PATCH 4/4] cleanup `join_all` code --- futures-util/src/future/join_all.rs | 23 +++++++++++------------ futures-util/src/future/try_join_all.rs | 20 +++++++++----------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index b61111b706..7dc159ba07 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::>().into() }; + assert_future::::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::>().collect() }, }; + assert_future::::Output>, _>(JoinAll { kind }) } } diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 491af4bbec..25fcfcb6c2 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -109,8 +109,10 @@ where #[cfg(futures_no_atomic_cas)] { - let elems = iter.map(TryMaybeDone::Future).collect::>().into(); - let kind = TryJoinAllKind::Small { elems }; + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }; + assert_future::::Ok>, ::Error>, _>( TryJoinAll { kind }, ) @@ -119,16 +121,12 @@ where #[cfg(not(futures_no_atomic_cas))] { let kind = match iter.size_hint().1 { - None => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, - Some(max) => { - if max <= join_all::SMALL { - let elems = iter.map(TryMaybeDone::Future).collect::>().into(); - TryJoinAllKind::Small { elems } - } else { - TryJoinAllKind::Big { fut: iter.collect::>().try_collect() } - } - } + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::>().try_collect() }, }; + assert_future::::Ok>, ::Error>, _>( TryJoinAll { kind }, )