Skip to content

Commit

Permalink
Add select_biased! macro
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and cramertj committed Nov 15, 2019
1 parent 260f76b commit d309ea3
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 4 deletions.
6 changes: 6 additions & 0 deletions futures-macro/src/lib.rs
Expand Up @@ -35,3 +35,9 @@ pub fn try_join(input: TokenStream) -> TokenStream {
pub fn select(input: TokenStream) -> TokenStream {
crate::select::select(input)
}

/// The `select_biased!` macro.
#[proc_macro_hack]
pub fn select_biased(input: TokenStream) -> TokenStream {
crate::select::select_biased(input)
}
19 changes: 18 additions & 1 deletion futures-macro/src/select.rs
Expand Up @@ -136,6 +136,15 @@ fn declare_result_enum(

/// The `select!` macro.
pub(crate) fn select(input: TokenStream) -> TokenStream {
select_inner(input, true)
}

/// The `select_biased!` macro.
pub(crate) fn select_biased(input: TokenStream) -> TokenStream {
select_inner(input, false)
}

fn select_inner(input: TokenStream, random: bool) -> TokenStream {
let parsed = syn::parse_macro_input!(input as Select);

let futures_crate: syn::Path = parsed.futures_crate_path.unwrap_or_else(|| parse_quote!(::futures_util));
Expand Down Expand Up @@ -286,6 +295,14 @@ pub(crate) fn select(input: TokenStream) -> TokenStream {
}
};

let shuffle = if random {
quote! {
#futures_crate::async_await::shuffle(&mut __select_arr);
}
} else {
quote!()
};

TokenStream::from(quote! { {
#enum_item

Expand All @@ -298,7 +315,7 @@ pub(crate) fn select(input: TokenStream) -> TokenStream {
#( #poll_functions )*

let mut __select_arr = [#( #variant_names ),*];
#futures_crate::async_await::shuffle(&mut __select_arr);
#shuffle
for poller in &mut __select_arr {
let poller: &mut &mut dyn FnMut(
&mut #futures_crate::task::Context<'_>
Expand Down
157 changes: 154 additions & 3 deletions futures-util/src/async_await/select_mod.rs
Expand Up @@ -5,7 +5,7 @@ use proc_macro_hack::proc_macro_hack;
#[doc(hidden)]
#[macro_export]
macro_rules! document_select_macro {
($item:item) => {
($select:item $select_biased:item) => {
/// Polls multiple futures and streams simultaneously, executing the branch
/// for the future that finishes first. If multiple futures are ready,
/// one will be pseudo-randomly selected at runtime. Futures directly
Expand Down Expand Up @@ -99,7 +99,7 @@ macro_rules! document_select_macro {
/// # futures::executor::block_on(async {
/// use futures::future::FutureExt;
/// use futures::select;
/// use pin_utils::pin_mut;
/// use futures::pin_mut;
///
/// // Calling the following async fn returns a Future which does not
/// // implement Unpin
Expand Down Expand Up @@ -153,11 +153,162 @@ macro_rules! document_select_macro {
/// from inside the `select!` block's branches. This can be used to implement
/// more complex behavior such as timer resets or writing into the head of
/// a stream.
$item
$select

/// Polls multiple futures and streams simultaneously, executing the branch
/// for the future that finishes first. Unlike [`select!`], if multiple futures are ready,
/// one will be selected in order of declaration. Futures directly
/// passed to `select_biased!` must be `Unpin` and implement `FusedFuture`.
///
/// If an expression which yields a `Future` is passed to `select_biased!`
/// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin`
/// requirement is relaxed, since the macro will pin the resulting `Future`
/// on the stack. However the `Future` returned by the expression must
/// still implement `FusedFuture`. This difference is presented
///
/// Futures and streams which are not already fused can be fused using the
/// `.fuse()` method. Note, though, that fusing a future or stream directly
/// in the call to `select_biased!` will not be enough to prevent it from being
/// polled after completion if the `select_biased!` call is in a loop, so when
/// `select_biased!`ing in a loop, users should take care to `fuse()` outside of
/// the loop.
///
/// `select_biased!` can be used as an expression and will return the return
/// value of the selected branch. For this reason the return type of every
/// branch in a `select_biased!` must be the same.
///
/// This macro is only usable inside of async functions, closures, and blocks.
/// It is also gated behind the `async-await` feature of this library, which is
/// _not_ activated by default.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::select_biased;
/// let mut a = future::ready(4);
/// let mut b = future::pending::<()>();
///
/// let res = select_biased! {
/// a_res = a => a_res + 1,
/// _ = b => 0,
/// };
/// assert_eq!(res, 5);
/// # });
/// ```
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, StreamExt};
/// use futures::select_biased;
/// let mut st = stream::iter(vec![2]).fuse();
/// let mut fut = future::pending::<()>();
///
/// select_biased! {
/// x = st.next() => assert_eq!(Some(2), x),
/// _ = fut => panic!(),
/// };
/// # });
/// ```
///
/// As described earlier, `select_biased` can directly select on expressions
/// which return `Future`s - even if those do not implement `Unpin`:
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::FutureExt;
/// use futures::select_biased;
///
/// // Calling the following async fn returns a Future which does not
/// // implement Unpin
/// async fn async_identity_fn(arg: usize) -> usize {
/// arg
/// }
///
/// let res = select_biased! {
/// a_res = async_identity_fn(62).fuse() => a_res + 1,
/// b_res = async_identity_fn(13).fuse() => b_res,
/// };
/// assert!(res == 63 || res == 12);
/// # });
/// ```
///
/// If a similar async function is called outside of `select_biased` to produce
/// a `Future`, the `Future` must be pinned in order to be able to pass
/// it to `select_biased`. This can be achieved via `Box::pin` for pinning a
/// `Future` on the heap or the `pin_mut!` macro for pinning a `Future`
/// on the stack.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::FutureExt;
/// use futures::select_biased;
/// use futures::pin_mut;
///
/// // Calling the following async fn returns a Future which does not
/// // implement Unpin
/// async fn async_identity_fn(arg: usize) -> usize {
/// arg
/// }
///
/// let fut_1 = async_identity_fn(1).fuse();
/// let fut_2 = async_identity_fn(2).fuse();
/// let mut fut_1 = Box::pin(fut_1); // Pins the Future on the heap
/// pin_mut!(fut_2); // Pins the Future on the stack
///
/// let res = select_biased! {
/// a_res = fut_1 => a_res,
/// b_res = fut_2 => b_res,
/// };
/// assert!(res == 1 || res == 2);
/// # });
/// ```
///
/// `select_biased` also accepts a `complete` branch and a `default` branch.
/// `complete` will run if all futures and streams have already been
/// exhausted. `default` will run if no futures or streams are
/// immediately ready. `complete` takes priority over `default` in
/// the case where all futures have completed.
/// A motivating use-case for passing `Future`s by name as well as for
/// `complete` blocks is to call `select_biased!` in a loop, which is
/// demonstrated in the following example:
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::select_biased;
/// let mut a_fut = future::ready(4);
/// let mut b_fut = future::ready(6);
/// let mut total = 0;
///
/// loop {
/// select_biased! {
/// a = a_fut => total += a,
/// b = b_fut => total += b,
/// complete => break,
/// default => panic!(), // never runs (futures run first, then complete)
/// };
/// }
/// assert_eq!(total, 10);
/// # });
/// ```
///
/// Note that the futures that have been matched over can still be mutated
/// from inside the `select_biased!` block's branches. This can be used to implement
/// more complex behavior such as timer resets or writing into the head of
/// a stream.
///
/// [`select!`]: macro.select.html
$select_biased
}
}

document_select_macro! {
#[proc_macro_hack(support_nested)]
pub use futures_macro::select;

#[proc_macro_hack(support_nested)]
pub use futures_macro::select_biased;
}
11 changes: 11 additions & 0 deletions futures/src/lib.rs
Expand Up @@ -554,6 +554,7 @@ pub mod inner_macro {
pub use futures_util::join;
pub use futures_util::try_join;
pub use futures_util::select;
pub use futures_util::select_biased;
}

#[cfg(feature = "std")]
Expand Down Expand Up @@ -592,4 +593,14 @@ futures_util::document_select_macro! {
}
}
}

#[macro_export]
macro_rules! select_biased { // replace `::futures_util` with `::futures` as the crate path
($($tokens:tt)*) => {
$crate::inner_macro::select_biased! {
futures_crate_path ( ::futures )
$( $tokens )*
}
}
}
}
20 changes: 20 additions & 0 deletions futures/tests/async_await_macros.rs
Expand Up @@ -56,6 +56,26 @@ fn select() {
assert!(ran);
}

#[test]
fn select_biased() {
use futures::select_biased;

let (tx1, rx1) = oneshot::channel::<i32>();
let (_tx2, rx2) = oneshot::channel::<i32>();
tx1.send(1).unwrap();
let mut ran = false;
block_on(async {
select_biased! {
res = rx1.fuse() => {
assert_eq!(Ok(1), res);
ran = true;
},
_ = rx2.fuse() => unreachable!(),
}
});
assert!(ran);
}

#[test]
fn select_streams() {
let (mut tx1, rx1) = mpsc::channel::<i32>(1);
Expand Down

0 comments on commit d309ea3

Please sign in to comment.