From 94aad1c88e9f97e08ef513449e1399092187da21 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 27 May 2021 11:35:33 -0700 Subject: [PATCH] feat(subscriber): drop data for completed tasks (#31) Currently, the console subscriber will store task data for tasks even after they complete. This means that it will use an increasing amount of memory over time. This branch updates the aggregator task to drop data for completed spans after a configurable retention period. I've also added a builder for configuring the console subscriber, including changing the retention period. By default, completed tasks are retained for an hour, so that some historical data can be played back, but this can be reduced to decrease memory usage. Signed-off-by: Eliza Weisman --- console-subscriber/examples/app.rs | 4 +- console-subscriber/src/aggregator.rs | 86 +++++++++++++++++++--- console-subscriber/src/builder.rs | 104 +++++++++++++++++++++++++++ console-subscriber/src/lib.rs | 45 ++++++++---- 4 files changed, 213 insertions(+), 26 deletions(-) create mode 100644 console-subscriber/src/builder.rs diff --git a/console-subscriber/examples/app.rs b/console-subscriber/examples/app.rs index 047542c1..92f4147e 100644 --- a/console-subscriber/examples/app.rs +++ b/console-subscriber/examples/app.rs @@ -3,7 +3,9 @@ use tracing_subscriber::prelude::*; #[tokio::main] async fn main() -> Result<(), Box> { - let (layer, server) = console_subscriber::TasksLayer::new(); + let (layer, server) = console_subscriber::TasksLayer::builder() + .retention(Duration::from_secs(60)) + .build(); let filter = tracing_subscriber::EnvFilter::from_default_env().add_directive("tokio=trace".parse()?); tracing_subscriber::registry() diff --git a/console-subscriber/src/aggregator.rs b/console-subscriber/src/aggregator.rs index 4b329a15..6421b28b 100644 --- a/console-subscriber/src/aggregator.rs +++ b/console-subscriber/src/aggregator.rs @@ -22,7 +22,10 @@ pub(crate) struct Aggregator { rpcs: mpsc::Receiver, /// The interval at which new data updates are pushed to clients. - flush_interval: Duration, + publish_interval: Duration, + + /// How long to keep task data after a task has completed. + retention: Duration, /// Triggers a flush when the event buffer is approaching capacity. flush_capacity: Arc, @@ -78,7 +81,7 @@ impl Aggregator { pub(crate) fn new( events: mpsc::Receiver, rpcs: mpsc::Receiver, - flush_interval: Duration, + builder: &crate::Builder, ) -> Self { Self { flush_capacity: Arc::new(Flush { @@ -86,7 +89,8 @@ impl Aggregator { triggered: AtomicBool::new(false), }), rpcs, - flush_interval, + publish_interval: builder.publish_interval, + retention: builder.retention, events, watchers: Vec::new(), all_metadata: Vec::new(), @@ -103,11 +107,11 @@ impl Aggregator { } pub(crate) async fn run(mut self) { - let mut flush = tokio::time::interval(self.flush_interval); + let mut publish = tokio::time::interval(self.publish_interval); loop { let should_send = tokio::select! { // if the flush interval elapses, flush data to the client - _ = flush.tick() => { + _ = publish.tick() => { true } @@ -171,10 +175,15 @@ impl Aggregator { }; } - // flush data to clients - if should_send { + // flush data to clients, if there are any currently subscribed + // watchers and we should send a new update. + if !self.watchers.is_empty() && should_send { self.publish(); } + + // drop any tasks that have completed *and* whose final data has already + // been sent off. + self.drop_closed_tasks(); } } @@ -268,6 +277,65 @@ impl Aggregator { } } } + + fn drop_closed_tasks(&mut self) { + let tasks = &mut self.tasks; + let stats = &mut self.stats; + let has_watchers = !self.watchers.is_empty(); + let now = SystemTime::now(); + let stats_len_0 = stats.data.len(); + let retention = self.retention; + + // drop stats for closed tasks if they have been updated + tracing::trace!( + ?self.retention, + self.has_watchers = has_watchers, + "dropping closed tasks..." + ); + + stats.data.retain(|id, (stats, dirty)| { + if let Some(closed) = stats.closed_at { + let closed_for = now.duration_since(closed).unwrap_or_default(); + let should_drop = + // if there are any clients watching, retain all dirty tasks regardless of age + (*dirty && has_watchers) + || closed_for > retention; + tracing::trace!( + stats.id = ?id, + stats.closed_at = ?closed, + stats.closed_for = ?closed_for, + stats.dirty = *dirty, + should_drop, + ); + return !should_drop; + } + + true + }); + let stats_len_1 = stats.data.len(); + + // drop closed tasks which no longer have stats. + let tasks_len_0 = tasks.data.len(); + tasks.data.retain(|id, (_, _)| stats.data.contains_key(id)); + let tasks_len_1 = tasks.data.len(); + let dropped_stats = stats_len_0 - stats_len_1; + + if dropped_stats > 0 { + tracing::debug!( + tasks.dropped = tasks_len_0 - tasks_len_1, + tasks.len = tasks_len_1, + stats.dropped = dropped_stats, + stats.tasks = stats_len_1, + "dropped closed tasks" + ); + } else { + tracing::trace!( + tasks.len = tasks_len_1, + stats.len = stats_len_1, + "no closed tasks were droppable" + ); + } + } } // ==== impl Flush === @@ -296,10 +364,6 @@ impl TaskData { Updating(self.data.entry(id).or_default()) } - // fn update(&mut self, id: &span::Id) -> Option> { - // Some(Updating(self.data.get_mut(id)?)) - // } - fn insert(&mut self, id: span::Id, data: T) { self.data.insert(id, (data, true)); } diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs new file mode 100644 index 00000000..95c90e33 --- /dev/null +++ b/console-subscriber/src/builder.rs @@ -0,0 +1,104 @@ +use super::{Server, TasksLayer}; +use std::{net::SocketAddr, time::Duration}; + +/// Builder for configuring [`TasksLayer`]s. +#[derive(Clone, Debug)] +pub struct Builder { + /// The maximum capacity for the channel of events from the subscriber to + /// the aggregator task. + pub(super) event_buffer_capacity: usize, + + /// The maximum number of updates to buffer per-client before the client is + /// dropped. + pub(super) client_buffer_capacity: usize, + + /// The interval between publishing updates to clients. + pub(crate) publish_interval: Duration, + + /// How long to retain data for completed events. + pub(crate) retention: Duration, + + /// The address on which to serve the RPC server. + pub(super) server_addr: SocketAddr, +} + +impl Default for Builder { + fn default() -> Self { + Self { + event_buffer_capacity: TasksLayer::DEFAULT_EVENT_BUFFER_CAPACITY, + client_buffer_capacity: TasksLayer::DEFAULT_CLIENT_BUFFER_CAPACITY, + publish_interval: TasksLayer::DEFAULT_PUBLISH_INTERVAL, + retention: TasksLayer::DEFAULT_RETENTION, + server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT), + } + } +} + +impl Builder { + /// Sets the maximum capacity for the channel of events sent from subscriber + /// layers to the aggregator task. + /// + /// When this channel is at capacity, additional events will be dropped. + /// + /// By default, this is [`TasksLayer::DEFAULT_EVENT_BUFFER_CAPACITY`]. + pub fn event_buffer_capacity(self, event_buffer_capacity: usize) -> Self { + Self { + event_buffer_capacity, + ..self + } + } + + /// Sets the maximum capacity of updates to buffer for each subscribed + /// client, if that client is not reading from the RPC stream. + /// + /// When this channel is at capacity, the client may be disconnected. + /// + /// By default, this is [`TasksLayer::DEFAULT_CLIENT_BUFFER_CAPACITY`]. + pub fn client_buffer_capacity(self, client_buffer_capacity: usize) -> Self { + Self { + client_buffer_capacity, + ..self + } + } + + /// Sets how frequently updates are published to clients. + /// + /// A shorter duration will allow clients to update more frequently, but may + /// result in the program spending more time preparing task data updates. + /// + /// By default, this is [`TasksLayer::DEFAULT_PUBLISH_INTERVAL`]. + pub fn publish_interval(self, publish_interval: Duration) -> Self { + Self { + publish_interval, + ..self + } + } + + /// Sets how long data is retained for completed tasks. + /// + /// A longer duration will allow more historical data to be replayed by + /// clients, but will result in increased memory usage. A shorter duration + /// will reduce memory usage, but less historical data from completed tasks + /// will be retained. + /// + /// By default, this is [`TasksLayer::DEFAULT_RETENTION`]. + pub fn retention(self, retention: Duration) -> Self { + Self { retention, ..self } + } + + /// Sets the socket address on which to serve the RPC server. + /// + /// By default, the server is bound on the IP address [`Server::DEFAULT_IP`] + /// on port [`Server::DEFAULT_PORT`]. + pub fn server_addr(self, server_addr: impl Into) -> Self { + Self { + server_addr: server_addr.into(), + ..self + } + } + + /// Completes the builder, returning a [`TasksLayer`] and [`Server`] task. + pub fn build(self) -> (TasksLayer, Server) { + TasksLayer::build(self) + } +} diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index e97700e2..4cedea72 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -2,7 +2,7 @@ use console_api as proto; use tokio::sync::mpsc; use std::{ - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, ptr, sync::{ atomic::{AtomicPtr, Ordering::*}, @@ -20,6 +20,8 @@ use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; mod aggregator; use aggregator::Aggregator; +mod builder; +pub use builder::Builder; pub struct TasksLayer { task_meta: AtomicPtr>, @@ -66,6 +68,15 @@ enum Event { impl TasksLayer { pub fn new() -> (Self, Server) { + Self::builder().build() + } + + /// Returns a [`Builder`] for configuring a `TasksLayer`. + pub fn builder() -> Builder { + Builder::default() + } + + fn build(config: Builder) -> (Self, Server) { // The `cfg` value *appears* to be a constant to clippy, but it changes // depending on the build-time configuration... #![allow(clippy::assertions_on_constants)] @@ -73,18 +84,25 @@ impl TasksLayer { cfg!(tokio_unstable), "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!" ); - // TODO(eliza): builder - let (tx, events) = mpsc::channel(Self::DEFAULT_EVENT_BUFFER_CAPACITY); + tracing::debug!( + config.event_buffer_capacity, + config.client_buffer_capacity, + ?config.publish_interval, + ?config.retention, + ?config.server_addr, + "configured console subscriber" + ); + + let (tx, events) = mpsc::channel(config.event_buffer_capacity); let (subscribe, rpcs) = mpsc::channel(256); - let aggregator = Aggregator::new(events, rpcs, Self::DEFAULT_FLUSH_INTERVAL); + let aggregator = Aggregator::new(events, rpcs, &config); let flush = aggregator.flush().clone(); - let addr = SocketAddr::from(([127, 0, 0, 1], 6669)); let server = Server { aggregator: Some(aggregator), - addr, + addr: config.server_addr, subscribe, - client_buffer: Self::DEFAULT_CLIENT_BUFFER_CAPACITY, + client_buffer: config.client_buffer_capacity, }; let layer = Self { tx, @@ -99,8 +117,10 @@ impl TasksLayer { impl TasksLayer { pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 10; pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4; - pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(1); + pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1); + /// By default, completed spans are retained for one hour. + pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60); // how much capacity should remain in the buffer before triggering a // flush on capacity? // @@ -238,12 +258,9 @@ where } impl Server { - pub fn with_addr(self, addr: impl Into) -> Self { - Self { - addr: addr.into(), - ..self - } - } + // XXX(eliza): why is `SocketAddr::new` not `const`??? + pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + pub const DEFAULT_PORT: u16 = 6669; pub async fn serve(self) -> Result<(), Box> { self.serve_with(tonic::transport::Server::default()).await