From e4eff4d1c95259ac3a46f21c910fcc2c2f96004f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 13 Jul 2022 14:57:37 -0700 Subject: [PATCH] refac(console): factor out a `Store` type (#359) Currently, there's a bit of repeated boilerplate code for processing updates to the stored state for tasks, resources, and async ops, such as tracking a list of new items since the last update, and handling ID remapping. This PR builds upon the refactor in #358, and replaces the `state::id` module with a new `state::store` module. This module contains all of the old code for ID remapping along with a new `Store` type that implements a store of objects by ID. The `Store` type now implements much of the boilerplate code that was previously repeated in the `state::tasks`, `state::resources`, and `state::async_ops` modules. --- tokio-console/src/state/async_ops.rs | 139 +++++++-------- tokio-console/src/state/id.rs | 118 ------------- tokio-console/src/state/mod.rs | 8 +- tokio-console/src/state/resources.rs | 210 +++++++++++------------ tokio-console/src/state/store.rs | 246 +++++++++++++++++++++++++++ tokio-console/src/state/tasks.rs | 167 +++++++++--------- 6 files changed, 493 insertions(+), 395 deletions(-) delete mode 100644 tokio-console/src/state/id.rs create mode 100644 tokio-console/src/state/store.rs diff --git a/tokio-console/src/state/async_ops.rs b/tokio-console/src/state/async_ops.rs index c0af3c894..e01879d84 100644 --- a/tokio-console/src/state/async_ops.rs +++ b/tokio-console/src/state/async_ops.rs @@ -1,9 +1,9 @@ use crate::{ intern::{self, InternedStr}, state::{ - id::{Id, Ids}, pb_duration, resources::Resource, + store::{self, Id, Store}, tasks::Task, Attribute, Field, Metadata, Visibility, }, @@ -21,9 +21,7 @@ use tui::text::Span; #[derive(Default, Debug)] pub(crate) struct AsyncOpsState { - async_ops: HashMap, Rc>>, - ids: Ids, - new_async_ops: Vec, + async_ops: Store, dropped_events: u64, } @@ -41,7 +39,7 @@ pub(crate) enum SortBy { #[derive(Debug)] pub(crate) struct AsyncOp { - num: Id, + id: Id, parent_id: InternedStr, resource_id: Id, meta_id: u64, @@ -49,7 +47,7 @@ pub(crate) struct AsyncOp { stats: AsyncOpStats, } -pub(crate) type AsyncOpRef = Weak>; +pub(crate) type AsyncOpRef = store::Ref; #[derive(Debug)] struct AsyncOpStats { @@ -76,7 +74,7 @@ impl Default for SortBy { impl SortBy { pub fn sort(&self, now: SystemTime, ops: &mut [Weak>]) { match self { - Self::Aid => ops.sort_unstable_by_key(|ao| ao.upgrade().map(|a| a.borrow().num)), + Self::Aid => ops.sort_unstable_by_key(|ao| ao.upgrade().map(|a| a.borrow().id)), Self::Task => ops.sort_unstable_by_key(|ao| ao.upgrade().map(|a| a.borrow().task_id())), Self::Source => { ops.sort_unstable_by_key(|ao| ao.upgrade().map(|a| a.borrow().source.clone())) @@ -118,7 +116,7 @@ impl view::SortBy for SortBy { impl AsyncOpsState { /// Returns any new async ops for a resource that were added since the last async ops update. pub(crate) fn take_new_async_ops(&mut self) -> impl Iterator + '_ { - self.new_async_ops.drain(..) + self.async_ops.take_new_items() } /// Returns all async ops. @@ -135,77 +133,66 @@ impl AsyncOpsState { strings: &mut intern::Strings, metas: &HashMap, update: proto::async_ops::AsyncOpUpdate, - resource_ids: &mut Ids, - task_ids: &mut Ids, + resource_ids: &mut store::Ids, + task_ids: &mut store::Ids, visibility: Visibility, ) { let mut stats_update = update.stats_update; - let new_list = &mut self.new_async_ops; - if matches!(visibility, Visibility::Show) { - new_list.clear(); - } - - let new_async_ops = update.new_async_ops.into_iter().filter_map(|async_op| { - if async_op.id.is_none() { - tracing::warn!(?async_op, "skipping async op with no id"); - } - let meta_id = match async_op.metadata.as_ref() { - Some(id) => id.id, - None => { - tracing::warn!(?async_op, "async op has no metadata ID, skipping"); - return None; - } - }; - let meta = match metas.get(&meta_id) { - Some(meta) => meta, - None => { - tracing::warn!(?async_op, meta_id, "no metadata for async op, skipping"); - return None; - } - }; - - let span_id = async_op.id?.id; - let stats = AsyncOpStats::from_proto( - stats_update.remove(&span_id)?, - meta, - styles, - strings, - task_ids, - ); - - let num = self.ids.id_for(span_id); - let resource_id = resource_ids.id_for(async_op.resource_id?.id); - let parent_id = match async_op.parent_async_op_id { - Some(id) => strings.string(format!("{}", self.ids.id_for(id.id))), - None => strings.string("n/a".to_string()), - }; - - let source = strings.string(async_op.source); - - let async_op = AsyncOp { - num, - parent_id, - resource_id, - meta_id, - source, - stats, - }; - let async_op = Rc::new(RefCell::new(async_op)); - new_list.push(Rc::downgrade(&async_op)); - Some((num, async_op)) - }); - - self.async_ops.extend(new_async_ops); - - for (span_id, stats) in stats_update { - let num = self.ids.id_for(span_id); - if let Some(async_op) = self.async_ops.get_mut(&num) { - let mut async_op = async_op.borrow_mut(); - if let Some(meta) = metas.get(&async_op.meta_id) { - async_op.stats = - AsyncOpStats::from_proto(stats, meta, styles, strings, task_ids); + self.async_ops + .insert_with(visibility, update.new_async_ops, |ids, async_op| { + if async_op.id.is_none() { + tracing::warn!(?async_op, "skipping async op with no id"); } + + let meta_id = match async_op.metadata.as_ref() { + Some(id) => id.id, + None => { + tracing::warn!(?async_op, "async op has no metadata ID, skipping"); + return None; + } + }; + let meta = match metas.get(&meta_id) { + Some(meta) => meta, + None => { + tracing::warn!(?async_op, meta_id, "no metadata for async op, skipping"); + return None; + } + }; + + let span_id = async_op.id?.id; + let stats = AsyncOpStats::from_proto( + stats_update.remove(&span_id)?, + meta, + styles, + strings, + task_ids, + ); + + let id = ids.id_for(span_id); + let resource_id = resource_ids.id_for(async_op.resource_id?.id); + let parent_id = match async_op.parent_async_op_id { + Some(id) => strings.string(format!("{}", ids.id_for(id.id))), + None => strings.string("n/a".to_string()), + }; + + let source = strings.string(async_op.source); + + let async_op = AsyncOp { + id, + parent_id, + resource_id, + meta_id, + source, + stats, + }; + Some((id, async_op)) + }); + + for (stats, mut async_op) in self.async_ops.updated(stats_update) { + if let Some(meta) = metas.get(&async_op.meta_id) { + tracing::trace!(?async_op, ?stats, "processing stats update for"); + async_op.stats = AsyncOpStats::from_proto(stats, meta, styles, strings, task_ids); } } @@ -234,7 +221,7 @@ impl AsyncOpsState { impl AsyncOp { pub(crate) fn id(&self) -> Id { - self.num + self.id } pub(crate) fn parent_id(&self) -> &str { @@ -300,7 +287,7 @@ impl AsyncOpStats { meta: &Metadata, styles: &view::Styles, strings: &mut intern::Strings, - task_ids: &mut Ids, + task_ids: &mut store::Ids, ) -> Self { let mut pb = pb; diff --git a/tokio-console/src/state/id.rs b/tokio-console/src/state/id.rs deleted file mode 100644 index a8f379118..000000000 --- a/tokio-console/src/state/id.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::{ - any, cmp, - collections::hash_map::{Entry, HashMap}, - fmt, - hash::{Hash, Hasher}, - marker::PhantomData, -}; - -pub(crate) struct Ids { - next: u64, - map: HashMap>, -} - -/// A rewritten sequential ID. -/// -/// This is distinct from the remote server's span ID, which may be reused and -/// is not sequential. -pub(crate) struct Id { - id: u64, - _ty: PhantomData, -} - -// === impl Ids === - -impl Ids { - pub(crate) fn id_for(&mut self, span_id: u64) -> Id { - match self.map.entry(span_id) { - Entry::Occupied(entry) => *entry.get(), - Entry::Vacant(entry) => { - let id = Id { - id: self.next, - _ty: PhantomData, - }; - entry.insert(id); - self.next = self.next.wrapping_add(1); - id - } - } - } -} - -impl Default for Ids { - fn default() -> Self { - Self { - next: 1, - map: Default::default(), - } - } -} - -impl fmt::Debug for Ids { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Ids") - .field("next", &self.next) - .field("map", &self.map) - .field("type", &format_args!("{}", any::type_name::())) - .finish() - } -} - -// === impl Id === - -impl Clone for Id { - #[inline] - fn clone(&self) -> Self { - Self { - id: self.id, - _ty: PhantomData, - } - } -} - -impl Copy for Id {} - -impl fmt::Debug for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let path = any::type_name::(); - let type_name = path.split("::").last().unwrap_or(path); - write!(f, "Id<{}>({})", type_name, self.id) - } -} - -impl fmt::Display for Id { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.id, f) - } -} - -impl Hash for Id { - #[inline] - fn hash(&self, state: &mut H) { - state.write_u64(self.id); - } -} - -impl PartialEq for Id { - #[inline] - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for Id {} - -impl cmp::Ord for Id { - #[inline] - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.id.cmp(&other.id) - } -} - -impl cmp::PartialOrd for Id { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} diff --git a/tokio-console/src/state/mod.rs b/tokio-console/src/state/mod.rs index 07f404888..eb96606af 100644 --- a/tokio-console/src/state/mod.rs +++ b/tokio-console/src/state/mod.rs @@ -22,11 +22,11 @@ use tui::{ pub mod async_ops; pub mod histogram; -pub mod id; pub mod resources; +pub mod store; pub mod tasks; -pub(crate) use self::id::Id; +pub(crate) use self::store::Id; pub(crate) type DetailsRef = Rc>>; @@ -162,8 +162,8 @@ impl State { &mut self.strings, &self.metas, async_ops_update, - &mut self.resources_state.ids, - &mut self.tasks_state.ids, + self.resources_state.ids_mut(), + self.tasks_state.ids_mut(), visibility, ) } diff --git a/tokio-console/src/state/resources.rs b/tokio-console/src/state/resources.rs index 9356f03f8..5e755c76c 100644 --- a/tokio-console/src/state/resources.rs +++ b/tokio-console/src/state/resources.rs @@ -1,25 +1,22 @@ use crate::intern::{self, InternedStr}; use crate::state::{ format_location, - id::{Id, Ids}, + store::{self, Id, SpanId, Store}, Attribute, Field, Metadata, Visibility, }; use crate::view; use console_api as proto; use std::{ - cell::RefCell, collections::HashMap, convert::{TryFrom, TryInto}, - rc::{Rc, Weak}, + rc::Rc, time::{Duration, SystemTime}, }; use tui::{style::Color, text::Span}; #[derive(Default, Debug)] pub(crate) struct ResourcesState { - resources: HashMap, Rc>>, - pub(crate) ids: Ids, - new_resources: Vec, + resources: Store, dropped_events: u64, } @@ -45,11 +42,11 @@ pub(crate) struct Resource { /// /// This is NOT the `tracing::span::Id` for the resource's `tracing` span on the /// remote. - num: Id, + id: Id, /// The `tracing::span::Id` on the remote process for this resource's span. /// /// This is used when requesting a resource details stream. - span_id: u64, + span_id: SpanId, id_str: InternedStr, parent: InternedStr, parent_id: InternedStr, @@ -62,7 +59,7 @@ pub(crate) struct Resource { visibility: TypeVisibility, } -pub(crate) type ResourceRef = Weak>; +pub(crate) type ResourceRef = store::Ref; #[derive(Debug)] struct ResourceStats { @@ -79,10 +76,11 @@ impl Default for SortBy { } impl SortBy { - pub fn sort(&self, now: SystemTime, resources: &mut [Weak>]) { + pub fn sort(&self, now: SystemTime, resources: &mut [ResourceRef]) { match self { - Self::Rid => resources - .sort_unstable_by_key(|resource| resource.upgrade().map(|r| r.borrow().num)), + Self::Rid => { + resources.sort_unstable_by_key(|resource| resource.upgrade().map(|r| r.borrow().id)) + } Self::Kind => resources.sort_unstable_by_key(|resource| { resource.upgrade().map(|r| r.borrow().kind.clone()) }), @@ -120,11 +118,11 @@ impl view::SortBy for SortBy { impl ResourcesState { pub(crate) fn take_new_resources(&mut self) -> impl Iterator + '_ { - self.new_resources.drain(..) + self.resources.take_new_items() } - pub(crate) fn resource(&self, id: Id) -> Option { - self.resources.get(&id).map(Rc::downgrade) + pub(crate) fn ids_mut(&mut self) -> &mut store::Ids { + self.resources.ids_mut() } pub(crate) fn update_resources( @@ -139,109 +137,101 @@ impl ResourcesState { .new_resources .iter() .filter_map(|resource| { - let span_id = resource.parent_resource_id?.id; - let parent_id = self.ids.id_for(span_id); - let parent = self.resource(parent_id)?; - Some((parent_id, parent)) + let parent_id = resource.parent_resource_id?.id; + let parent = self.resources.get_by_span(parent_id)?; + Some((parent.borrow().id, Rc::downgrade(parent))) }) .collect(); let mut stats_update = update.stats_update; - let new_list = &mut self.new_resources; - if matches!(visibility, Visibility::Show) { - new_list.clear(); - } - - let new_resources = update.new_resources.into_iter().filter_map(|resource| { - if resource.id.is_none() { - tracing::warn!(?resource, "skipping resource with no id"); - } - - let meta_id = match resource.metadata.as_ref() { - Some(id) => id.id, - None => { - tracing::warn!(?resource, "resource has no metadata ID, skipping"); - return None; - } - }; - let meta = match metas.get(&meta_id) { - Some(meta) => meta, - None => { - tracing::warn!(?resource, meta_id, "no metadata for resource, skipping"); - return None; + self.resources + .insert_with(visibility, update.new_resources, |ids, resource| { + if resource.id.is_none() { + tracing::warn!(?resource, "skipping resource with no id"); } - }; - let kind = match kind_from_proto(resource.kind?, strings) { - Ok(kind) => kind, - Err(err) => { - tracing::warn!(%err, "resource kind cannot be parsed"); - return None; - } - }; - - let span_id = resource.id?.id; - let stats = - ResourceStats::from_proto(stats_update.remove(&span_id)?, meta, styles, strings); - - let num = self.ids.id_for(span_id); - let parent_id = resource.parent_resource_id.map(|id| self.ids.id_for(id.id)); - - let parent = strings.string(match parent_id { - Some(id) => parents - .get(&id) - .and_then(|r| r.upgrade()) - .map(|r| { - let r = r.borrow(); - format!("{} ({}::{})", r.id(), r.target(), r.concrete_type()) - }) - .unwrap_or_else(|| id.to_string()), - None => "n/a".to_string(), - }); - let parent_id = strings.string( - parent_id - .as_ref() - .map(Id::::to_string) - .unwrap_or_else(|| "n/a".to_string()), - ); - - let location = format_location(resource.location); - let visibility = if resource.is_internal { - TypeVisibility::Internal - } else { - TypeVisibility::Public - }; - - let resource = Resource { - num, - span_id, - id_str: strings.string(num.to_string()), - parent, - parent_id, - kind, - stats, - target: meta.target.clone(), - concrete_type: strings.string(resource.concrete_type), - meta_id, - location, - visibility, - }; - let resource = Rc::new(RefCell::new(resource)); - new_list.push(Rc::downgrade(&resource)); - Some((num, resource)) - }); + let meta_id = match resource.metadata.as_ref() { + Some(id) => id.id, + None => { + tracing::warn!(?resource, "resource has no metadata ID, skipping"); + return None; + } + }; + let meta = match metas.get(&meta_id) { + Some(meta) => meta, + None => { + tracing::warn!(?resource, meta_id, "no metadata for resource, skipping"); + return None; + } + }; + let kind = match kind_from_proto(resource.kind?, strings) { + Ok(kind) => kind, + Err(err) => { + tracing::warn!(%err, "resource kind cannot be parsed"); + return None; + } + }; + + let span_id = resource.id?.id; + let stats = ResourceStats::from_proto( + stats_update.remove(&span_id)?, + meta, + styles, + strings, + ); + + let id = ids.id_for(span_id); + let parent_id = resource.parent_resource_id.map(|id| ids.id_for(id.id)); + + let parent = strings.string(match parent_id { + Some(id) => parents + .get(&id) + .and_then(|r| r.upgrade()) + .map(|r| { + let r = r.borrow(); + format!("{} ({}::{})", r.id(), r.target(), r.concrete_type()) + }) + .unwrap_or_else(|| id.to_string()), + None => "n/a".to_string(), + }); + + let parent_id = strings.string( + parent_id + .as_ref() + .map(Id::::to_string) + .unwrap_or_else(|| "n/a".to_string()), + ); + + let location = format_location(resource.location); + let visibility = if resource.is_internal { + TypeVisibility::Internal + } else { + TypeVisibility::Public + }; + + let resource = Resource { + id, + span_id, + id_str: strings.string(id.to_string()), + parent, + parent_id, + kind, + stats, + target: meta.target.clone(), + concrete_type: strings.string(resource.concrete_type), + meta_id, + location, + visibility, + }; + Some((id, resource)) + }); self.dropped_events += update.dropped_events; - self.resources.extend(new_resources); - - for (span_id, stats) in stats_update { - let num = self.ids.id_for(span_id); - if let Some(resource) = self.resources.get_mut(&num) { - let mut r = resource.borrow_mut(); - if let Some(meta) = metas.get(&r.meta_id) { - r.stats = ResourceStats::from_proto(stats, meta, styles, strings); - } + for (stats, mut resource) in self.resources.updated(stats_update) { + if let Some(meta) = metas.get(&resource.meta_id) { + tracing::trace!(?resource, ?stats, "processing stats update for"); + resource.stats = ResourceStats::from_proto(stats, meta, styles, strings); } } } @@ -268,7 +258,7 @@ impl ResourcesState { impl Resource { pub(crate) fn id(&self) -> Id { - self.num + self.id } pub(crate) fn span_id(&self) -> u64 { diff --git a/tokio-console/src/state/store.rs b/tokio-console/src/state/store.rs new file mode 100644 index 000000000..79a048a5b --- /dev/null +++ b/tokio-console/src/state/store.rs @@ -0,0 +1,246 @@ +use std::{ + any, + cell::{self, RefCell}, + cmp, + collections::hash_map::{self, Entry, HashMap}, + fmt, + hash::{Hash, Hasher}, + marker::PhantomData, + rc::{Rc, Weak}, + vec, +}; + +use super::Visibility; + +/// Stores a set of items which are associated with a [`SpanId`] and a rewritten +/// sequential [`Id`]. +#[derive(Debug)] +pub(crate) struct Store { + ids: Ids, + store: HashMap, Stored>, + new_items: Vec>, +} + +pub(crate) type Ref = Weak>; +pub(crate) type Stored = Rc>; +pub(crate) type SpanId = u64; + +/// A rewritten sequential ID. +/// +/// This is distinct from the remote server's span ID, which may be reused and +/// is not sequential. +pub(crate) struct Id { + id: u64, + _ty: PhantomData, +} + +/// Stores the rewritten sequential IDs of items in a [`Store`]. +pub(crate) struct Ids { + next: u64, + map: HashMap>, +} + +// === impl Store === + +impl Store { + pub fn get(&self, id: Id) -> Option<&Stored> { + self.store.get(&id) + } + + pub fn get_by_span(&self, span_id: SpanId) -> Option<&Stored> { + let id = self.ids.map.get(&span_id)?; + self.get(*id) + } + + pub fn ids_mut(&mut self) -> &mut Ids { + &mut self.ids + } + + /// Given an iterator of `U`-typed items and a function `f` mapping a + /// `U`-typed item to a `T`-typed item and an [`Id`] for that item, inserts + /// the `T`-typed items into the store along with their IDs. + /// + /// This function has an admittedly somewhat complex signature. It would be + /// nicer if this could just be an `iter::Extend` implementation, but that + /// makes borrowing the set of [`Ids`] in the closure that's mapped over the + /// iterator challenging, because the `extend` method mutably borrows the + /// whole `Store`. + pub fn insert_with( + &mut self, + visibility: Visibility, + items: impl IntoIterator, + mut f: impl FnMut(&mut Ids, U) -> Option<(Id, T)>, + ) { + self.set_visibility(visibility); + let items = items + .into_iter() + .filter_map(|item| f(&mut self.ids, item)) + .map(|(id, item)| { + let item = Rc::new(RefCell::new(item)); + self.new_items.push(Rc::downgrade(&item)); + (id, item) + }); + self.store.extend(items); + } + + pub fn updated<'store, U, I>( + &'store mut self, + update: I, + ) -> impl Iterator)> + 'store + where + I: IntoIterator, + I::IntoIter: 'store, + { + update.into_iter().filter_map(|(span_id, update)| { + let id = self.ids.map.get(&span_id)?; + let item = self.store.get(id)?; + Some((update, item.borrow_mut())) + }) + } + + /// Applies a predicate to each element in the [`Store`], removing the item + /// if the predicate returns `false`. + pub fn retain(&mut self, f: impl FnMut(&Id, &mut Stored) -> bool) { + self.store.retain(f); + // If a removed element was in `new_items`, remove it. + self.new_items.retain(|item| item.upgrade().is_some()); + // TODO(eliza): remove from `ids` if it's no longer in `store`? + } + + /// Returns an iterator over all of the items which have been added to this + /// `Store` since the last time `take_new_items` was called. + pub fn take_new_items(&mut self) -> vec::Drain<'_, Ref> { + self.new_items.drain(..) + } + + pub fn values(&self) -> hash_map::Values<'_, Id, Stored> { + self.store.values() + } + + pub fn iter(&self) -> hash_map::Iter<'_, Id, Stored> { + self.store.iter() + } + + fn set_visibility(&mut self, visibility: Visibility) { + if matches!(visibility, Visibility::Show) { + self.new_items.clear(); + } + } +} + +impl Default for Store { + fn default() -> Self { + Self { + ids: Ids::default(), + store: HashMap::default(), + new_items: Vec::default(), + } + } +} + +impl<'store, T> IntoIterator for &'store Store { + type Item = (&'store Id, &'store Stored); + type IntoIter = hash_map::Iter<'store, Id, Stored>; + + #[inline] + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +// === impl Ids === + +impl Ids { + pub(crate) fn id_for(&mut self, span_id: SpanId) -> Id { + match self.map.entry(span_id) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + let id = Id { + id: self.next, + _ty: PhantomData, + }; + entry.insert(id); + self.next = self.next.wrapping_add(1); + id + } + } + } +} + +impl Default for Ids { + fn default() -> Self { + Self { + next: 1, + map: Default::default(), + } + } +} + +impl fmt::Debug for Ids { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Ids") + .field("next", &self.next) + .field("map", &self.map) + .field("type", &format_args!("{}", any::type_name::())) + .finish() + } +} + +// === impl Id === + +impl Clone for Id { + #[inline] + fn clone(&self) -> Self { + Self { + id: self.id, + _ty: PhantomData, + } + } +} + +impl Copy for Id {} + +impl fmt::Debug for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let path = any::type_name::(); + let type_name = path.split("::").last().unwrap_or(path); + write!(f, "Id<{}>({})", type_name, self.id) + } +} + +impl fmt::Display for Id { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.id, f) + } +} + +impl Hash for Id { + #[inline] + fn hash(&self, state: &mut H) { + state.write_u64(self.id); + } +} + +impl PartialEq for Id { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Id {} + +impl cmp::Ord for Id { + #[inline] + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.id.cmp(&other.id) + } +} + +impl cmp::PartialOrd for Id { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs index ccd9d71cf..8be120d8f 100644 --- a/tokio-console/src/state/tasks.rs +++ b/tokio-console/src/state/tasks.rs @@ -3,8 +3,9 @@ use crate::{ state::{ format_location, histogram::DurationHistogram, - id::{Id, Ids}, - pb_duration, Field, Metadata, Visibility, + pb_duration, + store::{self, Id, SpanId, Store}, + Field, Metadata, Visibility, }, util::Percentage, view, @@ -22,16 +23,14 @@ use tui::{style::Color, text::Span}; #[derive(Default, Debug)] pub(crate) struct TasksState { - tasks: HashMap, Rc>>, - pub(crate) ids: Ids, - new_tasks: Vec, + tasks: Store, pub(crate) linters: Vec>, dropped_events: u64, } #[derive(Debug, Default)] pub(crate) struct Details { - pub(crate) span_id: u64, + pub(crate) span_id: SpanId, pub(crate) poll_times_histogram: Option, } @@ -57,7 +56,7 @@ pub(crate) enum TaskState { Running, } -pub(crate) type TaskRef = Weak>; +pub(crate) type TaskRef = store::Ref; #[derive(Debug)] pub(crate) struct Task { @@ -65,11 +64,11 @@ pub(crate) struct Task { /// /// This is NOT the `tracing::span::Id` for the task's tracing span on the /// remote. - num: Id, + id: Id, /// The `tracing::span::Id` on the remote process for this task's span. /// /// This is used when requesting a task details stream. - span_id: u64, + span_id: SpanId, short_desc: InternedStr, formatted_fields: Vec>>, stats: TaskStats, @@ -109,7 +108,11 @@ struct TaskStats { impl TasksState { /// Returns any new tasks that were added since the last task update. pub(crate) fn take_new_tasks(&mut self) -> impl Iterator + '_ { - self.new_tasks.drain(..) + self.tasks.take_new_items() + } + + pub(crate) fn ids_mut(&mut self) -> &mut store::Ids { + self.tasks.ids_mut() } pub(crate) fn update_tasks( @@ -121,86 +124,76 @@ impl TasksState { visibility: Visibility, ) { let mut stats_update = update.stats_update; - let new_list = &mut self.new_tasks; - if matches!(visibility, Visibility::Show) { - new_list.clear(); - } - let linters = &self.linters; - let new_tasks = update.new_tasks.into_iter().filter_map(|mut task| { - if task.id.is_none() { - tracing::warn!(?task, "skipping task with no id"); - } - - let meta_id = match task.metadata.as_ref() { - Some(id) => id.id, - None => { - tracing::warn!(?task, "task has no metadata ID, skipping"); - return None; - } - }; - let meta = match metas.get(&meta_id) { - Some(meta) => meta, - None => { - tracing::warn!(?task, meta_id, "no metadata for task, skipping"); - return None; + self.tasks + .insert_with(visibility, update.new_tasks, |ids, mut task| { + if task.id.is_none() { + tracing::warn!(?task, "skipping task with no id"); } - }; - let mut name = None; - let mut fields = task - .fields - .drain(..) - .filter_map(|pb| { - let field = Field::from_proto(pb, meta, strings)?; - // the `task.name` field gets its own column, if it's present. - if &*field.name == Field::NAME { - name = Some(strings.string(field.value.to_string())); + + let meta_id = match task.metadata.as_ref() { + Some(id) => id.id, + None => { + tracing::warn!(?task, "task has no metadata ID, skipping"); return None; } - Some(field) - }) - .collect::>(); - - let formatted_fields = Field::make_formatted(styles, &mut fields); - let span_id = task.id?.id; - - let stats = stats_update.remove(&span_id)?.into(); - let location = format_location(task.location); - - // remap the server's ID to a pretty, sequential task ID - let num = self.ids.id_for(span_id); - - let short_desc = strings.string(match name.as_ref() { - Some(name) => format!("{} ({})", num, name), - None => format!("{}", num), + }; + let meta = match metas.get(&meta_id) { + Some(meta) => meta, + None => { + tracing::warn!(?task, meta_id, "no metadata for task, skipping"); + return None; + } + }; + let mut name = None; + let mut fields = task + .fields + .drain(..) + .filter_map(|pb| { + let field = Field::from_proto(pb, meta, strings)?; + // the `task.name` field gets its own column, if it's present. + if &*field.name == Field::NAME { + name = Some(strings.string(field.value.to_string())); + return None; + } + Some(field) + }) + .collect::>(); + + let formatted_fields = Field::make_formatted(styles, &mut fields); + let span_id = task.id?.id; + + let stats = stats_update.remove(&span_id)?.into(); + let location = format_location(task.location); + + // remap the server's ID to a pretty, sequential task ID + let id = ids.id_for(span_id); + + let short_desc = strings.string(match name.as_ref() { + Some(name) => format!("{} ({})", id, name), + None => format!("{}", id), + }); + + let mut task = Task { + name, + id, + span_id, + short_desc, + formatted_fields, + stats, + target: meta.target.clone(), + warnings: Vec::new(), + location, + }; + task.lint(linters); + Some((id, task)) }); - let mut task = Task { - name, - num, - span_id, - short_desc, - formatted_fields, - stats, - target: meta.target.clone(), - warnings: Vec::new(), - location, - }; + for (stats, mut task) in self.tasks.updated(stats_update) { + tracing::trace!(?task, ?stats, "processing stats update for"); + task.stats = stats.into(); task.lint(linters); - let task = Rc::new(RefCell::new(task)); - new_list.push(Rc::downgrade(&task)); - Some((num, task)) - }); - self.tasks.extend(new_tasks); - for (span_id, stats) in stats_update { - let num = self.ids.id_for(span_id); - if let Some(task) = self.tasks.get_mut(&num) { - let mut task = task.borrow_mut(); - tracing::trace!(?task, "processing stats update for"); - task.stats = stats.into(); - task.lint(linters); - } } self.dropped_events += update.dropped_events; @@ -225,7 +218,7 @@ impl TasksState { } pub(crate) fn task(&self, id: Id) -> Option { - self.tasks.get(&id).map(Rc::downgrade) + self.tasks.get(id).map(Rc::downgrade) } pub(crate) fn dropped_events(&self) -> u64 { @@ -234,7 +227,7 @@ impl TasksState { } impl Details { - pub(crate) fn span_id(&self) -> u64 { + pub(crate) fn span_id(&self) -> SpanId { self.span_id } @@ -245,10 +238,10 @@ impl Details { impl Task { pub(crate) fn id(&self) -> Id { - self.num + self.id } - pub(crate) fn span_id(&self) -> u64 { + pub(crate) fn span_id(&self) -> SpanId { self.span_id } @@ -433,7 +426,7 @@ impl Default for SortBy { impl SortBy { pub fn sort(&self, now: SystemTime, tasks: &mut [Weak>]) { match self { - Self::Tid => tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().num)), + Self::Tid => tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().id)), Self::Name => { tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().name.clone())) }