Skip to content

Commit

Permalink
util: implement JoinMap (#4640)
Browse files Browse the repository at this point in the history
## Motivation

In many cases, it is desirable to spawn a set of tasks associated with
keys, with the ability to cancel them by key. As an example use case for
this sort of thing, see Tower's [`ReadyCache` type][1].

Now that PR #4530 adds a way of cancelling tasks in a
`tokio::task::JoinSet`, we can implement a map-like API based on the
same `IdleNotifiedSet` primitive.

## Solution

This PR adds an implementation of a `JoinMap` type to
`tokio_util::task`, using the `JoinSet` type from `tokio::task`, the
`AbortHandle` type added in #4530, and the new task IDs added in #4630.

Individual tasks can be aborted by key using the `JoinMap::abort`
method, and a set of tasks whose key match a given predicate can be
aborted using `JoinMap::abort_matching`.

When tasks complete, `JoinMap::join_one` returns their associated key
alongside the output from the spawned future, or the key and the
`JoinError` if the task did not complete successfully.

Overall, I think the way this works is pretty straightforward; much of
this PR is just API boilerplate to implement the union of applicable
APIs from `JoinSet` and `HashMap`. Unlike previous iterations on the
`JoinMap` API (e.g. #4538), this version is implemented entirely in
`tokio_util`, using only public APIs from the `tokio` crate. Currently,
the required `tokio` APIs are unstable, but implementing `JoinMap` in
`tokio-util` means we will never have to make stability commitments for
the `JoinMap` API itself.

[1]: https://github.com/tower-rs/tower/blob/master/tower/src/ready_cache/cache.rs

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Apr 26, 2022
1 parent 1d3f123 commit d456706
Show file tree
Hide file tree
Showing 9 changed files with 1,101 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .cargo/config
@@ -0,0 +1,2 @@
# [build]
# rustflags = ["--cfg", "tokio_unstable"]
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
tokio = { version = "1.18.0", features = ["full"] }
```
Then, on your main.rs:

Expand Down
8 changes: 5 additions & 3 deletions tokio-util/Cargo.toml
Expand Up @@ -29,13 +29,12 @@ codec = ["tracing"]
time = ["tokio/time","slab"]
io = []
io-util = ["io", "tokio/rt", "tokio/io-util"]
rt = ["tokio/rt", "tokio/sync", "futures-util"]
rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"]

__docs_rs = ["futures-util"]

[dependencies]
tokio = { version = "1.6.0", path = "../tokio", features = ["sync"] }

tokio = { version = "1.18.0", path = "../tokio", features = ["sync"] }
bytes = "1.0.0"
futures-core = "0.3.0"
futures-sink = "0.3.0"
Expand All @@ -45,6 +44,9 @@ pin-project-lite = "0.2.0"
slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true }

[target.'cfg(tokio_unstable)'.dependencies]
hashbrown = { version = "0.12.0", optional = true }

[dev-dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["full"] }
tokio-test = { version = "0.4.0", path = "../tokio-test" }
Expand Down

0 comments on commit d456706

Please sign in to comment.