Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.3 backports #2486

Merged
merged 8 commits into from Aug 30, 2021
4 changes: 2 additions & 2 deletions futures-core/src/lib.rs
Expand Up @@ -16,11 +16,11 @@
extern crate alloc;

pub mod future;
#[doc(hidden)]
#[doc(no_inline)]
pub use self::future::{FusedFuture, Future, TryFuture};

pub mod stream;
#[doc(hidden)]
#[doc(no_inline)]
pub use self::stream::{FusedStream, Stream, TryStream};

#[macro_use]
Expand Down
10 changes: 10 additions & 0 deletions futures-macro/src/lib.rs
Expand Up @@ -19,6 +19,7 @@ use proc_macro::TokenStream;
mod executor;
mod join;
mod select;
mod stream_select;

/// The `join!` macro.
#[cfg_attr(fn_like_proc_macro, proc_macro)]
Expand Down Expand Up @@ -54,3 +55,12 @@ pub fn select_biased_internal(input: TokenStream) -> TokenStream {
pub fn test_internal(input: TokenStream, item: TokenStream) -> TokenStream {
crate::executor::test(input, item)
}

/// The `stream_select!` macro.
#[cfg_attr(fn_like_proc_macro, proc_macro)]
#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack)]
pub fn stream_select_internal(input: TokenStream) -> TokenStream {
crate::stream_select::stream_select(input.into())
.unwrap_or_else(syn::Error::into_compile_error)
.into()
}
113 changes: 113 additions & 0 deletions futures-macro/src/stream_select.rs
@@ -0,0 +1,113 @@
use proc_macro2::TokenStream;
use quote::{format_ident, quote, ToTokens};
use syn::{parse::Parser, punctuated::Punctuated, Expr, Index, Token};

/// The `stream_select!` macro.
pub(crate) fn stream_select(input: TokenStream) -> Result<TokenStream, syn::Error> {
let args = Punctuated::<Expr, Token![,]>::parse_terminated.parse2(input)?;
if args.len() < 2 {
return Ok(quote! {
compile_error!("stream select macro needs at least two arguments.")
});
}
let generic_idents = (0..args.len()).map(|i| format_ident!("_{}", i)).collect::<Vec<_>>();
let field_idents = (0..args.len()).map(|i| format_ident!("__{}", i)).collect::<Vec<_>>();
let field_idents_2 = (0..args.len()).map(|i| format_ident!("___{}", i)).collect::<Vec<_>>();
let field_indices = (0..args.len()).map(Index::from).collect::<Vec<_>>();
let args = args.iter().map(|e| e.to_token_stream());

Ok(quote! {
{
#[derive(Debug)]
struct StreamSelect<#(#generic_idents),*> (#(Option<#generic_idents>),*);

enum StreamEnum<#(#generic_idents),*> {
#(
#generic_idents(#generic_idents)
),*,
None,
}

impl<ITEM, #(#generic_idents),*> __futures_crate::stream::Stream for StreamEnum<#(#generic_idents),*>
where #(#generic_idents: __futures_crate::stream::Stream<Item=ITEM> + ::std::marker::Unpin,)*
{
type Item = ITEM;

fn poll_next(mut self: ::std::pin::Pin<&mut Self>, cx: &mut __futures_crate::task::Context<'_>) -> __futures_crate::task::Poll<Option<Self::Item>> {
match self.get_mut() {
#(
Self::#generic_idents(#generic_idents) => ::std::pin::Pin::new(#generic_idents).poll_next(cx)
),*,
Self::None => panic!("StreamEnum::None should never be polled!"),
}
}
}

impl<ITEM, #(#generic_idents),*> __futures_crate::stream::Stream for StreamSelect<#(#generic_idents),*>
where #(#generic_idents: __futures_crate::stream::Stream<Item=ITEM> + ::std::marker::Unpin,)*
{
type Item = ITEM;

fn poll_next(mut self: ::std::pin::Pin<&mut Self>, cx: &mut __futures_crate::task::Context<'_>) -> __futures_crate::task::Poll<Option<Self::Item>> {
let Self(#(ref mut #field_idents),*) = self.get_mut();
#(
let mut #field_idents_2 = false;
)*
let mut any_pending = false;
{
let mut stream_array = [#(#field_idents.as_mut().map(|f| StreamEnum::#generic_idents(f)).unwrap_or(StreamEnum::None)),*];
__futures_crate::async_await::shuffle(&mut stream_array);

for mut s in stream_array {
if let StreamEnum::None = s {
continue;
} else {
match __futures_crate::stream::Stream::poll_next(::std::pin::Pin::new(&mut s), cx) {
r @ __futures_crate::task::Poll::Ready(Some(_)) => {
return r;
},
__futures_crate::task::Poll::Pending => {
any_pending = true;
},
__futures_crate::task::Poll::Ready(None) => {
match s {
#(
StreamEnum::#generic_idents(_) => { #field_idents_2 = true; }
),*,
StreamEnum::None => panic!("StreamEnum::None should never be polled!"),
}
},
}
}
}
}
#(
if #field_idents_2 {
*#field_idents = None;
}
)*
if any_pending {
__futures_crate::task::Poll::Pending
} else {
__futures_crate::task::Poll::Ready(None)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let mut s = (0, Some(0));
#(
if let Some(new_hint) = self.#field_indices.as_ref().map(|s| s.size_hint()) {
s.0 += new_hint.0;
// We can change this out for `.zip` when the MSRV is 1.46.0 or higher.
s.1 = s.1.and_then(|a| new_hint.1.map(|b| a + b));
}
)*
s
}
}

StreamSelect(#(Some(#args)),*)

}
})
}
7 changes: 7 additions & 0 deletions futures-util/src/async_await/mod.rs
Expand Up @@ -30,6 +30,13 @@ mod select_mod;
#[cfg(feature = "async-await-macro")]
pub use self::select_mod::*;

// Primary export is a macro
#[cfg(feature = "async-await-macro")]
mod stream_select_mod;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
#[cfg(feature = "async-await-macro")]
pub use self::stream_select_mod::*;

#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
mod random;
Expand Down
45 changes: 45 additions & 0 deletions futures-util/src/async_await/stream_select_mod.rs
@@ -0,0 +1,45 @@
//! The `stream_select` macro.

#[cfg(feature = "std")]
#[allow(unreachable_pub)]
#[doc(hidden)]
#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
pub use futures_macro::stream_select_internal;

/// Combines several streams, all producing the same `Item` type, into one stream.
/// This is similar to `select_all` but does not require the streams to all be the same type.
/// It also keeps the streams inline, and does not require `Box<dyn Stream>`s to be allocated.
/// Streams passed to this macro must be `Unpin`.
///
/// If multiple streams are ready, one will be pseudo randomly selected at runtime.
///
/// This macro is gated behind the `async-await` feature of this library, which is activated by default.
/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion
/// limit very high, e.g. `#![recursion_limit="1024"]`.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::{stream, StreamExt, stream_select};
/// let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()).fuse();
///
/// let mut endless_numbers = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
/// match endless_numbers.next().await {
/// Some(1) => println!("Got a 1"),
/// Some(2) => println!("Got a 2"),
/// Some(3) => println!("Got a 3"),
/// _ => unreachable!(),
/// }
/// # });
/// ```
#[cfg(feature = "std")]
#[macro_export]
macro_rules! stream_select {
($($tokens:tt)*) => {{
use $crate::__private as __futures_crate;
$crate::stream_select_internal! {
$( $tokens )*
}
}}
}
95 changes: 73 additions & 22 deletions futures-util/src/future/join_all.rs
Expand Up @@ -12,20 +12,39 @@ use core::task::{Context, Poll};

use super::{assert_future, MaybeDone};

#[cfg(not(futures_no_atomic_cas))]
use crate::stream::{Collect, FuturesOrdered, StreamExt};

fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
}

/// Future for the [`join_all`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
/// Future for the [`join_all`] function.
pub struct JoinAll<F>
where
F: Future,
{
elems: Pin<Box<[MaybeDone<F>]>>,
kind: JoinAllKind<F>,
}

#[cfg(not(futures_no_atomic_cas))]
const SMALL: usize = 30;

pub(crate) enum JoinAllKind<F>
where
F: Future,
{
Small {
elems: Pin<Box<[MaybeDone<F>]>>,
},
#[cfg(not(futures_no_atomic_cas))]
Big {
fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
},
}

impl<F> fmt::Debug for JoinAll<F>
Expand All @@ -34,7 +53,13 @@ where
F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinAll").field("elems", &self.elems).finish()
match self.kind {
JoinAllKind::Small { ref elems } => {
f.debug_struct("JoinAll").field("elems", elems).finish()
}
#[cfg(not(futures_no_atomic_cas))]
JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
}
}
}

Expand All @@ -50,10 +75,9 @@ where
///
/// # See Also
///
/// This is purposefully a very simple API for basic use-cases. In a lot of
/// cases you will want to use the more powerful
/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
/// reasons if the number of futures is large. You may want to look into using it or
/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
///
/// Some examples for additional functionality provided by these are:
///
Expand All @@ -75,13 +99,33 @@ where
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
/// # });
/// ```
pub fn join_all<I>(i: I) -> JoinAll<I::Item>
pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
#[cfg(futures_no_atomic_cas)]
{
let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
let kind = JoinAllKind::Small { elems };
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
#[cfg(not(futures_no_atomic_cas))]
{
let iter = iter.into_iter();
let kind = match iter.size_hint().1 {
None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
Some(max) => {
if max <= SMALL {
let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
JoinAllKind::Small { elems }
} else {
JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
}
}
};
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
}

impl<F> Future for JoinAll<F>
Expand All @@ -91,20 +135,27 @@ where
type Output = Vec<F::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut all_done = true;
match &mut self.kind {
JoinAllKind::Small { elems } => {
let mut all_done = true;

for elem in iter_pin_mut(self.elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}
for elem in iter_pin_mut(elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}

if all_done {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(result)
} else {
Poll::Pending
if all_done {
let mut elems = mem::replace(elems, Box::pin([]));
let result =
iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(result)
} else {
Poll::Pending
}
}
#[cfg(not(futures_no_atomic_cas))]
JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions futures-util/src/future/mod.rs
Expand Up @@ -68,6 +68,9 @@ pub use self::option::OptionFuture;
mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};

mod poll_immediate;
pub use self::poll_immediate::{poll_immediate, PollImmediate};

mod ready;
pub use self::ready::{err, ok, ready, Ready};

Expand Down
6 changes: 6 additions & 0 deletions futures-util/src/future/option.rs
Expand Up @@ -31,6 +31,12 @@ pin_project! {
}
}

impl<F> Default for OptionFuture<F> {
fn default() -> Self {
Self { inner: None }
}
}

impl<F: Future> Future for OptionFuture<F> {
type Output = Option<F::Output>;

Expand Down