From ab60c6992346739aeb0e2033d8b0bfc8539d3702 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 12:35:22 -0700 Subject: [PATCH 01/21] WIP task-id-based joinmap Signed-off-by: Eliza Weisman --- tokio-util/Cargo.toml | 6 +- tokio-util/src/task/join_map.rs | 723 ++++++++++++++++++++++++++++++++ tokio-util/src/task/mod.rs | 6 + tokio/Cargo.toml | 2 +- 4 files changed, 733 insertions(+), 4 deletions(-) create mode 100644 tokio-util/src/task/join_map.rs diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index b4557586a0f..8b2dcdc1c77 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -29,13 +29,13 @@ codec = ["tracing"] time = ["tokio/time","slab"] io = [] io-util = ["io", "tokio/rt", "tokio/io-util"] -rt = ["tokio/rt", "tokio/sync", "futures-util"] +rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"] __docs_rs = ["futures-util"] [dependencies] -tokio = { version = "1.6.0", path = "../tokio", features = ["sync"] } - +tokio = { version = "1.18.0", path = "../tokio", features = ["sync"] } +hashbrown = { version = "0.12.0", optional = true } bytes = "1.0.0" futures-core = "0.3.0" futures-sink = "0.3.0" diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs new file mode 100644 index 00000000000..f96ec3c5126 --- /dev/null +++ b/tokio-util/src/task/join_map.rs @@ -0,0 +1,723 @@ +use tokio::runtime::Handle; +use tokio::task::{JoinError, AbortHandle, LocalSet, JoinSet, Id}; +use std::borrow::Borrow; +use std::collections::hash_map::RandomState; +use hashbrown::HashMap; +use hashbrown::hash_map::RawEntryMut; +use std::future::Future; +use std::hash::{BuildHasher, Hash, Hasher}; + +/// A collection of tasks spawned on a Tokio runtime, associated with hash map +/// keys. +/// +/// This type is very similar to the [`JoinSet`] type, with the addition of a +/// set of keys associated with each task. These keys allow [cancelling a +/// task][abort] or [multiple tasks][abort_matching] in the `JoinMap` based on +/// their keys, or [test whether a task corresponding to a given key exists][contains] in the `JoinMap`. +/// +/// In addition, when tasks in the `JoinMap` complete, they will return the +/// associated key along with the value returned by the task, if any. +/// +/// A `JoinMap` can be used to await the completion of some or all of the tasks +/// in the map. The map is not ordered, and the tasks will be returned in the +/// order they complete. +/// +/// All of the tasks must have the same return type `V`. +/// +/// When the `JoinMap` is dropped, all tasks in the `JoinMap` are immediately aborted. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// # Examples +/// +/// Spawn multiple tasks and wait for them: +/// +/// ``` +/// use tokio::task::JoinMap; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut map = JoinMap::new(); +/// +/// for i in 0..10 { +/// // Spawn a task on the `JoinMap` with `i` as its key. +/// map.spawn(i, async move { /* ... */ }); +/// } +/// +/// let mut seen = [false; 10]; +/// +/// // When a task completes, `join_one` returns the task's key along +/// // with its output. +/// while let Some((key, res)) = map.join_one().await { +/// seen[key] = true; +/// assert!(res.is_ok(), "task {} completed successfully!", key); +/// } +/// +/// for i in 0..10 { +/// assert!(seen[i]); +/// } +/// } +/// ``` +/// +/// Cancel tasks based on their keys: +/// +/// ``` +/// use tokio::task::JoinMap; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut map = JoinMap::new(); +/// +/// map.spawn("hello world", async move { /* ... */ }); +/// map.spawn("goodbye world", async move { /* ... */}); +/// +/// // Look up the "goodbye world" task in the map and abort it. +/// let aborted = map.abort("goodbye world"); +/// +/// // `JoinMap::abort` returns `true` if a task existed for the +/// // provided key. +/// assert!(aborted); +/// +/// while let Some((key, res)) = map.join_one().await { +/// if key == "goodbye world" { +/// // The aborted task should complete with a cancelled `JoinError`. +/// assert!(res.unwrap_err().is_cancelled()); +/// } else { +/// // Other tasks should complete normally. +/// assert!(res.is_ok()); +/// } +/// } +/// } +/// ``` +/// +/// [`JoinSet`]: crate::task::JoinSet +/// [unstable]: crate#unstable-features +/// [abort]: fn@Self::abort +/// [abort_matching]: fn@Self::abort_matching +/// [contains]: fn@Self::contains_task +#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[derive(Debug)] +pub struct JoinMap { + tasks_by_key: HashMap, AbortHandle, S>, + hashes_by_task: HashMap, + + /// The set of tasks spawned on the `JoinMap`. + /// + /// Each `IdleNotifiedSet` entry contains the hash of the task's key, to + /// allow looking the key up when the task completes. + task_set: JoinSet, +} + +/// A `JoinMap` key. +#[derive(Debug, Hash, PartialEq, Eq)] +struct Key { + key: K, + id: Id, +} + +impl JoinMap { + /// Creates a new empty `JoinMap`. + /// + /// The `JoinMap` is initially created with a capacity of 0, so it will not + /// allocate until a task is first spawned on it. + /// + /// # Examples + /// + /// ``` + /// use tokio::task::JoinMap; + /// let map: JoinMap<&str, i32> = JoinMap::new(); + /// ``` + #[inline] + #[must_use] + pub fn new() -> Self { + Self::with_hasher(RandomState::new()) + } + + /// Creates an empty `JoinMap` with the specified capacity. + /// + /// The `JoinMap` will be able to hold at least `capacity` tasks without + /// reallocating. + /// + /// # Examples + /// + /// ``` + /// use tokio::task::JoinMap; + /// let map: JoinMap<&str, i32> = JoinMap::with_capacity(10); + /// ``` + #[inline] + #[must_use] + pub fn with_capacity(capacity: usize) -> Self { + JoinMap::with_capacity_and_hasher(capacity, Default::default()) + } +} + +impl JoinMap { + /// Creates an empty `JoinMap` which will use the given hash builder to hash + /// keys. + /// + /// The created map has the default initial capacity. + /// + /// Warning: `hash_builder` is normally randomly generated, and + /// is designed to allow `JoinMap` to be resistant to attacks that + /// cause many collisions and very poor performance. Setting it + /// manually using this function can expose a DoS attack vector. + /// + /// The `hash_builder` passed should implement the [`BuildHasher`] trait for + /// the `JoinMap` to be useful, see its documentation for details. + #[inline] + #[must_use] + pub fn with_hasher(hash_builder: S) -> Self { + Self::with_capacity_and_hasher(0, hash_builder) + } + + /// Creates an empty `JoinMap` with the specified capacity, using `hash_builder` + /// to hash the keys. + /// + /// The `JoinMap` will be able to hold at least `capacity` elements without + /// reallocating. If `capacity` is 0, the `JoinMap` will not allocate. + /// + /// Warning: `hash_builder` is normally randomly generated, and + /// is designed to allow HashMaps to be resistant to attacks that + /// cause many collisions and very poor performance. Setting it + /// manually using this function can expose a DoS attack vector. + /// + /// The `hash_builder` passed should implement the [`BuildHasher`] trait for + /// the `JoinMap`to be useful, see its documentation for details. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::task::JoinMap; + /// use std::collections::hash_map::RandomState; + /// + /// let s = RandomState::new(); + /// let mut map = JoinMap::with_capacity_and_hasher(10, s); + /// map.spawn(1, async move { "hello world!" }); + /// # } + /// ``` + #[inline] + #[must_use] + pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> Self { + Self { + tasks_by_key: HashMap::with_capacity_and_hasher(capacity, hash_builder.clone()), + hashes_by_task: HashMap::with_capacity_and_hasher(capacity, hash_builder), + task_set: JoinSet::new(), + } + } + + /// Returns the number of tasks currently in the `JoinMap`. + pub fn len(&self) -> usize { + let len = self.tasks_by_key.len(); + debug_assert_eq!(len, self.hashes_by_task.len()); + len + } + + /// Returns whether the `JoinMap` is empty. + pub fn is_empty(&self) -> bool { + let empty = self.tasks_by_key.is_empty(); + debug_assert_eq!(empty, self.hashes_by_task.is_empty()); + empty + } + + /// Returns the number of tasks the map can hold without reallocating. + /// + /// This number is a lower bound; the `JoinMap` might be able to hold + /// more, but is guaranteed to be able to hold at least this many. + /// + /// # Examples + /// + /// ``` + /// use tokio::task::JoinMap; + /// + /// let map: JoinMap = JoinMap::with_capacity(100); + /// assert!(map.capacity() >= 100); + /// ``` + #[inline] + pub fn capacity(&self) -> usize { + let capacity = self.tasks_by_key.capacity(); + debug_assert_eq!(capacity, self.hashes_by_task.capacity()); + capacity + } +} + +impl JoinMap +where + K: Hash + Eq + 'static, + V: 'static, + S: BuildHasher, +{ + /// Spawn the provided task and store it in this `JoinMap` with the provided + /// key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_one`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`join_one`]: Self::join_one + pub fn spawn(&mut self, key: K, task: F) + where + F: Future, + F: Send + 'static, + V: Send, + { + let task = self.task_set.spawn(task); + self.insert(key, task) + } + + /// Spawn the provided task on the provided runtime and store it in this + /// `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_one`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// [`join_one`]: Self::join_one + pub fn spawn_on(&mut self, key: K, task: F, handle: &Handle) + where + F: Future, + F: Send + 'static, + V: Send, + { + let task = self.task_set.spawn_on(task, handle); + self.insert(key, task); + } + + /// Spawn the provided task on the current [`LocalSet`] and store it in this + /// `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_one`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// # Panics + /// + /// This method panics if it is called outside of a `LocalSet`. + /// + /// [`LocalSet`]: crate::task::LocalSet + /// [`join_one`]: Self::join_one + pub fn spawn_local(&mut self, key: K, task: F) + where + F: Future, + F: 'static, + { + let task = self.task_set.spawn_local(task); + self.insert(key, task); + } + + /// Spawn the provided task on the provided [`LocalSet`] and store it in + /// this `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_one`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// [`LocalSet`]: crate::task::LocalSet + /// [`join_one`]: Self::join_one + pub fn spawn_local_on(&mut self, key: K, task: F, local_set: &LocalSet) + where + F: Future, + F: 'static, + { + let task = self.task_set.spawn_local_on(task, local_set); + self.insert(key, task) + } + + fn insert(&mut self, key: K, abort: AbortHandle) { + let hash = self.hash(&key); + let id = abort.id(); + let map_key = Key { id: id.clone(), key }; + let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.key == map_key.key); + match entry { + RawEntryMut::Occupied(mut occ) => { + let Key { id: prev_id, .. } = occ.insert_key(map_key); + occ.insert(abort).abort(); + let _prev_hash = self.hashes_by_task.remove(&prev_id); + debug_assert_eq!(Some(hash), _prev_hash); + }, + RawEntryMut::Vacant(vac) => { + vac.insert(map_key, abort); + } + }; + + let _prev = self.hashes_by_task.insert(id, hash); + debug_assert!(_prev.is_none(), "no prior task should have had the same ID"); + } + + /// Waits until one of the tasks in the map completes and returns its + /// output, along with the key corresponding to that task. + /// + /// Returns `None` if the map is empty. + /// + /// # Cancel Safety + /// + /// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!` + /// statement and some other branch completes first, it is guaranteed that no tasks were + /// removed from this `JoinMap`. + /// + /// # Returns + /// + /// This function returns: + /// + /// * `Some((key, Ok(value)))` if one of the tasks in this `JoinMap` has + /// completed. The `value` is the return value of that ask, and `key` is + /// the key associated with the task. + /// * `Some((key, Err(err))` if one of the tasks in this JoinMap` has + /// panicked or been aborted. `key` is the key associated with the task + /// that panicked or was aborted. + /// * `None` if the `JoinMap` is empty. + pub async fn join_one(&mut self) -> Option<(K, Result)> { + // crate::future::poll_fn(|cx| self.poll_join_one(cx)).await + todo!("eliza") + } + + /// Aborts all tasks and waits for them to finish shutting down. + /// + /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_one`] in + /// a loop until it returns `None`. + /// + /// This method ignores any panics in the tasks shutting down. When this call returns, the + /// `JoinMap` will be empty. + /// + /// [`abort_all`]: fn@Self::abort_all + /// [`join_one`]: fn@Self::join_one + pub async fn shutdown(&mut self) { + self.abort_all(); + while self.join_one().await.is_some() {} + } + + /// Abort the task corresponding to the provided `key`. + /// + /// If this `JoinMap` contains a task corresponding to `key`, this method + /// will abort that task and return `true`. Otherwise, if no task exists for + /// `key`, this method returns `false`. + /// + /// # Examples + /// + /// Aborting a task by key: + /// + /// ``` + /// use tokio::task::JoinMap; + /// + /// # #[tokio::main] + /// # async fn main() { + /// let mut map = JoinMap::new(); + /// + /// map.spawn("hello world", async move { /* ... */ }); + /// map.spawn("goodbye world", async move { /* ... */}); + /// + /// // Look up the "goodbye world" task in the map and abort it. + /// map.abort("goodbye world"); + /// + /// while let Some((key, res)) = map.join_one().await { + /// if key == "goodbye world" { + /// // The aborted task should complete with a cancelled `JoinError`. + /// assert!(res.unwrap_err().is_cancelled()); + /// } else { + /// // Other tasks should complete normally. + /// assert!(res.is_ok()); + /// } + /// } + /// # } + /// ``` + /// + /// `abort` returns `true` if a task was aborted: + /// ``` + /// use tokio::task::JoinMap; + /// + /// # #[tokio::main] + /// # async fn main() { + /// let mut map = JoinMap::new(); + /// + /// map.spawn("hello world", async move { /* ... */ }); + /// map.spawn("goodbye world", async move { /* ... */}); + /// + /// // A task for the key "goodbye world" should exist in the map: + /// assert!(map.abort("goodbye world")); + /// + /// // Aborting a key that does not exist will return `false`: + /// assert!(!map.abort("goodbye universe")); + /// # } + /// ``` + pub fn abort(&mut self, key: &Q) -> bool + where + Q: Hash + Eq, + K: Borrow, + { + match self.remove_by_key(key) { + Some(handle) => { handle.abort(); true } + None => false, + } + } + + /// Aborts all tasks with keys matching `predicate`. + /// + /// `predicate` is a function called with a reference to each key in the + /// map. If it returns `true` for a given key, the corresponding task will + /// be cancelled. + /// + /// # Examples + /// ``` + /// use tokio::task::JoinMap; + /// + /// # // use the current thread rt so that spawned tasks don't + /// # // complete in the background before they can be aborted. + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() { + /// let mut map = JoinMap::new(); + /// + /// map.spawn("hello world", async move { + /// // ... + /// # tokio::task::yield_now().await; // don't complete immediately, get aborted! + /// }); + /// map.spawn("goodbye world", async move { + /// // ... + /// # tokio::task::yield_now().await; // don't complete immediately, get aborted! + /// }); + /// map.spawn("hello san francisco", async move { + /// // ... + /// # tokio::task::yield_now().await; // don't complete immediately, get aborted! + /// }); + /// map.spawn("goodbye universe", async move { + /// // ... + /// # tokio::task::yield_now().await; // don't complete immediately, get aborted! + /// }); + /// + /// // Abort all tasks whose keys begin with "goodbye" + /// map.abort_matching(|key| key.starts_with("goodbye")); + /// + /// let mut seen = 0; + /// while let Some((key, res)) = map.join_one().await { + /// seen += 1; + /// if key.starts_with("goodbye") { + /// // The aborted task should complete with a cancelled `JoinError`. + /// assert!(res.unwrap_err().is_cancelled()); + /// } else { + /// // Other tasks should complete normally. + /// assert!(key.starts_with("hello")); + /// assert!(res.is_ok()); + /// } + /// } + /// + /// // All spawned tasks should have completed. + /// assert_eq!(seen, 4); + /// # } + /// ``` + pub fn abort_matching(&mut self, mut predicate: impl FnMut(&K) -> bool) { + // let key_set = &mut self.key_set; + // let joins = &mut self.task_set; + // // Note: this method iterates over the key set *without* removing any + // // entries, so that the keys from aborted tasks can be returned when + // // polling the `JoinMap`. + // for entry in key_set.iter().filter(|entry| predicate(&entry.key)) { + // if let Some(mut entry) = joins.entry_mut(entry.task.entry.clone()) { + // entry.with_value_and_context(|(_, jh), _| jh.abort()); + // } + // } + todo!("eliza") + } + + /// Returns `true` if this `JoinMap` contains a task for the provided key. + /// + /// If the task has completed, but its output hasn't yet been consumed by a + /// call to [`join_one`], this method will still return `true`. + /// + /// [`join_one`]: fn@Self::join_one + pub fn contains_key(&self, key: &Q) -> bool + where + Q: Hash + Eq, + K: Borrow, + { + self.get_by_key(key).is_some() + } + + pub fn contains_task(&self, task: &Id) -> bool { + self.get_by_id(task).is_some() + } + + /// Reserves capacity for at least `additional` more tasks to be spawned + /// on this `JoinMap` without reallocating for the map of task keys. The + /// collection may reserve more space to avoid frequent reallocations. + /// + /// Note that spawning a task will still cause an allocation for the task + /// itself. + /// + /// # Panics + /// + /// Panics if the new allocation size overflows [`usize`]. + /// + /// # Examples + /// + /// ``` + /// use tokio::task::JoinMap; + /// + /// let mut map: JoinMap<&str, i32> = JoinMap::new(); + /// map.reserve(10); + /// ``` + #[inline] + pub fn reserve(&mut self, additional: usize) { + self.tasks_by_key.reserve(additional); + self.hashes_by_task.reserve(additional); + } + + /// Shrinks the capacity of the `JoinMap` as much as possible. It will drop + /// down as much as possible while maintaining the internal rules + /// and possibly leaving some space in accordance with the resize policy. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::task::JoinMap; + /// + /// let mut map: JoinMap = JoinMap::with_capacity(100); + /// map.spawn(1, async move { 2 }); + /// map.spawn(3, async move { 4 }); + /// assert!(map.capacity() >= 100); + /// map.shrink_to_fit(); + /// assert!(map.capacity() >= 2); + /// # } + /// ``` + #[inline] + pub fn shrink_to_fit(&mut self) { + self.hashes_by_task.shrink_to_fit(); + self.tasks_by_key.shrink_to_fit(); + } + + /// Shrinks the capacity of the map with a lower limit. It will drop + /// down no lower than the supplied limit while maintaining the internal rules + /// and possibly leaving some space in accordance with the resize policy. + /// + /// If the current capacity is less than the lower limit, this is a no-op. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::task::JoinMap; + /// + /// let mut map: JoinMap = JoinMap::with_capacity(100); + /// map.spawn(1, async move { 2 }); + /// map.spawn(3, async move { 4 }); + /// assert!(map.capacity() >= 100); + /// map.shrink_to(10); + /// assert!(map.capacity() >= 10); + /// map.shrink_to(0); + /// assert!(map.capacity() >= 2); + /// # } + /// ``` + #[inline] + pub fn shrink_to(&mut self, min_capacity: usize) { + self.hashes_by_task.shrink_to(min_capacity); + self.tasks_by_key.shrink_to(min_capacity) + } + + fn get_by_key<'map, Q: ?Sized>(&'map self, key: &Q) -> Option<(&'map Key, &'map AbortHandle)> + where + Q: Hash + Eq, + K: Borrow, + { + let hash = self.hash(key); + self.tasks_by_key.raw_entry().from_hash(hash, |k| k.key.borrow() == key) + } + + fn get_by_id<'map>(&'map self, id: &Id) -> Option<(&'map Key, &'map AbortHandle)> { + let hash = self.hashes_by_task.get(id)?; + self.tasks_by_key.raw_entry().from_hash(*hash, |k| &k.id == id) + } + + fn remove_by_id(&mut self, id: Id) -> Option { + let hash = self.hashes_by_task.remove(&id)?; + let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.id == id); + let handle = match entry { + RawEntryMut::Occupied(entry) => entry.remove(), + _ => return None, + }; + self.hashes_by_task.remove(&handle.id()); + Some(handle) + } + + fn remove_by_key(&mut self, key: &Q) -> Option + where + Q: Hash + Eq, + K: Borrow, + { + let hash = self.hash(key); + let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.key.borrow() == key); + let handle = match entry { + RawEntryMut::Occupied(mut entry) => entry.remove(), + _ => return None, + }; + self.hashes_by_task.remove(&handle.id()); + Some(handle) + } + + fn hash(&self, key: &Q) -> u64 + where + Q: Hash, + { + let mut hasher = self.tasks_by_key.hasher().build_hasher(); + key.hash(&mut hasher); + hasher.finish() + } +} + +impl JoinMap +where + K: 'static, + V: 'static, +{ + /// Aborts all tasks on this `JoinMap`. + /// + /// This does not remove the tasks from the `JoinMap`. To wait for the tasks to complete + /// cancellation, you should call `join_one` in a loop until the `JoinMap` is empty. + pub fn abort_all(&mut self) { + self.task_set.abort_all() + } + + /// Removes all tasks from this `JoinMap` without aborting them. + /// + /// The tasks removed by this call will continue to run in the background even if the `JoinMap` + /// is dropped. They may still be aborted by key. + pub fn detach_all(&mut self) { + self.task_set.detach_all() + } +} + + +// impl fmt::Debug for JoinMap { +// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +// // debug print the keys in this `JoinMap`. +// struct KeySet<'a, K, V, S>(&'a JoinMap); +// impl fmt::Debug for KeySet<'_, K, V, S> { +// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +// f.debug_set() +// .entries(self.0.key_set.iter().map(|entry| &entry.key)) +// .finish() +// } +// } + +// f.debug_struct("JoinMap") +// .field("key_set", &KeySet(self)) +// .finish() +// } +// } + +impl Default for JoinMap { + fn default() -> Self { + Self::new() + } +} \ No newline at end of file diff --git a/tokio-util/src/task/mod.rs b/tokio-util/src/task/mod.rs index 5aa33df2dc0..ef12656fde7 100644 --- a/tokio-util/src/task/mod.rs +++ b/tokio-util/src/task/mod.rs @@ -1,4 +1,10 @@ //! Extra utilities for spawning tasks +#[cfg(all(tokio_unstable, feature = "rt"))] +mod join_map; mod spawn_pinned; pub use spawn_pinned::LocalPoolHandle; + +#[cfg(all(tokio_unstable, feature = "rt"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))] +pub use join_map::JoinMap; diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 6bda46ef662..369186eb678 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.17.0" +version = "1.18.0" edition = "2018" rust-version = "1.49" authors = ["Tokio Contributors "] From fed0b8c5835838abf28d5c8f95967ba0cdcf6944 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:19:34 -0700 Subject: [PATCH 02/21] add tests Signed-off-by: Eliza Weisman --- tokio-util/tests/task_join_map.rs | 275 ++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 tokio-util/tests/task_join_map.rs diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs new file mode 100644 index 00000000000..72c4b5c03f9 --- /dev/null +++ b/tokio-util/tests/task_join_map.rs @@ -0,0 +1,275 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "rt", tokio_unstable))] + +use tokio::sync::oneshot; +use tokio_util::task::JoinMap; +use tokio::time::Duration; + +use futures::future::FutureExt; + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() +} + +#[tokio::test(start_paused = true)] +async fn test_with_sleep() { + let mut map = JoinMap::new(); + + for i in 0..10 { + map.spawn(i, async move { i }); + assert_eq!(map.len(), 1 + i); + } + map.detach_all(); + assert_eq!(map.len(), 0); + + assert!(matches!(map.join_one().await, None)); + + for i in 0..10 { + map.spawn(i, async move { + tokio::time::sleep(Duration::from_secs(i as u64)).await; + i + }); + assert_eq!(map.len(), 1 + i); + } + + let mut seen = [false; 10]; + while let Some((k, res)) = map.join_one().await { + seen[k] = true; + assert_eq!(res.expect("task should have completed successfully"), k); + } + + for was_seen in &seen { + assert!(was_seen); + } + assert!(matches!(map.join_one().await, None)); + + // Do it again. + for i in 0..10 { + map.spawn(i, async move { + tokio::time::sleep(Duration::from_secs(i as u64)).await; + i + }); + } + + let mut seen = [false; 10]; + while let Some((k, res)) = map.join_one().await { + seen[k] = true; + assert_eq!(res.expect("task should have completed successfully"), k); + } + + for was_seen in &seen { + assert!(was_seen); + } + assert!(matches!(map.join_one().await, None)); +} + +#[tokio::test] +async fn test_abort_on_drop() { + let mut map = JoinMap::new(); + + let mut recvs = Vec::new(); + + for i in 0..16 { + let (send, recv) = oneshot::channel::<()>(); + recvs.push(recv); + + map.spawn(i, async { + // This task will never complete on its own. + futures::future::pending::<()>().await; + drop(send); + }); + } + + drop(map); + + for recv in recvs { + // The task is aborted soon and we will receive an error. + assert!(recv.await.is_err()); + } +} + +#[tokio::test] +async fn alternating() { + let mut map = JoinMap::new(); + + assert_eq!(map.len(), 0); + map.spawn(1, async {}); + assert_eq!(map.len(), 1); + map.spawn(2, async {}); + assert_eq!(map.len(), 2); + + for i in 0..16 { + let (_, res) = map.join_one().await.unwrap(); + assert!(res.is_ok()); + assert_eq!(map.len(), 1); + map.spawn(i, async {}); + assert_eq!(map.len(), 2); + } +} + +#[tokio::test(start_paused = true)] +async fn abort_by_key() { + let mut map = JoinMap::new(); + let mut num_canceled = 0; + let mut num_completed = 0; + for i in 0..16 { + map.spawn(i, async move { + tokio::time::sleep(Duration::from_secs(i as u64)).await; + }); + } + + for i in 0..16 { + if i % 2 != 0 { + // abort odd-numbered tasks. + map.abort(&i); + } + } + + while let Some((key, res)) = map.join_one().await { + match res { + Ok(()) => { + num_completed += 1; + assert_eq!(key % 2, 0); + assert!(!map.contains_key(&key)); + } + Err(e) => { + num_canceled += 1; + assert!(e.is_cancelled()); + assert_ne!(key % 2, 0); + assert!(!map.contains_key(&key)); + } + } + } + + assert_eq!(num_canceled, 8); + assert_eq!(num_completed, 8); +} + +#[tokio::test(start_paused = true)] +async fn abort_by_predicate() { + let mut map = JoinMap::new(); + let mut num_canceled = 0; + let mut num_completed = 0; + for i in 0..16 { + map.spawn(i, async move { + tokio::time::sleep(Duration::from_secs(i as u64)).await; + }); + } + + // abort odd-numbered tasks. + map.abort_matching(|key| key % 2 != 0); + + while let Some((key, res)) = map.join_one().await { + match res { + Ok(()) => { + num_completed += 1; + assert_eq!(key % 2, 0); + assert!(!map.contains_key(&key)); + } + Err(e) => { + num_canceled += 1; + assert!(e.is_cancelled()); + assert_ne!(key % 2, 0); + assert!(!map.contains_key(&key)); + } + } + } + + assert_eq!(num_canceled, 8); + assert_eq!(num_completed, 8); +} + +#[test] +fn runtime_gone() { + let mut map = JoinMap::new(); + { + let rt = rt(); + map.spawn_on("key", async { 1 }, rt.handle()); + drop(rt); + } + + let (key, res) = rt().block_on(map.join_one()).unwrap(); + assert_eq!(key, "key"); + assert!(res.unwrap_err().is_cancelled()); +} + +// This ensures that `join_one` works correctly when the coop budget is +// exhausted. +#[tokio::test(flavor = "current_thread")] +async fn join_map_coop() { + // Large enough to trigger coop. + const TASK_NUM: u32 = 1000; + + static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); + + let mut map = JoinMap::new(); + + for i in 0..TASK_NUM { + map.spawn(i, async move { + SEM.add_permits(1); + i + }); + } + + // Wait for all tasks to complete. + // + // Since this is a `current_thread` runtime, there's no race condition + // between the last permit being added and the task completing. + let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); + + let mut count = 0; + let mut coop_count = 0; + loop { + match map.join_one().now_or_never() { + Some(Some((key, Ok(i)))) => assert_eq!(key, i), + Some(Some((key, Err(err)))) => panic!("failed[{}]: {}", key, err), + None => { + coop_count += 1; + tokio::task::yield_now().await; + continue; + } + Some(None) => break, + } + + count += 1; + } + assert!(coop_count >= 1); + assert_eq!(count, TASK_NUM); +} + +#[tokio::test(start_paused = true)] +async fn abort_all() { + let mut map: JoinMap = JoinMap::new(); + + for i in 0..5 { + map.spawn(i, futures::future::pending()); + } + for i in 5..10 { + map.spawn(i, async { + tokio::time::sleep(Duration::from_secs(1)).await; + }); + } + + // The join map will now have 5 pending tasks and 5 ready tasks. + tokio::time::sleep(Duration::from_secs(2)).await; + + map.abort_all(); + assert_eq!(map.len(), 10); + + let mut count = 0; + let mut seen = [false; 10]; + while let Some((k, res)) = map.join_one().await { + seen[k] = true; + if let Err(err) = res { + assert!(err.is_cancelled()); + } + count += 1; + } + assert_eq!(count, 10); + assert_eq!(map.len(), 0); + for was_seen in &seen { + assert!(was_seen); + } +} \ No newline at end of file From a47867237795a36f6bf33de3c9f165f9dfe7008f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:19:40 -0700 Subject: [PATCH 03/21] wip Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index f96ec3c5126..e3d475b253b 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -378,8 +378,18 @@ where /// that panicked or was aborted. /// * `None` if the `JoinMap` is empty. pub async fn join_one(&mut self) -> Option<(K, Result)> { - // crate::future::poll_fn(|cx| self.poll_join_one(cx)).await - todo!("eliza") + let (res, id) = match self.task_set.join_one_with_id().await { + Ok(task) => { + let (id, output) = task?; + (Ok(output), id) + }, + Err(e) => { + let id = e.id(); + (Err(e), id) + } + }; + let key = self.remove_by_id(id)?; + Some((key, res)) } /// Aborts all tasks and waits for them to finish shutting down. @@ -639,15 +649,17 @@ where self.tasks_by_key.raw_entry().from_hash(*hash, |k| &k.id == id) } - fn remove_by_id(&mut self, id: Id) -> Option { + fn remove_by_id(&mut self, id: Id) -> Option { let hash = self.hashes_by_task.remove(&id)?; let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.id == id); - let handle = match entry { - RawEntryMut::Occupied(entry) => entry.remove(), + let (Key { id: _key_id, key }, handle) = match entry { + RawEntryMut::Occupied(entry) => entry.remove_entry(), _ => return None, }; - self.hashes_by_task.remove(&handle.id()); - Some(handle) + debug_assert_eq!(_key_id, id); + debug_assert_eq!(id, handle.id()); + self.hashes_by_task.remove(&id); + Some(key) } fn remove_by_key(&mut self, key: &Q) -> Option @@ -658,7 +670,7 @@ where let hash = self.hash(key); let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.key.borrow() == key); let handle = match entry { - RawEntryMut::Occupied(mut entry) => entry.remove(), + RawEntryMut::Occupied(entry) => entry.remove(), _ => return None, }; self.hashes_by_task.remove(&handle.id()); From 9940c67336ce74f55049459352c1cca44119d8aa Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:33:02 -0700 Subject: [PATCH 04/21] abort handles by ref so keys are still returned Signed-off-by: Eliza Weisman --- tokio/src/runtime/task/abort.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index cad639ca0c8..4977377880d 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -43,8 +43,8 @@ impl AbortHandle { // the `AbortHandle` type is only publicly exposed when `tokio_unstable` is // enabled, but it is still defined for testing purposes. #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] - pub fn abort(self) { - if let Some(raw) = self.raw { + pub fn abort(&self) { + if let Some(ref raw) = self.raw { raw.remote_abort(); } } From d5014d520054424c81b3e10c2546f0f43d5c657d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:33:09 -0700 Subject: [PATCH 05/21] fix ids getting hashed Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index e3d475b253b..b9dc32cf912 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -111,7 +111,7 @@ pub struct JoinMap { } /// A `JoinMap` key. -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug)] struct Key { key: K, id: Id, @@ -465,8 +465,8 @@ where Q: Hash + Eq, K: Borrow, { - match self.remove_by_key(key) { - Some(handle) => { handle.abort(); true } + match self.get_by_key(key) { + Some((_, handle)) => { handle.abort(); true } None => false, } } @@ -732,4 +732,23 @@ impl Default for JoinMap { fn default() -> Self { Self::new() } -} \ No newline at end of file +} + +// === impl Key === + +impl Hash for Key { + // Don't include the task ID in the hash. + #[inline] + fn hash(&self, hasher: &mut H) { + self.key.hash(hasher); + } +} + +impl PartialEq for Key { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for Key {} \ No newline at end of file From 9397dc41dae48c0650d2d089d9aaf7900ec3c640 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:47:32 -0700 Subject: [PATCH 06/21] put back `abort_matching` Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index b9dc32cf912..616058331c0 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -525,17 +525,14 @@ where /// # } /// ``` pub fn abort_matching(&mut self, mut predicate: impl FnMut(&K) -> bool) { - // let key_set = &mut self.key_set; - // let joins = &mut self.task_set; - // // Note: this method iterates over the key set *without* removing any - // // entries, so that the keys from aborted tasks can be returned when - // // polling the `JoinMap`. - // for entry in key_set.iter().filter(|entry| predicate(&entry.key)) { - // if let Some(mut entry) = joins.entry_mut(entry.task.entry.clone()) { - // entry.with_value_and_context(|(_, jh), _| jh.abort()); - // } - // } - todo!("eliza") + // Note: this method iterates over the key set *without* removing any + // entries, so that the keys from aborted tasks can be returned when + // polling the `JoinMap`. + for (Key {ref key, ..}, task) in &self.tasks_by_key { + if predicate(key) { + task.abort(); + } + } } /// Returns `true` if this `JoinMap` contains a task for the provided key. From c8ed02c5d29ed857faaa59936d9ea95560223ec6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:48:49 -0700 Subject: [PATCH 07/21] fix detach_all Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 616058331c0..787c3736dbc 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -702,7 +702,9 @@ where /// The tasks removed by this call will continue to run in the background even if the `JoinMap` /// is dropped. They may still be aborted by key. pub fn detach_all(&mut self) { - self.task_set.detach_all() + self.task_set.detach_all(); + self.tasks_by_key.clear(); + self.hashes_by_task.clear(); } } From 944a2c3719425287a42078d7ce7de119c506182d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:56:38 -0700 Subject: [PATCH 08/21] fixup doctests Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 787c3736dbc..3932a5359af 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -35,7 +35,7 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// Spawn multiple tasks and wait for them: /// /// ``` -/// use tokio::task::JoinMap; +/// use tokio_util::task::JoinMap; /// /// #[tokio::main] /// async fn main() { @@ -64,7 +64,7 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// Cancel tasks based on their keys: /// /// ``` -/// use tokio::task::JoinMap; +/// use tokio_util::task::JoinMap; /// /// #[tokio::main] /// async fn main() { @@ -126,7 +126,7 @@ impl JoinMap { /// # Examples /// /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// let map: JoinMap<&str, i32> = JoinMap::new(); /// ``` #[inline] @@ -143,7 +143,7 @@ impl JoinMap { /// # Examples /// /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// let map: JoinMap<&str, i32> = JoinMap::with_capacity(10); /// ``` #[inline] @@ -191,7 +191,7 @@ impl JoinMap { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// use std::collections::hash_map::RandomState; /// /// let s = RandomState::new(); @@ -231,7 +231,7 @@ impl JoinMap { /// # Examples /// /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// let map: JoinMap = JoinMap::with_capacity(100); /// assert!(map.capacity() >= 100); @@ -418,7 +418,7 @@ where /// Aborting a task by key: /// /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// # #[tokio::main] /// # async fn main() { @@ -444,7 +444,7 @@ where /// /// `abort` returns `true` if a task was aborted: /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// # #[tokio::main] /// # async fn main() { @@ -479,7 +479,7 @@ where /// /// # Examples /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// # // use the current thread rt so that spawned tasks don't /// # // complete in the background before they can be aborted. @@ -567,7 +567,7 @@ where /// # Examples /// /// ``` - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// let mut map: JoinMap<&str, i32> = JoinMap::new(); /// map.reserve(10); @@ -587,7 +587,7 @@ where /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// let mut map: JoinMap = JoinMap::with_capacity(100); /// map.spawn(1, async move { 2 }); @@ -614,7 +614,7 @@ where /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::task::JoinMap; + /// use tokio_util::task::JoinMap; /// /// let mut map: JoinMap = JoinMap::with_capacity(100); /// map.spawn(1, async move { 2 }); From c400dbc5e4f86502acc42388e92917a04fcd781f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 13:58:52 -0700 Subject: [PATCH 09/21] fixup docs Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 3932a5359af..54ccae8f54c 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -10,10 +10,11 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// A collection of tasks spawned on a Tokio runtime, associated with hash map /// keys. /// -/// This type is very similar to the [`JoinSet`] type, with the addition of a -/// set of keys associated with each task. These keys allow [cancelling a -/// task][abort] or [multiple tasks][abort_matching] in the `JoinMap` based on -/// their keys, or [test whether a task corresponding to a given key exists][contains] in the `JoinMap`. +/// This type is very similar to the [`JoinSet`] type in `tokio::task`, with the +/// addition of a set of keys associated with each task. These keys allow +/// [cancelling a task][abort] or [multiple tasks][abort_matching] in the +/// `JoinMap` based on their keys, or [test whether a task corresponding to a +/// given key exists][contains] in the `JoinMap`. /// /// In addition, when tasks in the `JoinMap` complete, they will return the /// associated key along with the value returned by the task, if any. @@ -26,9 +27,9 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// /// When the `JoinMap` is dropped, all tasks in the `JoinMap` are immediately aborted. /// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. +/// **Note**: This type depends on Tokio's [unstable API][unstable]. See [the +/// documentation on unstable features][unstable] for details on how to enable +/// Tokio's unstable features. /// /// # Examples /// @@ -92,11 +93,11 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// } /// ``` /// -/// [`JoinSet`]: crate::task::JoinSet -/// [unstable]: crate#unstable-features +/// [`JoinSet`]: tokio::task::JoinSet +/// [unstable]: tokio#unstable-features /// [abort]: fn@Self::abort /// [abort_matching]: fn@Self::abort_matching -/// [contains]: fn@Self::contains_task +/// [contains]: fn@Self::contains_key #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] #[derive(Debug)] pub struct JoinMap { @@ -549,6 +550,14 @@ where self.get_by_key(key).is_some() } + /// Returns `true` if this `JoinMap` contains a task with the provided + /// [task ID]. + /// + /// If the task has completed, but its output hasn't yet been consumed by a + /// call to [`join_one`], this method will still return `true`. + /// + /// [`join_one`]: fn@Self::join_one + /// [task ID]: tokio::task::Id pub fn contains_task(&self, task: &Id) -> bool { self.get_by_id(task).is_some() } From 04715f652a18cd4fa02b75cebde9489f948ea672 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 14:01:21 -0700 Subject: [PATCH 10/21] nicer debug impl Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 54ccae8f54c..702a2c7de03 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -1,9 +1,10 @@ use tokio::runtime::Handle; use tokio::task::{JoinError, AbortHandle, LocalSet, JoinSet, Id}; -use std::borrow::Borrow; -use std::collections::hash_map::RandomState; use hashbrown::HashMap; use hashbrown::hash_map::RawEntryMut; +use std::borrow::Borrow; +use std::collections::hash_map::RandomState; +use std::fmt; use std::future::Future; use std::hash::{BuildHasher, Hash, Hasher}; @@ -99,7 +100,6 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// [abort_matching]: fn@Self::abort_matching /// [contains]: fn@Self::contains_key #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] -#[derive(Debug)] pub struct JoinMap { tasks_by_key: HashMap, AbortHandle, S>, hashes_by_task: HashMap, @@ -717,24 +717,15 @@ where } } - -// impl fmt::Debug for JoinMap { -// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { -// // debug print the keys in this `JoinMap`. -// struct KeySet<'a, K, V, S>(&'a JoinMap); -// impl fmt::Debug for KeySet<'_, K, V, S> { -// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { -// f.debug_set() -// .entries(self.0.key_set.iter().map(|entry| &entry.key)) -// .finish() -// } -// } - -// f.debug_struct("JoinMap") -// .field("key_set", &KeySet(self)) -// .finish() -// } -// } +// Hand-written `fmt::Debug` implementation in order to avoid requiring `V: +// Debug`, since no value is ever actually stored in the map. +impl fmt::Debug for JoinMap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JoinMap") + .field("tasks_by_key", &self.tasks_by_key) + .finish() + } +} impl Default for JoinMap { fn default() -> Self { From 2aa22029437e25ee092f3246db2099ccf4a63786 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 14:08:11 -0700 Subject: [PATCH 11/21] misc cleanup Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 59 +++++++++++++++++++-------------- tokio-util/src/task/mod.rs | 4 +-- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 702a2c7de03..16d57dd4ec4 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -108,10 +108,18 @@ pub struct JoinMap { /// /// Each `IdleNotifiedSet` entry contains the hash of the task's key, to /// allow looking the key up when the task completes. - task_set: JoinSet, + tasks: JoinSet, } /// A `JoinMap` key. +/// +/// This holds both a `K`-typed key (the actual key as seen by the user), _and_ +/// a task ID, so that hash collisions between `K`-typed keys can be resolved +/// using either `K`'s `Eq` impl *or* by checking the task IDs. +/// +/// This allows looking up a task using either an actual key (such as when the +/// user queries the map with a key), *or* using a task ID and a hash (such as +/// when removing completed tasks from the map). #[derive(Debug)] struct Key { key: K, @@ -206,7 +214,7 @@ impl JoinMap { Self { tasks_by_key: HashMap::with_capacity_and_hasher(capacity, hash_builder.clone()), hashes_by_task: HashMap::with_capacity_and_hasher(capacity, hash_builder), - task_set: JoinSet::new(), + tasks: JoinSet::new(), } } @@ -270,7 +278,7 @@ where F: Send + 'static, V: Send, { - let task = self.task_set.spawn(task); + let task = self.tasks.spawn(task); self.insert(key, task) } @@ -289,7 +297,7 @@ where F: Send + 'static, V: Send, { - let task = self.task_set.spawn_on(task, handle); + let task = self.tasks.spawn_on(task, handle); self.insert(key, task); } @@ -305,14 +313,14 @@ where /// /// This method panics if it is called outside of a `LocalSet`. /// - /// [`LocalSet`]: crate::task::LocalSet + /// [`LocalSet`]: tokio::task::LocalSet /// [`join_one`]: Self::join_one pub fn spawn_local(&mut self, key: K, task: F) where F: Future, F: 'static, { - let task = self.task_set.spawn_local(task); + let task = self.tasks.spawn_local(task); self.insert(key, task); } @@ -331,7 +339,7 @@ where F: Future, F: 'static, { - let task = self.task_set.spawn_local_on(task, local_set); + let task = self.tasks.spawn_local_on(task, local_set); self.insert(key, task) } @@ -339,9 +347,13 @@ where let hash = self.hash(&key); let id = abort.id(); let map_key = Key { id: id.clone(), key }; + + // Insert the new key into the map of tasks by keys. let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.key == map_key.key); match entry { RawEntryMut::Occupied(mut occ) => { + // There was a previous task spawned with the same key! Cancel + // that task, and remove its ID from the map of hashes by task IDs. let Key { id: prev_id, .. } = occ.insert_key(map_key); occ.insert(abort).abort(); let _prev_hash = self.hashes_by_task.remove(&prev_id); @@ -352,6 +364,7 @@ where } }; + // Associate the key's hash with this task's ID, for looking up tasks by ID. let _prev = self.hashes_by_task.insert(id, hash); debug_assert!(_prev.is_none(), "no prior task should have had the same ID"); } @@ -379,7 +392,7 @@ where /// that panicked or was aborted. /// * `None` if the `JoinMap` is empty. pub async fn join_one(&mut self) -> Option<(K, Result)> { - let (res, id) = match self.task_set.join_one_with_id().await { + let (res, id) = match self.tasks.join_one_with_id().await { Ok(task) => { let (id, output) = task?; (Ok(output), id) @@ -641,6 +654,8 @@ where self.tasks_by_key.shrink_to(min_capacity) } + + /// Look up a task in the map by its key, returning the key and abort handle. fn get_by_key<'map, Q: ?Sized>(&'map self, key: &Q) -> Option<(&'map Key, &'map AbortHandle)> where Q: Hash + Eq, @@ -650,13 +665,18 @@ where self.tasks_by_key.raw_entry().from_hash(hash, |k| k.key.borrow() == key) } + /// Look up a task in the map by its task ID, returning the key and abort handle. fn get_by_id<'map>(&'map self, id: &Id) -> Option<(&'map Key, &'map AbortHandle)> { let hash = self.hashes_by_task.get(id)?; self.tasks_by_key.raw_entry().from_hash(*hash, |k| &k.id == id) } + /// Remove a task from the map by ID, returning the key for that task. fn remove_by_id(&mut self, id: Id) -> Option { + // Get the hash for the given ID. let hash = self.hashes_by_task.remove(&id)?; + + // Remove the entry for that hash. let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.id == id); let (Key { id: _key_id, key }, handle) = match entry { RawEntryMut::Occupied(entry) => entry.remove_entry(), @@ -668,21 +688,8 @@ where Some(key) } - fn remove_by_key(&mut self, key: &Q) -> Option - where - Q: Hash + Eq, - K: Borrow, - { - let hash = self.hash(key); - let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.key.borrow() == key); - let handle = match entry { - RawEntryMut::Occupied(entry) => entry.remove(), - _ => return None, - }; - self.hashes_by_task.remove(&handle.id()); - Some(handle) - } - + /// Returns the hash for a given key. + #[inline] fn hash(&self, key: &Q) -> u64 where Q: Hash, @@ -703,7 +710,7 @@ where /// This does not remove the tasks from the `JoinMap`. To wait for the tasks to complete /// cancellation, you should call `join_one` in a loop until the `JoinMap` is empty. pub fn abort_all(&mut self) { - self.task_set.abort_all() + self.tasks.abort_all() } /// Removes all tasks from this `JoinMap` without aborting them. @@ -711,7 +718,7 @@ where /// The tasks removed by this call will continue to run in the background even if the `JoinMap` /// is dropped. They may still be aborted by key. pub fn detach_all(&mut self) { - self.task_set.detach_all(); + self.tasks.detach_all(); self.tasks_by_key.clear(); self.hashes_by_task.clear(); } @@ -743,6 +750,8 @@ impl Hash for Key { } } +// Because we override `Hash` for this type, we must also override the +// `PartialEq` impl, so that all instances with the same hash are equal. impl PartialEq for Key { #[inline] fn eq(&self, other: &Self) -> bool { diff --git a/tokio-util/src/task/mod.rs b/tokio-util/src/task/mod.rs index ef12656fde7..7ba8ad9a218 100644 --- a/tokio-util/src/task/mod.rs +++ b/tokio-util/src/task/mod.rs @@ -1,10 +1,10 @@ //! Extra utilities for spawning tasks -#[cfg(all(tokio_unstable, feature = "rt"))] +#[cfg(tokio_unstable)] mod join_map; mod spawn_pinned; pub use spawn_pinned::LocalPoolHandle; -#[cfg(all(tokio_unstable, feature = "rt"))] +#[cfg(tokio_unstable)] #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))] pub use join_map::JoinMap; From 8b5bb19f02b9ae76f7da5c2a38ddc46d885ab341 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 14:31:01 -0700 Subject: [PATCH 12/21] rutsfmt --- tokio-util/src/task/join_map.rs | 49 ++++++++++++++++++++----------- tokio-util/tests/task_join_map.rs | 4 +-- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 16d57dd4ec4..2e79d2a8a43 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -1,12 +1,12 @@ -use tokio::runtime::Handle; -use tokio::task::{JoinError, AbortHandle, LocalSet, JoinSet, Id}; -use hashbrown::HashMap; use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; use std::borrow::Borrow; use std::collections::hash_map::RandomState; use std::fmt; use std::future::Future; use std::hash::{BuildHasher, Hash, Hasher}; +use tokio::runtime::Handle; +use tokio::task::{AbortHandle, Id, JoinError, JoinSet, LocalSet}; /// A collection of tasks spawned on a Tokio runtime, associated with hash map /// keys. @@ -15,7 +15,7 @@ use std::hash::{BuildHasher, Hash, Hasher}; /// addition of a set of keys associated with each task. These keys allow /// [cancelling a task][abort] or [multiple tasks][abort_matching] in the /// `JoinMap` based on their keys, or [test whether a task corresponding to a -/// given key exists][contains] in the `JoinMap`. +/// given key exists][contains] in the `JoinMap`. /// /// In addition, when tasks in the `JoinMap` complete, they will return the /// associated key along with the value returned by the task, if any. @@ -339,17 +339,23 @@ where F: Future, F: 'static, { - let task = self.tasks.spawn_local_on(task, local_set); + let task = self.tasks.spawn_local_on(task, local_set); self.insert(key, task) } fn insert(&mut self, key: K, abort: AbortHandle) { let hash = self.hash(&key); let id = abort.id(); - let map_key = Key { id: id.clone(), key }; + let map_key = Key { + id: id.clone(), + key, + }; // Insert the new key into the map of tasks by keys. - let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.key == map_key.key); + let entry = self + .tasks_by_key + .raw_entry_mut() + .from_hash(hash, |k| k.key == map_key.key); match entry { RawEntryMut::Occupied(mut occ) => { // There was a previous task spawned with the same key! Cancel @@ -358,8 +364,8 @@ where occ.insert(abort).abort(); let _prev_hash = self.hashes_by_task.remove(&prev_id); debug_assert_eq!(Some(hash), _prev_hash); - }, - RawEntryMut::Vacant(vac) => { + } + RawEntryMut::Vacant(vac) => { vac.insert(map_key, abort); } }; @@ -396,7 +402,7 @@ where Ok(task) => { let (id, output) = task?; (Ok(output), id) - }, + } Err(e) => { let id = e.id(); (Err(e), id) @@ -480,7 +486,10 @@ where K: Borrow, { match self.get_by_key(key) { - Some((_, handle)) => { handle.abort(); true } + Some((_, handle)) => { + handle.abort(); + true + } None => false, } } @@ -542,7 +551,7 @@ where // Note: this method iterates over the key set *without* removing any // entries, so that the keys from aborted tasks can be returned when // polling the `JoinMap`. - for (Key {ref key, ..}, task) in &self.tasks_by_key { + for (Key { ref key, .. }, task) in &self.tasks_by_key { if predicate(key) { task.abort(); } @@ -654,7 +663,6 @@ where self.tasks_by_key.shrink_to(min_capacity) } - /// Look up a task in the map by its key, returning the key and abort handle. fn get_by_key<'map, Q: ?Sized>(&'map self, key: &Q) -> Option<(&'map Key, &'map AbortHandle)> where @@ -662,13 +670,17 @@ where K: Borrow, { let hash = self.hash(key); - self.tasks_by_key.raw_entry().from_hash(hash, |k| k.key.borrow() == key) + self.tasks_by_key + .raw_entry() + .from_hash(hash, |k| k.key.borrow() == key) } /// Look up a task in the map by its task ID, returning the key and abort handle. fn get_by_id<'map>(&'map self, id: &Id) -> Option<(&'map Key, &'map AbortHandle)> { let hash = self.hashes_by_task.get(id)?; - self.tasks_by_key.raw_entry().from_hash(*hash, |k| &k.id == id) + self.tasks_by_key + .raw_entry() + .from_hash(*hash, |k| &k.id == id) } /// Remove a task from the map by ID, returning the key for that task. @@ -677,7 +689,10 @@ where let hash = self.hashes_by_task.remove(&id)?; // Remove the entry for that hash. - let entry = self.tasks_by_key.raw_entry_mut().from_hash(hash, |k| k.id == id); + let entry = self + .tasks_by_key + .raw_entry_mut() + .from_hash(hash, |k| k.id == id); let (Key { id: _key_id, key }, handle) = match entry { RawEntryMut::Occupied(entry) => entry.remove_entry(), _ => return None, @@ -759,4 +774,4 @@ impl PartialEq for Key { } } -impl Eq for Key {} \ No newline at end of file +impl Eq for Key {} diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 72c4b5c03f9..d5f87bfb185 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -2,8 +2,8 @@ #![cfg(all(feature = "rt", tokio_unstable))] use tokio::sync::oneshot; -use tokio_util::task::JoinMap; use tokio::time::Duration; +use tokio_util::task::JoinMap; use futures::future::FutureExt; @@ -272,4 +272,4 @@ async fn abort_all() { for was_seen in &seen { assert!(was_seen); } -} \ No newline at end of file +} From 41d92d7d7b80d07abceb35b621c0f39151de3066 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 14:34:29 -0700 Subject: [PATCH 13/21] do a coolendrun around msrv CI Signed-off-by: Eliza Weisman --- tokio-util/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 8b2dcdc1c77..992ff234a8b 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -35,7 +35,6 @@ __docs_rs = ["futures-util"] [dependencies] tokio = { version = "1.18.0", path = "../tokio", features = ["sync"] } -hashbrown = { version = "0.12.0", optional = true } bytes = "1.0.0" futures-core = "0.3.0" futures-sink = "0.3.0" @@ -45,6 +44,9 @@ pin-project-lite = "0.2.0" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +[target.'cfg(tokio_unstable)'.dependencies] +hashbrown = { version = "0.12.0", optional = true } + [dev-dependencies] tokio = { version = "1.0.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.4.0", path = "../tokio-test" } From c532bf2ade84868a2ab409136174cc54f68142f3 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 14:35:38 -0700 Subject: [PATCH 14/21] placate CI readme check Signed-off-by: Eliza Weisman --- tokio/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/README.md b/tokio/README.md index 1cce34aeeff..47522be3cb7 100644 --- a/tokio/README.md +++ b/tokio/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.17.0", features = ["full"] } +tokio = { version = "1.18.0", features = ["full"] } ``` Then, on your main.rs: From 7947a10ac952315e1975a01df5c0ea4aaa17285c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 15:00:13 -0700 Subject: [PATCH 15/21] blerrrghhhhhhhhhh hhhhhhhhh --- .cargo/config | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .cargo/config diff --git a/.cargo/config b/.cargo/config new file mode 100644 index 00000000000..df8858986f3 --- /dev/null +++ b/.cargo/config @@ -0,0 +1,2 @@ +# [build] +# rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file From 43875a943f461e0fafd56ecb90212d4e5eccef44 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 15:02:37 -0700 Subject: [PATCH 16/21] fix remaining docs reference whoops god damn it Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 2e79d2a8a43..3e2dc65bd57 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -332,7 +332,7 @@ where /// be removed from the `JoinMap`; a subsequent call to [`join_one`] will /// *not* return a cancelled [`JoinError`] for that task. /// - /// [`LocalSet`]: crate::task::LocalSet + /// [`LocalSet`]: tokio::task::LocalSet /// [`join_one`]: Self::join_one pub fn spawn_local_on(&mut self, key: K, task: F, local_set: &LocalSet) where From fda7c12b9a941bbb621b0bd3875bc1372bb6fa1c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 25 Apr 2022 15:35:58 -0700 Subject: [PATCH 17/21] ghghhhhhhhhhhhhghghhhghghghghghhhh Signed-off-by: Eliza Weisman --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1cce34aeeff..47522be3cb7 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.17.0", features = ["full"] } +tokio = { version = "1.18.0", features = ["full"] } ``` Then, on your main.rs: From 51a0a6b9dac59f9817d5441849d8689530e1e300 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 26 Apr 2022 09:39:56 -0700 Subject: [PATCH 18/21] docs improvements Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 3e2dc65bd57..5d643186b80 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -101,17 +101,30 @@ use tokio::task::{AbortHandle, Id, JoinError, JoinSet, LocalSet}; /// [contains]: fn@Self::contains_key #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] pub struct JoinMap { + /// A map of the [`AbortHandle`]s of the tasks spawned on this `JoinMap`, + /// indexed by their keys and task IDs. + /// + /// The [`Key`] type contains both the task's `K`-typed key provided when + /// spawning tasks, and the task's IDs. The IDs are stored here to resolve + /// hash collisions when looking up tasks based on their pre-computed hash + /// (as stored in the `hashes_by_task` map). tasks_by_key: HashMap, AbortHandle, S>, - hashes_by_task: HashMap, - /// The set of tasks spawned on the `JoinMap`. + /// A map from task IDs to the hash of the key associated with that task. /// - /// Each `IdleNotifiedSet` entry contains the hash of the task's key, to - /// allow looking the key up when the task completes. + /// This map is used to perform reverse lookups of tasks in the + /// `tasks_by_key` map based on their task IDs. When a task terminates, the + /// ID is provided to us by the `JoinSet`, so we can look up the hash value + /// of that task's key, and then remove it from the `tasks_by_key` map using + /// the raw hash code, resolving collisions by comparing task IDs. + hashes_by_task: HashMap, + + /// The [`JoinSet`] that awaits the completion of tasks spawned on this + /// `JoinMap`. tasks: JoinSet, } -/// A `JoinMap` key. +/// A [`JoinMap`] key. /// /// This holds both a `K`-typed key (the actual key as seen by the user), _and_ /// a task ID, so that hash collisions between `K`-typed keys can be resolved @@ -382,7 +395,7 @@ where /// /// # Cancel Safety /// - /// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!` + /// This method is cancel safe. If `join_one` is used as the event in a [`tokio::select!`] /// statement and some other branch completes first, it is guaranteed that no tasks were /// removed from this `JoinMap`. /// @@ -397,6 +410,8 @@ where /// panicked or been aborted. `key` is the key associated with the task /// that panicked or was aborted. /// * `None` if the `JoinMap` is empty. + /// + /// [`tokio::select!`]: tokio::select pub async fn join_one(&mut self) -> Option<(K, Result)> { let (res, id) = match self.tasks.join_one_with_id().await { Ok(task) => { @@ -548,9 +563,9 @@ where /// # } /// ``` pub fn abort_matching(&mut self, mut predicate: impl FnMut(&K) -> bool) { - // Note: this method iterates over the key set *without* removing any - // entries, so that the keys from aborted tasks can be returned when - // polling the `JoinMap`. + // Note: this method iterates over the tasks and keys *without* removing + // any entries, so that the keys from aborted tasks can still be + // returned when calling `join_one` in the future. for (Key { ref key, .. }, task) in &self.tasks_by_key { if predicate(key) { task.abort(); From 838d072c1ba1a49dcf7ed1b2c706447571e4739a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 26 Apr 2022 09:40:44 -0700 Subject: [PATCH 19/21] remove unneeded static bounds Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 5d643186b80..2f041610ede 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -268,7 +268,7 @@ impl JoinMap { impl JoinMap where - K: Hash + Eq + 'static, + K: Hash + Eq, V: 'static, S: BuildHasher, { @@ -732,7 +732,6 @@ where impl JoinMap where - K: 'static, V: 'static, { /// Aborts all tasks on this `JoinMap`. From f0adddb1b5a29741eb0aabfc0bea8eb41daa8609 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 26 Apr 2022 09:51:38 -0700 Subject: [PATCH 20/21] remove duplicate task IDs from debug output Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 2f041610ede..3e3538ff5b5 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -757,8 +757,23 @@ where // Debug`, since no value is ever actually stored in the map. impl fmt::Debug for JoinMap { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // format the task keys and abort handles a little nicer by just + // printing the key and task ID pairs, without format the `Key` struct + // itself or the `AbortHandle`, which would just format the task's ID + // again. + struct KeySet<'a, K: fmt::Debug, S>(&'a HashMap, AbortHandle, S>); + impl fmt::Debug for KeySet<'_, K, S> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_map().entries(self.0.keys().map(|Key { key, id }| (key, id))).finish() + } + } + f.debug_struct("JoinMap") - .field("tasks_by_key", &self.tasks_by_key) + // The `tasks_by_key` map is the only one that contains information + // that's really worth formatting for the user, since it contains + // the tasks' keys and IDs. The other fields are basically + // implementation details. + .field("tasks", &KeySet(&self.tasks_by_key)) .finish() } } From 3d0560011d13ff12b629c0a615d6c7f6f544a29f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 26 Apr 2022 10:06:46 -0700 Subject: [PATCH 21/21] rustfmt Signed-off-by: Eliza Weisman --- tokio-util/src/task/join_map.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 3e3538ff5b5..41c82f448ab 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -764,7 +764,9 @@ impl fmt::Debug for JoinMap { struct KeySet<'a, K: fmt::Debug, S>(&'a HashMap, AbortHandle, S>); impl fmt::Debug for KeySet<'_, K, S> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_map().entries(self.0.keys().map(|Key { key, id }| (key, id))).finish() + f.debug_map() + .entries(self.0.keys().map(|Key { key, id }| (key, id))) + .finish() } }