From d309ea397581efefeacaa5a928d33b0ec6e87834 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 15 Nov 2019 19:04:00 +0900 Subject: [PATCH] Add select_biased! macro --- futures-macro/src/lib.rs | 6 + futures-macro/src/select.rs | 19 ++- futures-util/src/async_await/select_mod.rs | 157 ++++++++++++++++++++- futures/src/lib.rs | 11 ++ futures/tests/async_await_macros.rs | 20 +++ 5 files changed, 209 insertions(+), 4 deletions(-) diff --git a/futures-macro/src/lib.rs b/futures-macro/src/lib.rs index 87833a1bf9..c629ba15f0 100644 --- a/futures-macro/src/lib.rs +++ b/futures-macro/src/lib.rs @@ -35,3 +35,9 @@ pub fn try_join(input: TokenStream) -> TokenStream { pub fn select(input: TokenStream) -> TokenStream { crate::select::select(input) } + +/// The `select_biased!` macro. +#[proc_macro_hack] +pub fn select_biased(input: TokenStream) -> TokenStream { + crate::select::select_biased(input) +} diff --git a/futures-macro/src/select.rs b/futures-macro/src/select.rs index 29884a0d55..82b221e758 100644 --- a/futures-macro/src/select.rs +++ b/futures-macro/src/select.rs @@ -136,6 +136,15 @@ fn declare_result_enum( /// The `select!` macro. pub(crate) fn select(input: TokenStream) -> TokenStream { + select_inner(input, true) +} + +/// The `select_biased!` macro. +pub(crate) fn select_biased(input: TokenStream) -> TokenStream { + select_inner(input, false) +} + +fn select_inner(input: TokenStream, random: bool) -> TokenStream { let parsed = syn::parse_macro_input!(input as Select); let futures_crate: syn::Path = parsed.futures_crate_path.unwrap_or_else(|| parse_quote!(::futures_util)); @@ -286,6 +295,14 @@ pub(crate) fn select(input: TokenStream) -> TokenStream { } }; + let shuffle = if random { + quote! { + #futures_crate::async_await::shuffle(&mut __select_arr); + } + } else { + quote!() + }; + TokenStream::from(quote! { { #enum_item @@ -298,7 +315,7 @@ pub(crate) fn select(input: TokenStream) -> TokenStream { #( #poll_functions )* let mut __select_arr = [#( #variant_names ),*]; - #futures_crate::async_await::shuffle(&mut __select_arr); + #shuffle for poller in &mut __select_arr { let poller: &mut &mut dyn FnMut( &mut #futures_crate::task::Context<'_> diff --git a/futures-util/src/async_await/select_mod.rs b/futures-util/src/async_await/select_mod.rs index 221b6fdbdd..edca930a50 100644 --- a/futures-util/src/async_await/select_mod.rs +++ b/futures-util/src/async_await/select_mod.rs @@ -5,7 +5,7 @@ use proc_macro_hack::proc_macro_hack; #[doc(hidden)] #[macro_export] macro_rules! document_select_macro { - ($item:item) => { + ($select:item $select_biased:item) => { /// Polls multiple futures and streams simultaneously, executing the branch /// for the future that finishes first. If multiple futures are ready, /// one will be pseudo-randomly selected at runtime. Futures directly @@ -99,7 +99,7 @@ macro_rules! document_select_macro { /// # futures::executor::block_on(async { /// use futures::future::FutureExt; /// use futures::select; - /// use pin_utils::pin_mut; + /// use futures::pin_mut; /// /// // Calling the following async fn returns a Future which does not /// // implement Unpin @@ -153,11 +153,162 @@ macro_rules! document_select_macro { /// from inside the `select!` block's branches. This can be used to implement /// more complex behavior such as timer resets or writing into the head of /// a stream. - $item + $select + + /// Polls multiple futures and streams simultaneously, executing the branch + /// for the future that finishes first. Unlike [`select!`], if multiple futures are ready, + /// one will be selected in order of declaration. Futures directly + /// passed to `select_biased!` must be `Unpin` and implement `FusedFuture`. + /// + /// If an expression which yields a `Future` is passed to `select_biased!` + /// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin` + /// requirement is relaxed, since the macro will pin the resulting `Future` + /// on the stack. However the `Future` returned by the expression must + /// still implement `FusedFuture`. This difference is presented + /// + /// Futures and streams which are not already fused can be fused using the + /// `.fuse()` method. Note, though, that fusing a future or stream directly + /// in the call to `select_biased!` will not be enough to prevent it from being + /// polled after completion if the `select_biased!` call is in a loop, so when + /// `select_biased!`ing in a loop, users should take care to `fuse()` outside of + /// the loop. + /// + /// `select_biased!` can be used as an expression and will return the return + /// value of the selected branch. For this reason the return type of every + /// branch in a `select_biased!` must be the same. + /// + /// This macro is only usable inside of async functions, closures, and blocks. + /// It is also gated behind the `async-await` feature of this library, which is + /// _not_ activated by default. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::select_biased; + /// let mut a = future::ready(4); + /// let mut b = future::pending::<()>(); + /// + /// let res = select_biased! { + /// a_res = a => a_res + 1, + /// _ = b => 0, + /// }; + /// assert_eq!(res, 5); + /// # }); + /// ``` + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, StreamExt}; + /// use futures::select_biased; + /// let mut st = stream::iter(vec![2]).fuse(); + /// let mut fut = future::pending::<()>(); + /// + /// select_biased! { + /// x = st.next() => assert_eq!(Some(2), x), + /// _ = fut => panic!(), + /// }; + /// # }); + /// ``` + /// + /// As described earlier, `select_biased` can directly select on expressions + /// which return `Future`s - even if those do not implement `Unpin`: + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::select_biased; + /// + /// // Calling the following async fn returns a Future which does not + /// // implement Unpin + /// async fn async_identity_fn(arg: usize) -> usize { + /// arg + /// } + /// + /// let res = select_biased! { + /// a_res = async_identity_fn(62).fuse() => a_res + 1, + /// b_res = async_identity_fn(13).fuse() => b_res, + /// }; + /// assert!(res == 63 || res == 12); + /// # }); + /// ``` + /// + /// If a similar async function is called outside of `select_biased` to produce + /// a `Future`, the `Future` must be pinned in order to be able to pass + /// it to `select_biased`. This can be achieved via `Box::pin` for pinning a + /// `Future` on the heap or the `pin_mut!` macro for pinning a `Future` + /// on the stack. + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::FutureExt; + /// use futures::select_biased; + /// use futures::pin_mut; + /// + /// // Calling the following async fn returns a Future which does not + /// // implement Unpin + /// async fn async_identity_fn(arg: usize) -> usize { + /// arg + /// } + /// + /// let fut_1 = async_identity_fn(1).fuse(); + /// let fut_2 = async_identity_fn(2).fuse(); + /// let mut fut_1 = Box::pin(fut_1); // Pins the Future on the heap + /// pin_mut!(fut_2); // Pins the Future on the stack + /// + /// let res = select_biased! { + /// a_res = fut_1 => a_res, + /// b_res = fut_2 => b_res, + /// }; + /// assert!(res == 1 || res == 2); + /// # }); + /// ``` + /// + /// `select_biased` also accepts a `complete` branch and a `default` branch. + /// `complete` will run if all futures and streams have already been + /// exhausted. `default` will run if no futures or streams are + /// immediately ready. `complete` takes priority over `default` in + /// the case where all futures have completed. + /// A motivating use-case for passing `Future`s by name as well as for + /// `complete` blocks is to call `select_biased!` in a loop, which is + /// demonstrated in the following example: + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::select_biased; + /// let mut a_fut = future::ready(4); + /// let mut b_fut = future::ready(6); + /// let mut total = 0; + /// + /// loop { + /// select_biased! { + /// a = a_fut => total += a, + /// b = b_fut => total += b, + /// complete => break, + /// default => panic!(), // never runs (futures run first, then complete) + /// }; + /// } + /// assert_eq!(total, 10); + /// # }); + /// ``` + /// + /// Note that the futures that have been matched over can still be mutated + /// from inside the `select_biased!` block's branches. This can be used to implement + /// more complex behavior such as timer resets or writing into the head of + /// a stream. + /// + /// [`select!`]: macro.select.html + $select_biased } } document_select_macro! { #[proc_macro_hack(support_nested)] pub use futures_macro::select; + + #[proc_macro_hack(support_nested)] + pub use futures_macro::select_biased; } diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 89d098cfcb..a94190295e 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -554,6 +554,7 @@ pub mod inner_macro { pub use futures_util::join; pub use futures_util::try_join; pub use futures_util::select; + pub use futures_util::select_biased; } #[cfg(feature = "std")] @@ -592,4 +593,14 @@ futures_util::document_select_macro! { } } } + + #[macro_export] + macro_rules! select_biased { // replace `::futures_util` with `::futures` as the crate path + ($($tokens:tt)*) => { + $crate::inner_macro::select_biased! { + futures_crate_path ( ::futures ) + $( $tokens )* + } + } + } } diff --git a/futures/tests/async_await_macros.rs b/futures/tests/async_await_macros.rs index ce48b25abb..d2ab30c152 100644 --- a/futures/tests/async_await_macros.rs +++ b/futures/tests/async_await_macros.rs @@ -56,6 +56,26 @@ fn select() { assert!(ran); } +#[test] +fn select_biased() { + use futures::select_biased; + + let (tx1, rx1) = oneshot::channel::(); + let (_tx2, rx2) = oneshot::channel::(); + tx1.send(1).unwrap(); + let mut ran = false; + block_on(async { + select_biased! { + res = rx1.fuse() => { + assert_eq!(Ok(1), res); + ran = true; + }, + _ = rx2.fuse() => unreachable!(), + } + }); + assert!(ran); +} + #[test] fn select_streams() { let (mut tx1, rx1) = mpsc::channel::(1);