diff --git a/examples/01_02_why_async/Cargo.toml b/examples/01_02_why_async/Cargo.toml index 909ceca1..a7e28706 100644 --- a/examples/01_02_why_async/Cargo.toml +++ b/examples/01_02_why_async/Cargo.toml @@ -7,4 +7,4 @@ edition = "2018" [lib] [dev-dependencies] -futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/01_04_async_await_primer/Cargo.toml b/examples/01_04_async_await_primer/Cargo.toml index bcec3ffd..a5c21b0e 100644 --- a/examples/01_04_async_await_primer/Cargo.toml +++ b/examples/01_04_async_await_primer/Cargo.toml @@ -7,4 +7,4 @@ edition = "2018" [lib] [dev-dependencies] -futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/01_05_http_server/Cargo.toml b/examples/01_05_http_server/Cargo.toml index 3c2d4b68..d5738d0a 100644 --- a/examples/01_05_http_server/Cargo.toml +++ b/examples/01_05_http_server/Cargo.toml @@ -11,7 +11,7 @@ edition = "2018" # for writing async code. Enable the "compat" feature to include the # functions for using futures 0.3 and async/await with the Hyper library, # which use futures 0.1. -futures-preview = { version = "=0.3.0-alpha.16", features = ["compat"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["compat"] } # Hyper is an asynchronous HTTP library. We'll use it to power our HTTP # server and to make HTTP requests. diff --git a/examples/02_03_timer/Cargo.toml b/examples/02_03_timer/Cargo.toml index 40eff29b..57b3548a 100644 --- a/examples/02_03_timer/Cargo.toml +++ b/examples/02_03_timer/Cargo.toml @@ -7,4 +7,4 @@ edition = "2018" [lib] [dependencies] -futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/02_04_executor/Cargo.toml b/examples/02_04_executor/Cargo.toml index 86eb771a..50c548d6 100644 --- a/examples/02_04_executor/Cargo.toml +++ b/examples/02_04_executor/Cargo.toml @@ -7,5 +7,5 @@ edition = "2018" [lib] [dependencies] -futures-preview = { version = "=0.3.0-alpha.16" } +futures-preview = { version = "=0.3.0-alpha.17" } timer_future = { package = "02_03_timer", path = "../02_03_timer" } diff --git a/examples/03_01_async_await/Cargo.toml b/examples/03_01_async_await/Cargo.toml index 8b7fddc9..b32d75aa 100644 --- a/examples/03_01_async_await/Cargo.toml +++ b/examples/03_01_async_await/Cargo.toml @@ -7,4 +7,4 @@ edition = "2018" [lib] [dev-dependencies] -futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/05_01_streams/Cargo.toml b/examples/05_01_streams/Cargo.toml index cee725a0..1f36b9ce 100644 --- a/examples/05_01_streams/Cargo.toml +++ b/examples/05_01_streams/Cargo.toml @@ -7,4 +7,4 @@ edition = "2018" [lib] [dev-dependencies] -futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/05_02_iteration_and_concurrency/Cargo.toml b/examples/05_02_iteration_and_concurrency/Cargo.toml index cbb82f91..539ea411 100644 --- a/examples/05_02_iteration_and_concurrency/Cargo.toml +++ b/examples/05_02_iteration_and_concurrency/Cargo.toml @@ -7,4 +7,4 @@ edition = "2018" [lib] [dev-dependencies] -futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] } +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/06_02_join/Cargo.toml b/examples/06_02_join/Cargo.toml new file mode 100644 index 00000000..a2492969 --- /dev/null +++ b/examples/06_02_join/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "06_02_join" +version = "0.1.0" +authors = ["Taylor Cramer "] +edition = "2018" + +[lib] + +[dev-dependencies] +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/06_02_join/src/lib.rs b/examples/06_02_join/src/lib.rs new file mode 100644 index 00000000..28867f40 --- /dev/null +++ b/examples/06_02_join/src/lib.rs @@ -0,0 +1,78 @@ +#![cfg(test)] +#![feature(async_await)] + +struct Book; +struct Music; +async fn get_book() -> Book { Book } +async fn get_music() -> Music { Music } + +mod naiive { +use super::*; +// ANCHOR: naiive +async fn get_book_and_music() -> (Book, Music) { + let book = get_book().await; + let music = get_music().await; + (book, music) +} +// ANCHOR_END: naiive +} + +mod other_langs { +use super::*; +// ANCHOR: other_langs +// WRONG -- don't do this +async fn get_book_and_music() -> (Book, Music) { + let book_future = get_book(); + let music_future = get_music(); + (book_future.await, music_future.await) +} +// ANCHOR_END: other_langs +} + +mod join { +use super::*; +// ANCHOR: join +use futures::join; + +async fn get_book_and_music() -> (Book, Music) { + let book_fut = get_book(); + let music_fut = get_music(); + join!(book_fut, music_fut) +} +// ANCHOR_END: join +} + +mod try_join { +use super::{Book, Music}; +// ANCHOR: try_join +use futures::try_join; + +async fn get_book() -> Result { /* ... */ Ok(Book) } +async fn get_music() -> Result { /* ... */ Ok(Music) } + +async fn get_book_and_music() -> Result<(Book, Music), String> { + let book_fut = get_book(); + let music_fut = get_music(); + try_join!(book_fut, music_fut) +} +// ANCHOR_END: try_join +} + +mod mismatched_err { +use super::{Book, Music}; +// ANCHOR: try_join_map_err +use futures::{ + future::TryFutureExt, + try_join, +}; + +async fn get_book() -> Result { /* ... */ Ok(Book) } +async fn get_music() -> Result { /* ... */ Ok(Music) } + +async fn get_book_and_music() -> Result<(Book, Music), String> { + let book_fut = get_book().map_err(|()| "Unable to get book".to_string()); + let music_fut = get_music(); + try_join!(book_fut, music_fut) +} +// ANCHOR_END: try_join_map_err +} diff --git a/examples/06_03_select/Cargo.toml b/examples/06_03_select/Cargo.toml new file mode 100644 index 00000000..609fe433 --- /dev/null +++ b/examples/06_03_select/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "06_03_select" +version = "0.1.0" +authors = ["Taylor Cramer "] +edition = "2018" + +[lib] + +[dev-dependencies] +futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] } diff --git a/examples/06_03_select/src/lib.rs b/examples/06_03_select/src/lib.rs new file mode 100644 index 00000000..84a49c80 --- /dev/null +++ b/examples/06_03_select/src/lib.rs @@ -0,0 +1,184 @@ +#![cfg(test)] +#![feature(async_await)] +#![recursion_limit="128"] + +mod example { +// ANCHOR: example +use futures::{ + future::FutureExt, // for `.fuse()` + pin_mut, + select, +}; + +async fn task_one() { /* ... */ } +async fn task_two() { /* ... */ } + +async fn race_tasks() { + let t1 = task_one().fuse(); + let t2 = task_two().fuse(); + + pin_mut!(t1, t2); + + select! { + () = t1 => println!("task one completed first"), + () = t2 => println!("task two completed first"), + } +} +// ANCHOR_END: example +} + +mod default_and_complete { +// ANCHOR: default_and_complete +use futures::{future, select}; + +async fn count() { + let mut a_fut = future::ready(4); + let mut b_fut = future::ready(6); + let mut total = 0; + + loop { + select! { + a = a_fut => total += a, + b = b_fut => total += b, + complete => break, + default => unreachable!(), // never runs (futures are ready, then complete) + }; + } + assert_eq!(total, 10); +} +// ANCHOR_END: default_and_complete + +#[test] +fn run_count() { + futures::executor::block_on(count()); +} +} + +mod fused_stream { +// ANCHOR: fused_stream +use futures::{ + stream::{Stream, StreamExt, FusedStream}, + select, +}; + +async fn add_two_streams( + mut s1: impl Stream + FusedStream + Unpin, + mut s2: impl Stream + FusedStream + Unpin, +) -> u8 { + let mut total = 0; + + loop { + let item = select! { + x = s1.next() => x, + x = s2.next() => x, + complete => break, + }; + if let Some(next_num) = item { + total += next_num; + } + } + + total +} +// ANCHOR_END: fused_stream +} + +mod fuse_terminated { +// ANCHOR: fuse_terminated +use futures::{ + future::{Fuse, FusedFuture, FutureExt}, + stream::{FusedStream, Stream, StreamExt}, + pin_mut, + select, +}; + +async fn get_new_num() -> u8 { /* ... */ 5 } + +async fn run_on_new_num(_: u8) { /* ... */ } + +async fn run_loop( + mut interval_timer: impl Stream + FusedStream + Unpin, + starting_num: u8, +) { + let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); + let get_new_num_fut = Fuse::terminated(); + pin_mut!(run_on_new_num_fut, get_new_num_fut); + loop { + select! { + () = interval_timer.select_next_some() => { + // The timer has elapsed. Start a new `get_new_num_fut` + // if one was not already running. + if get_new_num_fut.is_terminated() { + get_new_num_fut.set(get_new_num().fuse()); + } + }, + new_num = get_new_num_fut => { + // A new number has arrived-- start a new `run_on_new_num_fut`, + // dropping the old one. + run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); + }, + // Run the `run_on_new_num_fut` + () = run_on_new_num_fut => {}, + // panic if everything completed, since the `interval_timer` should + // keep yielding values indefinitely. + complete => panic!("`interval_timer` completed unexpectedly"), + } + } +} +// ANCHOR_END: fuse_terminated +} + +mod futures_unordered { +// ANCHOR: futures_unordered +use futures::{ + future::{Fuse, FusedFuture, FutureExt}, + stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, + pin_mut, + select, +}; + +async fn get_new_num() -> u8 { /* ... */ 5 } + +async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } + +// Runs `run_on_new_num` with the latest number +// retrieved from `get_new_num`. +// +// `get_new_num` is re-run every time a timer elapses, +// immediately cancelling the currently running +// `run_on_new_num` and replacing it with the newly +// returned value. +async fn run_loop( + mut interval_timer: impl Stream + FusedStream + Unpin, + starting_num: u8, +) { + let mut run_on_new_num_futs = FuturesUnordered::new(); + run_on_new_num_futs.push(run_on_new_num(starting_num)); + let get_new_num_fut = Fuse::terminated(); + pin_mut!(get_new_num_fut); + loop { + select! { + () = interval_timer.select_next_some() => { + // The timer has elapsed. Start a new `get_new_num_fut` + // if one was not already running. + if get_new_num_fut.is_terminated() { + get_new_num_fut.set(get_new_num().fuse()); + } + }, + new_num = get_new_num_fut => { + // A new number has arrived-- start a new `run_on_new_num_fut`. + run_on_new_num_futs.push(run_on_new_num(new_num)); + }, + // Run the `run_on_new_num_futs` and check if any have completed + res = run_on_new_num_futs.select_next_some() => { + println!("run_on_new_num_fut returned {:?}", res); + }, + // panic if everything completed, since the `interval_timer` should + // keep yielding values indefinitely. + complete => panic!("`interval_timer` completed unexpectedly"), + } + } +} + +// ANCHOR_END: futures_unordered +} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 12bcf81a..230ab300 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -9,4 +9,6 @@ members = [ "03_01_async_await", "05_01_streams", "05_02_iteration_and_concurrency", + "06_02_join", + "06_03_select", ] diff --git a/src/02_execution/04_executor.md b/src/02_execution/04_executor.md index ff5e65dc..29b70037 100644 --- a/src/02_execution/04_executor.md +++ b/src/02_execution/04_executor.md @@ -27,7 +27,7 @@ authors = ["XYZ Author"] edition = "2018" [dependencies] -futures-preview = "=0.3.0-alpha.16" +futures-preview = "=0.3.0-alpha.17" ``` Next, we need the following imports at the top of `src/main.rs`: diff --git a/src/02_execution/05_io.md b/src/02_execution/05_io.md index 577e5d2e..d28f65b9 100644 --- a/src/02_execution/05_io.md +++ b/src/02_execution/05_io.md @@ -109,4 +109,4 @@ task, allowing the executor to drive more tasks to completion before returning to check for more IO events (and the cycle continues...). [The `Future` Trait]: ./02_future.md -[`mio`]: https://github.com/carllerche/mio +[`mio`]: https://github.com/tokio-rs/mio diff --git a/src/06_multiple_futures/01_chapter.md b/src/06_multiple_futures/01_chapter.md new file mode 100644 index 00000000..4295b210 --- /dev/null +++ b/src/06_multiple_futures/01_chapter.md @@ -0,0 +1,14 @@ +# Executing Multiple Futures at a Time + +Up until now, we've mostly executed futures by using `.await`, which blocks +the current task until a particular `Future` completes. However, real +asynchronous applications often need to execute several different +operations concurrently. + +In this chapter, we'll cover some ways to execute multiple asynchronous +operations at the same time: + +- `join!`: waits for futures to all complete +- `select!`: waits for one of several futures to complete +- Spawning: creates a top-level task which ambiently runs a future to completion +- `FuturesUnordered`: a group of futures which yields the result of each subfuture diff --git a/src/06_multiple_futures/02_join.md b/src/06_multiple_futures/02_join.md new file mode 100644 index 00000000..8c6e07ab --- /dev/null +++ b/src/06_multiple_futures/02_join.md @@ -0,0 +1,56 @@ +# `join!` + +The `futures::join` macro makes it possible to wait for multiple different +futures to complete while executing them all concurrently. + +When performing multiple asynchronous operations, it's tempting to simply +`.await` them in a series: + +```rust +{{#include ../../examples/06_02_join/src/lib.rs:naiive}} +``` + +However, this will be slower than necessary, since it won't start trying to +`get_music` until after `get_book` has completed. In some other languages, +futures are ambiently run to completion, so two operations can be +run concurrently by first calling the each `async fn` to start the futures, +and then awaiting them both: + +```rust +{{#include ../../examples/06_02_join/src/lib.rs:other_langs}} +``` + +However, Rust futures won't do any work until they're actively `.await`ed. +This means that the two code snippets above will both run +`book_future` and `music_future` in series rather than running them +concurrently. To correctly run the two futures concurrently, use +`futures::join!`: + +```rust +{{#include ../../examples/06_02_join/src/lib.rs:join}} +``` + +The value returned by `join!` is a tuple containing the output of each +`Future` passed in. + +## `try_join!` + +For futures which return `Result`, consider using `try_join!` rather than +`join!`. Since `join!` only completes once all subfutures have completed, +it'll continue processing other futures even after one of its subfutures +has returned an `Err`. + +Unlike `join!`, `try_join!` will complete immediately if one of the subfutures +returns an error. + +```rust +{{#include ../../examples/06_02_join/src/lib.rs:try_join}} +``` + +Note that the futures passed to `try_join!` must all have the same error type. +Consider using the `.map_err(|e| ...)` and `.err_into()` functions from +`futures::future::TryFutureExt` to consolidate the error types: + +```rust +{{#include ../../examples/06_02_join/src/lib.rs:try_join_map_err}} +``` diff --git a/src/06_multiple_futures/03_select.md b/src/06_multiple_futures/03_select.md new file mode 100644 index 00000000..64b1e9ef --- /dev/null +++ b/src/06_multiple_futures/03_select.md @@ -0,0 +1,90 @@ +# `select!` + +The `futures::select` macro runs multiple futures simultaneously, allowing +the user to respond as soon as any future completes. + +```rust +{{#include ../../examples/06_03_select/src/lib.rs:example}} +``` + +The function above will run both `t1` and `t2` concurrently. When either +`t1` or `t2` finishes, the corresponding handler will call `println!`, and +the function will end without completing the remaining task. + +The basic syntax for `select` is ` = => ,`, +repeated for as many futures as you would like to `select` over. + +## `default => ...` and `complete => ...` + +`select` also supports `default` and `complete` branches. + +A `default` branch will run if none of the futures being `select`ed +over are yet complete. A `select` with a `default` branch will +therefore always return immediately, since `default` will be run +if none of the other futures are ready. + +`complete` branches can be used to handle the case where all futures +being `select`ed over have completed and will no longer make progress. +This is often handy when looping over a `select!`. + +```rust +{{#include ../../examples/06_03_select/src/lib.rs:default_and_complete}} +``` + +## Interaction with `Unpin` and `FusedFuture` + +One thing you may have noticed in the first example above is that we +had to call `.fuse()` on the futures returned by the two `async fn`s, +as well as pinning them with `pin_mut`. Both of these calls are necessary +because the futures used in `select` must implement both the `Unpin` +trait and the `FusedFuture` trait. + +`Unpin` is necessary because the futures used by `select` are not +taken by value, but by mutable reference. By not taking ownership +of the future, uncompleted futures can be used again after the +call to `select`. + +Similarly, the `FusedFuture` trait is required because `select` must +not poll a future after it has completed. `FusedFuture` is implemented +by futures which track whether or not they have completed. This makes +it possible to use `select` in a loop, only polling the futures which +still have yet to complete. This can be seen in the example above, +where `a_fut` or `b_fut` will have completed the second time through +the loop. Because the future returned by `future::ready` implements +`FusedFuture`, it's able to tell `select` not to poll it again. + +Note that streams have a corresponding `FusedStream` trait. Streams +which implement this trait or have been wrapped using `.fuse()` +`will yield `FusedFuture` futures from their +`.next()` / `.try_next()` combinators. + +```rust +{{#include ../../examples/06_03_select/src/lib.rs:fused_stream}} +``` + +## Concurrent tasks in a `select` loop with `Fuse` and `FuturesUnordered` + +One somewhat hard-to-discover but handy function is `Fuse::terminated()`, +which allows constructing an empty future which is already terminated, +and can later be filled in with a future that needs to be run. + +This can be handy when there's a task that needs to be run during a `select` +loop but which is created inside the `select` loop itself. + +Note the use of the `.select_next_some()` function. This can be +used with `select` to only run the branch for `Some(_)` values +returned from the stream, ignoring `None`s. + +```rust +{{#include ../../examples/06_03_select/src/lib.rs:fuse_terminated}} +``` + +When many copies of the same future need to be run simultaneously, +use the `FuturesUnordered` type. The following example is similar +to the one above, but will run each copy of `run_on_new_num_fut` +to completion, rather than aborting them when a new one is created. +It will also print out a value returned by `run_on_new_num_fut`. + +```rust +{{#include ../../examples/06_03_select/src/lib.rs:futures_unordered}} +``` diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 905c3c91..9f4a2388 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -14,8 +14,9 @@ - [Pinning](04_pinning/01_chapter.md) - [Streams](05_streams/01_chapter.md) - [Iteration and Concurrency](05_streams/02_iteration_and_concurrency.md) -- [TODO: Executing Multiple Futures at a Time](404.md) - - [TODO: `join!` and `select!`](404.md) +- [Executing Multiple Futures at a Time](06_multiple_futures/01_chapter.md) + - [`join!`](06_multiple_futures/02_join.md) + - [`select!`](06_multiple_futures/03_select.md) - [TODO: Spawning](404.md) - [TODO: Cancellation and Timeouts](404.md) - [TODO: `FuturesUnordered`](404.md)