diff --git a/console-subscriber/examples/app.rs b/console-subscriber/examples/app.rs
index 047542c1..92f4147e 100644
--- a/console-subscriber/examples/app.rs
+++ b/console-subscriber/examples/app.rs
@@ -3,7 +3,9 @@ use tracing_subscriber::prelude::*;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let (layer, server) = console_subscriber::TasksLayer::new();
+    let (layer, server) = console_subscriber::TasksLayer::builder()
+        .retention(Duration::from_secs(60))
+        .build();
     let filter =
         tracing_subscriber::EnvFilter::from_default_env().add_directive("tokio=trace".parse()?);
     tracing_subscriber::registry()
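The example's `Duration` is presumably already imported at the top of app.rs (the import sits outside this hunk). For comparison, a before/after sketch of the example's setup against this branch's API; the 60-second retention is an arbitrary demo value:

    use std::time::Duration;

    // Before: fixed defaults, with no way to tune data retention.
    let (_layer, _server) = console_subscriber::TasksLayer::new();

    // After: `new()` is now shorthand for `builder().build()`, and any
    // individual setting can be overridden before calling `build()`.
    let (layer, server) = console_subscriber::TasksLayer::builder()
        .retention(Duration::from_secs(60)) // drop completed-task data after a minute
        .build();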
diff --git a/console-subscriber/src/aggregator.rs b/console-subscriber/src/aggregator.rs
index 4b329a15..6421b28b 100644
--- a/console-subscriber/src/aggregator.rs
+++ b/console-subscriber/src/aggregator.rs
@@ -22,7 +22,10 @@ pub(crate) struct Aggregator {
     rpcs: mpsc::Receiver<Watch>,
 
     /// The interval at which new data updates are pushed to clients.
-    flush_interval: Duration,
+    publish_interval: Duration,
+
+    /// How long to keep task data after a task has completed.
+    retention: Duration,
 
     /// Triggers a flush when the event buffer is approaching capacity.
     flush_capacity: Arc<Flush>,
@@ -78,7 +81,7 @@ impl Aggregator {
     pub(crate) fn new(
         events: mpsc::Receiver<Event>,
         rpcs: mpsc::Receiver<Watch>,
-        flush_interval: Duration,
+        builder: &crate::Builder,
     ) -> Self {
         Self {
             flush_capacity: Arc::new(Flush {
@@ -86,7 +89,8 @@ impl Aggregator {
                 triggered: AtomicBool::new(false),
             }),
             rpcs,
-            flush_interval,
+            publish_interval: builder.publish_interval,
+            retention: builder.retention,
             events,
             watchers: Vec::new(),
             all_metadata: Vec::new(),
@@ -103,11 +107,11 @@ impl Aggregator {
     }
 
     pub(crate) async fn run(mut self) {
-        let mut flush = tokio::time::interval(self.flush_interval);
+        let mut publish = tokio::time::interval(self.publish_interval);
         loop {
             let should_send = tokio::select! {
                 // if the flush interval elapses, flush data to the client
-                _ = flush.tick() => {
+                _ = publish.tick() => {
                     true
                 }
 
@@ -171,10 +175,15 @@ impl Aggregator {
                 };
             }
 
-            // flush data to clients
-            if should_send {
+            // flush data to clients, if there are any currently subscribed
+            // watchers and we should send a new update.
+            if !self.watchers.is_empty() && should_send {
                 self.publish();
             }
+
+            // drop any tasks that have completed *and* whose final data has already
+            // been sent off.
+            self.drop_closed_tasks();
         }
     }
 
@@ -268,6 +277,65 @@ impl Aggregator {
             }
         }
     }
+
+    fn drop_closed_tasks(&mut self) {
+        let tasks = &mut self.tasks;
+        let stats = &mut self.stats;
+        let has_watchers = !self.watchers.is_empty();
+        let now = SystemTime::now();
+        let stats_len_0 = stats.data.len();
+        let retention = self.retention;
+
+        // drop stats for closed tasks if they have been updated
+        tracing::trace!(
+            ?self.retention,
+            self.has_watchers = has_watchers,
+            "dropping closed tasks..."
+        );
+
+        stats.data.retain(|id, (stats, dirty)| {
+            if let Some(closed) = stats.closed_at {
+                let closed_for = now.duration_since(closed).unwrap_or_default();
+                let should_drop =
+                    // if there are any clients watching, retain all dirty tasks regardless of age
+                    (*dirty && has_watchers)
+                    || closed_for > retention;
+                tracing::trace!(
+                    stats.id = ?id,
+                    stats.closed_at = ?closed,
+                    stats.closed_for = ?closed_for,
+                    stats.dirty = *dirty,
+                    should_drop,
+                );
+                return !should_drop;
+            }
+
+            true
+        });
+        let stats_len_1 = stats.data.len();
+
+        // drop closed tasks which no longer have stats.
+        let tasks_len_0 = tasks.data.len();
+        tasks.data.retain(|id, (_, _)| stats.data.contains_key(id));
+        let tasks_len_1 = tasks.data.len();
+        let dropped_stats = stats_len_0 - stats_len_1;
+
+        if dropped_stats > 0 {
+            tracing::debug!(
+                tasks.dropped = tasks_len_0 - tasks_len_1,
+                tasks.len = tasks_len_1,
+                stats.dropped = dropped_stats,
+                stats.tasks = stats_len_1,
+                "dropped closed tasks"
+            );
+        } else {
+            tracing::trace!(
+                tasks.len = tasks_len_1,
+                stats.len = stats_len_1,
+                "no closed tasks were droppable"
+            );
+        }
+    }
 }
 
 // ==== impl Flush ===
@@ -296,10 +364,6 @@ impl TaskData {
         Updating(self.data.entry(id).or_default())
     }
 
-    // fn update(&mut self, id: &span::Id) -> Option<Updating<'_, T>> {
-    //     Some(Updating(self.data.get_mut(id)?))
-    // }
-
     fn insert(&mut self, id: span::Id, data: T) {
         self.data.insert(id, (data, true));
     }
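One thing worth flagging in `drop_closed_tasks`: the comment above `should_drop` says dirty tasks should be *retained* while clients are watching, but as written, `(*dirty && has_watchers) || closed_for > retention` makes `should_drop` true for exactly those tasks. The predicate the comment describes would negate that first clause. A self-contained sketch of the retention rule as the comment reads it, using simplified stand-in types rather than the crate's actual `TaskData`/`Stats`:

    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    /// Stand-in for the aggregator's per-task stats (illustrative only).
    struct Stats {
        closed_at: Option<SystemTime>,
    }

    /// Drop closed tasks once they age out of the retention window, but keep
    /// any task whose latest data (`dirty == true`) has not yet reached a
    /// subscribed watcher.
    fn drop_closed(
        stats: &mut HashMap<u64, (Stats, bool)>,
        retention: Duration,
        has_watchers: bool,
    ) {
        let now = SystemTime::now();
        stats.retain(|_id, (stats, dirty)| {
            if let Some(closed) = stats.closed_at {
                let closed_for = now.duration_since(closed).unwrap_or_default();
                let should_drop = !(*dirty && has_watchers) && closed_for > retention;
                return !should_drop;
            }
            true // never drop tasks that are still running
        });
    }

    fn main() {
        let retention = Duration::from_secs(60);
        let old = SystemTime::now() - Duration::from_secs(120);
        let mut stats = HashMap::new();
        stats.insert(1, (Stats { closed_at: Some(old) }, false)); // closed, already sent
        stats.insert(2, (Stats { closed_at: None }, false)); // still running
        drop_closed(&mut stats, retention, false);
        assert!(!stats.contains_key(&1)); // aged out of the 60s window
        assert!(stats.contains_key(&2)); // running tasks are always kept
    }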
diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs
new file mode 100644
index 00000000..95c90e33
--- /dev/null
+++ b/console-subscriber/src/builder.rs
@@ -0,0 +1,104 @@
+use super::{Server, TasksLayer};
+use std::{net::SocketAddr, time::Duration};
+
+/// Builder for configuring [`TasksLayer`]s.
+#[derive(Clone, Debug)]
+pub struct Builder {
+    /// The maximum capacity for the channel of events from the subscriber to
+    /// the aggregator task.
+    pub(super) event_buffer_capacity: usize,
+
+    /// The maximum number of updates to buffer per-client before the client is
+    /// dropped.
+    pub(super) client_buffer_capacity: usize,
+
+    /// The interval between publishing updates to clients.
+    pub(crate) publish_interval: Duration,
+
+    /// How long to retain data for completed events.
+    pub(crate) retention: Duration,
+
+    /// The address on which to serve the RPC server.
+    pub(super) server_addr: SocketAddr,
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        Self {
+            event_buffer_capacity: TasksLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
+            client_buffer_capacity: TasksLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
+            publish_interval: TasksLayer::DEFAULT_PUBLISH_INTERVAL,
+            retention: TasksLayer::DEFAULT_RETENTION,
+            server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
+        }
+    }
+}
+
+impl Builder {
+    /// Sets the maximum capacity for the channel of events sent from subscriber
+    /// layers to the aggregator task.
+    ///
+    /// When this channel is at capacity, additional events will be dropped.
+    ///
+    /// By default, this is [`TasksLayer::DEFAULT_EVENT_BUFFER_CAPACITY`].
+    pub fn event_buffer_capacity(self, event_buffer_capacity: usize) -> Self {
+        Self {
+            event_buffer_capacity,
+            ..self
+        }
+    }
+
+    /// Sets the maximum capacity of updates to buffer for each subscribed
+    /// client, if that client is not reading from the RPC stream.
+    ///
+    /// When this channel is at capacity, the client may be disconnected.
+    ///
+    /// By default, this is [`TasksLayer::DEFAULT_CLIENT_BUFFER_CAPACITY`].
+    pub fn client_buffer_capacity(self, client_buffer_capacity: usize) -> Self {
+        Self {
+            client_buffer_capacity,
+            ..self
+        }
+    }
+
+    /// Sets how frequently updates are published to clients.
+    ///
+    /// A shorter duration will allow clients to update more frequently, but may
+    /// result in the program spending more time preparing task data updates.
+    ///
+    /// By default, this is [`TasksLayer::DEFAULT_PUBLISH_INTERVAL`].
+    pub fn publish_interval(self, publish_interval: Duration) -> Self {
+        Self {
+            publish_interval,
+            ..self
+        }
+    }
+
+    /// Sets how long data is retained for completed tasks.
+    ///
+    /// A longer duration will allow more historical data to be replayed by
+    /// clients, but will result in increased memory usage. A shorter duration
+    /// will reduce memory usage, but less historical data from completed tasks
+    /// will be retained.
+    ///
+    /// By default, this is [`TasksLayer::DEFAULT_RETENTION`].
+    pub fn retention(self, retention: Duration) -> Self {
+        Self { retention, ..self }
+    }
+
+    /// Sets the socket address on which to serve the RPC server.
+    ///
+    /// By default, the server is bound on the IP address [`Server::DEFAULT_IP`]
+    /// on port [`Server::DEFAULT_PORT`].
+    pub fn server_addr(self, server_addr: impl Into<SocketAddr>) -> Self {
+        Self {
+            server_addr: server_addr.into(),
+            ..self
+        }
+    }
+
+    /// Completes the builder, returning a [`TasksLayer`] and [`Server`] task.
+    pub fn build(self) -> (TasksLayer, Server) {
+        TasksLayer::build(self)
+    }
+}
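In application code, the setters chain in any order, and anything left unset keeps its `TasksLayer::DEFAULT_*` value. A usage sketch against this branch; all of the values below are arbitrary:

    use std::time::Duration;

    let (layer, server) = console_subscriber::TasksLayer::builder()
        .event_buffer_capacity(16 * 1024) // subscriber -> aggregator channel
        .client_buffer_capacity(2 * 1024) // per-client update buffer
        .publish_interval(Duration::from_millis(500))
        .retention(Duration::from_secs(30 * 60))
        .server_addr(([127, 0, 0, 1], 9999)) // anything `Into<SocketAddr>`
        .build();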
diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs
index e97700e2..4cedea72 100644
--- a/console-subscriber/src/lib.rs
+++ b/console-subscriber/src/lib.rs
@@ -2,7 +2,7 @@ use console_api as proto;
 use tokio::sync::mpsc;
 
 use std::{
-    net::SocketAddr,
+    net::{IpAddr, Ipv4Addr, SocketAddr},
     ptr,
     sync::{
         atomic::{AtomicPtr, Ordering::*},
@@ -20,6 +20,8 @@ use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
 
 mod aggregator;
 use aggregator::Aggregator;
+mod builder;
+pub use builder::Builder;
 
 pub struct TasksLayer {
     task_meta: AtomicPtr<Metadata<'static>>,
@@ -66,6 +68,15 @@ enum Event {
 
 impl TasksLayer {
     pub fn new() -> (Self, Server) {
+        Self::builder().build()
+    }
+
+    /// Returns a [`Builder`] for configuring a `TasksLayer`.
+    pub fn builder() -> Builder {
+        Builder::default()
+    }
+
+    fn build(config: Builder) -> (Self, Server) {
         // The `cfg` value *appears* to be a constant to clippy, but it changes
         // depending on the build-time configuration...
         #![allow(clippy::assertions_on_constants)]
@@ -73,18 +84,25 @@ impl TasksLayer {
             cfg!(tokio_unstable),
             "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
         );
 
-        // TODO(eliza): builder
-        let (tx, events) = mpsc::channel(Self::DEFAULT_EVENT_BUFFER_CAPACITY);
+        tracing::debug!(
+            config.event_buffer_capacity,
+            config.client_buffer_capacity,
+            ?config.publish_interval,
+            ?config.retention,
+            ?config.server_addr,
+            "configured console subscriber"
+        );
+
+        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
         let (subscribe, rpcs) = mpsc::channel(256);
-        let aggregator = Aggregator::new(events, rpcs, Self::DEFAULT_FLUSH_INTERVAL);
+        let aggregator = Aggregator::new(events, rpcs, &config);
         let flush = aggregator.flush().clone();
-        let addr = SocketAddr::from(([127, 0, 0, 1], 6669));
         let server = Server {
             aggregator: Some(aggregator),
-            addr,
+            addr: config.server_addr,
             subscribe,
-            client_buffer: Self::DEFAULT_CLIENT_BUFFER_CAPACITY,
+            client_buffer: config.client_buffer_capacity,
         };
         let layer = Self {
             tx,
@@ -99,8 +117,10 @@ impl TasksLayer {
     pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 10;
     pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;
-    pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
+    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);
+    /// By default, completed spans are retained for one hour.
+    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);
 
     // how much capacity should remain in the buffer before triggering a
     // flush on capacity?
     //
@@ -238,12 +258,9 @@ where
 }
 
 impl Server {
-    pub fn with_addr(self, addr: impl Into<SocketAddr>) -> Self {
-        Self {
-            addr: addr.into(),
-            ..self
-        }
-    }
+    // XXX(eliza): why is `SocketAddr::new` not `const`???
+    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
+    pub const DEFAULT_PORT: u16 = 6669;
 
     pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         self.serve_with(tonic::transport::Server::default()).await
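Since this removes `Server::with_addr` outright, it is a breaking change for any caller that sets a custom address, though the migration is mechanical. A sketch (the port here is arbitrary):

    // Before this change:
    //     let (layer, server) = console_subscriber::TasksLayer::new();
    //     let server = server.with_addr(([127, 0, 0, 1], 5555));

    // After: the address is fixed at construction time via the builder.
    let (layer, server) = console_subscriber::TasksLayer::builder()
        .server_addr(([127, 0, 0, 1], 5555))
        .build();

    // The default is unchanged: 127.0.0.1:6669, now exposed as the constants
    // `Server::DEFAULT_IP` and `Server::DEFAULT_PORT`.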