Skip to content

Commit

Permalink
Add sections for select and join
Browse files Browse the repository at this point in the history
  • Loading branch information
cramertj committed Aug 6, 2019
1 parent 806320d commit 6107fa7
Show file tree
Hide file tree
Showing 9 changed files with 446 additions and 2 deletions.
10 changes: 10 additions & 0 deletions examples/06_02_join/Cargo.toml
@@ -0,0 +1,10 @@
[package]
name = "06_02_join"
version = "0.1.0"
authors = ["Taylor Cramer <cramertj@google.com>"]
edition = "2018"

[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
78 changes: 78 additions & 0 deletions examples/06_02_join/src/lib.rs
@@ -0,0 +1,78 @@
#![cfg(test)]
#![feature(async_await)]

struct Book;
struct Music;
async fn get_book() -> Book { Book }
async fn get_music() -> Music { Music }

mod naiive {
use super::*;
// ANCHOR: naiive
async fn get_book_and_music() -> (Book, Music) {
let book = get_book().await;
let music = get_music().await;
(book, music)
}
// ANCHOR_END: naiive
}

mod other_langs {
use super::*;
// ANCHOR: other_langs
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
let book_future = get_book();
let music_future = get_music();
(book_future.await, music_future.await)
}
// ANCHOR_END: other_langs
}

mod join {
use super::*;
// ANCHOR: join
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
let book_fut = get_book();
let music_fut = get_music();
join!(book_fut, music_fut)
}
// ANCHOR_END: join
}

mod try_join {
use super::{Book, Music};
// ANCHOR: try_join
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
// ANCHOR_END: try_join
}

mod mismatched_err {
use super::{Book, Music};
// ANCHOR: try_join_map_err
use futures::{
future::TryFutureExt,
try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
// ANCHOR_END: try_join_map_err
}
10 changes: 10 additions & 0 deletions examples/06_03_select/Cargo.toml
@@ -0,0 +1,10 @@
[package]
name = "06_03_select"
version = "0.1.0"
authors = ["Taylor Cramer <cramertj@google.com>"]
edition = "2018"

[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
183 changes: 183 additions & 0 deletions examples/06_03_select/src/lib.rs
@@ -0,0 +1,183 @@
#![cfg(test)]
#![feature(async_await)]

mod example {
// ANCHOR: example
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();

pin_mut!(t1, t2);

select! {
() = t1 => println!("task one completed first"),
() = t2 => println!("task two completed first"),
}
}
// ANCHOR_END: example
}

mod default_and_complete {
// ANCHOR: default_and_complete
use futures::{future, select};

async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;

loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => unreachable!(), // never runs (futures are ready, then complete)
};
}
assert_eq!(total, 10);
}
// ANCHOR_END: default_and_complete

#[test]
fn run_count() {
futures::executor::block_on(count());
}
}

mod fused_stream {
// ANCHOR: fused_stream
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};

async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;

loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}

total
}
// ANCHOR_END: fused_stream
}

mod fuse_terminated {
// ANCHOR: fuse_terminated
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived-- start a new `run_on_new_num_fut`,
// dropping the old one.
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
// Run the `run_on_new_num_fut`
() = run_on_new_num_fut => {},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
// ANCHOR_END: fuse_terminated
}

mod futures_unordered {
// ANCHOR: futures_unordered
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived-- start a new `run_on_new_num_fut`.
run_on_new_num_futs.push(run_on_new_num(new_num));
},
// Run the `run_on_new_num_futs` and check if any have completed
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}

// ANCHOR_END: futures_unordered
}
2 changes: 2 additions & 0 deletions examples/Cargo.toml
Expand Up @@ -9,4 +9,6 @@ members = [
"03_01_async_await",
"05_01_streams",
"05_02_iteration_and_concurrency",
"06_02_join",
"06_03_select",
]
14 changes: 14 additions & 0 deletions src/06_multiple_futures/01_chapter.md
@@ -0,0 +1,14 @@
# Executing Multiple Futures at a Time

Up until now, we've mostly executed futures by using `.await`, which blocks
the current task until a particular `Future` completes. However, real
asynchronous applications often need to execute several different
operations concurrently.

In this chapter, we'll cover some ways to execute multiple asynchronous
operations at the same time:

- `join!`: waits for futures to all complete
- `select!`: waits for one of several futures to complete
- Spawning: creates a top-level task which ambiently runs a future to completion
- `FuturesUnordered`: a group of futures which yields the result of each subfuture
56 changes: 56 additions & 0 deletions src/06_multiple_futures/02_join.md
@@ -0,0 +1,56 @@
# `join!`

The `futures::join` macro makes it possible to wait for multiple different
futures to complete while executing them all concurrently.

When performing multiple asynchronous operations, it's tempting to simply
`.await` them in a series:

```rust
{{#include ../../examples/06_02_join/src/lib.rs:naiive}}
```

However, this will be slower than necessary, since it won't start trying to
`get_music` until after `get_book` has completed. In some other languages,
futures are ambiently run to completion, so two operations can be
run concurrently by first calling the each `async fn` to start the futures,
and then awaiting them both:

```rust
{{#include ../../examples/06_02_join/src/lib.rs:other_langs}}
```

However, Rust futures won't do any work until they're actively `.await`ed.
This means that the two code snippets above will both run
`book_future` and `music_future` in series rather than running them
concurrently. To correctly run the two futures concurrently, use
`futures::join!`:

```rust
{{#include ../../examples/06_02_join/src/lib.rs:join}}
```

The value returned by `join!` is a tuple containing the output of each
`Future` passed in.

## `try_join!`

For futures which return `Result`, consider using `try_join!` rather than
`join!`. Since `join!` only completes once all subfutures have completed,
it'll continue processing other futures even after one of its subfutures
has returned an `Err`.

Unlike `join!`, `try_join!` will complete immediately if one of the subfutures
returns an error.

```rust
{{#include ../../examples/06_02_join/src/lib.rs:try_join}}
```

Note that the futures passed to `try_join!` must all have the same error type.
Consider using the `.map_err(|e| ...)` and `.err_into()` functions from
`futures::future::TryFutureExt` to consolidate the error types:

```rust
{{#include ../../examples/06_02_join/src/lib.rs:try_join_map_err}}
```

0 comments on commit 6107fa7

Please sign in to comment.