feat(subscriber): drop data for completed tasks (console-rs#31)
Currently, the console subscriber stores data for tasks even after they
have completed, which means it uses an increasing amount of memory over
time. This branch updates the aggregator task to drop data for completed
spans after a configurable retention period.

I've also added a builder for configuring the console subscriber, including
changing the retention period. By default, completed tasks are retained for
an hour, so that some historical data can be played back, but this can be
reduced to decrease memory usage.
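
For example, to keep data for completed tasks for one minute instead of
the default hour (a minimal sketch; the sixty-second value is an
arbitrary choice):

    use std::time::Duration;

    let (layer, server) = console_subscriber::TasksLayer::builder()
        .retention(Duration::from_secs(60))
        .build();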

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw committed May 27, 2021 · 1 parent 38adbd9 · commit 94aad1c
Showing 4 changed files with 213 additions and 26 deletions.
4 changes: 3 additions & 1 deletion console-subscriber/examples/app.rs
@@ -3,7 +3,9 @@ use tracing_subscriber::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-let (layer, server) = console_subscriber::TasksLayer::new();
+let (layer, server) = console_subscriber::TasksLayer::builder()
+.retention(Duration::from_secs(60))
+.build();
let filter =
tracing_subscriber::EnvFilter::from_default_env().add_directive("tokio=trace".parse()?);
tracing_subscriber::registry()
86 changes: 75 additions & 11 deletions console-subscriber/src/aggregator.rs
@@ -22,7 +22,10 @@ pub(crate) struct Aggregator {
rpcs: mpsc::Receiver<Watch>,

/// The interval at which new data updates are pushed to clients.
-flush_interval: Duration,
+publish_interval: Duration,
+
+/// How long to keep task data after a task has completed.
+retention: Duration,

/// Triggers a flush when the event buffer is approaching capacity.
flush_capacity: Arc<Flush>,
@@ -78,15 +81,16 @@ impl Aggregator {
pub(crate) fn new(
events: mpsc::Receiver<Event>,
rpcs: mpsc::Receiver<Watch>,
-flush_interval: Duration,
+builder: &crate::Builder,
) -> Self {
Self {
flush_capacity: Arc::new(Flush {
should_flush: Notify::new(),
triggered: AtomicBool::new(false),
}),
rpcs,
-flush_interval,
+publish_interval: builder.publish_interval,
+retention: builder.retention,
events,
watchers: Vec::new(),
all_metadata: Vec::new(),
@@ -103,11 +107,11 @@ impl Aggregator {
}

pub(crate) async fn run(mut self) {
-let mut flush = tokio::time::interval(self.flush_interval);
+let mut publish = tokio::time::interval(self.publish_interval);
loop {
let should_send = tokio::select! {
// if the flush interval elapses, flush data to the client
-_ = flush.tick() => {
+_ = publish.tick() => {
true
}

@@ -171,10 +175,15 @@ impl Aggregator {
};
}

-// flush data to clients
-if should_send {
+// flush data to clients, if there are any currently subscribed
+// watchers and we should send a new update.
+if !self.watchers.is_empty() && should_send {
self.publish();
}

+// drop any tasks that have completed *and* whose final data has already
+// been sent off.
+self.drop_closed_tasks();
}
}

@@ -268,6 +277,65 @@ impl Aggregator {
}
}
}

+fn drop_closed_tasks(&mut self) {
+let tasks = &mut self.tasks;
+let stats = &mut self.stats;
+let has_watchers = !self.watchers.is_empty();
+let now = SystemTime::now();
+let stats_len_0 = stats.data.len();
+let retention = self.retention;
+
+// drop stats for closed tasks if they have been updated
+tracing::trace!(
+?self.retention,
+self.has_watchers = has_watchers,
+"dropping closed tasks..."
+);
+
+stats.data.retain(|id, (stats, dirty)| {
+if let Some(closed) = stats.closed_at {
+let closed_for = now.duration_since(closed).unwrap_or_default();
+let should_drop =
+// if there are any clients watching, retain all dirty tasks regardless of age
+(*dirty && has_watchers)
+|| closed_for > retention;
+tracing::trace!(
+stats.id = ?id,
+stats.closed_at = ?closed,
+stats.closed_for = ?closed_for,
+stats.dirty = *dirty,
+should_drop,
+);
+return !should_drop;
+}
+
+true
+});
+let stats_len_1 = stats.data.len();
+
+// drop closed tasks which no longer have stats.
+let tasks_len_0 = tasks.data.len();
+tasks.data.retain(|id, (_, _)| stats.data.contains_key(id));
+let tasks_len_1 = tasks.data.len();
+let dropped_stats = stats_len_0 - stats_len_1;
+
+if dropped_stats > 0 {
+tracing::debug!(
+tasks.dropped = tasks_len_0 - tasks_len_1,
+tasks.len = tasks_len_1,
+stats.dropped = dropped_stats,
+stats.tasks = stats_len_1,
+"dropped closed tasks"
+);
+} else {
+tracing::trace!(
+tasks.len = tasks_len_1,
+stats.len = stats_len_1,
+"no closed tasks were droppable"
+);
+}
+}
}

// ==== impl Flush ===
@@ -296,10 +364,6 @@ impl<T> TaskData<T> {
Updating(self.data.entry(id).or_default())
}

-// fn update(&mut self, id: &span::Id) -> Option<Updating<'_, T>> {
-// Some(Updating(self.data.get_mut(id)?))
-// }

fn insert(&mut self, id: span::Id, data: T) {
self.data.insert(id, (data, true));
}
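
The pruning above is a `retain` pass keyed on each task's close
timestamp. As a self-contained illustration of the same pattern (the
`TaskStats` struct and `u64` keys here are hypothetical stand-ins for
the aggregator's internal types):

    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};

    // Hypothetical stand-in for the aggregator's per-task stats.
    struct TaskStats {
        closed_at: Option<SystemTime>,
    }

    fn drop_closed(stats: &mut HashMap<u64, TaskStats>, retention: Duration) {
        let now = SystemTime::now();
        stats.retain(|_id, s| match s.closed_at {
            // a closed task is kept only while it is within the retention window
            Some(closed) => now.duration_since(closed).unwrap_or_default() <= retention,
            // a task that has not closed yet is always retained
            None => true,
        });
    }
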
104 changes: 104 additions & 0 deletions console-subscriber/src/builder.rs
@@ -0,0 +1,104 @@
use super::{Server, TasksLayer};
use std::{net::SocketAddr, time::Duration};

/// Builder for configuring [`TasksLayer`]s.
#[derive(Clone, Debug)]
pub struct Builder {
/// The maximum capacity for the channel of events from the subscriber to
/// the aggregator task.
pub(super) event_buffer_capacity: usize,

/// The maximum number of updates to buffer per-client before the client is
/// dropped.
pub(super) client_buffer_capacity: usize,

/// The interval between publishing updates to clients.
pub(crate) publish_interval: Duration,

/// How long to retain data for completed tasks.
pub(crate) retention: Duration,

/// The address on which to serve the RPC server.
pub(super) server_addr: SocketAddr,
}

impl Default for Builder {
fn default() -> Self {
Self {
event_buffer_capacity: TasksLayer::DEFAULT_EVENT_BUFFER_CAPACITY,
client_buffer_capacity: TasksLayer::DEFAULT_CLIENT_BUFFER_CAPACITY,
publish_interval: TasksLayer::DEFAULT_PUBLISH_INTERVAL,
retention: TasksLayer::DEFAULT_RETENTION,
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
}
}
}

impl Builder {
/// Sets the maximum capacity for the channel of events sent from subscriber
/// layers to the aggregator task.
///
/// When this channel is at capacity, additional events will be dropped.
///
/// By default, this is [`TasksLayer::DEFAULT_EVENT_BUFFER_CAPACITY`].
pub fn event_buffer_capacity(self, event_buffer_capacity: usize) -> Self {
Self {
event_buffer_capacity,
..self
}
}

/// Sets the maximum capacity of updates to buffer for each subscribed
/// client, if that client is not reading from the RPC stream.
///
/// When this channel is at capacity, the client may be disconnected.
///
/// By default, this is [`TasksLayer::DEFAULT_CLIENT_BUFFER_CAPACITY`].
pub fn client_buffer_capacity(self, client_buffer_capacity: usize) -> Self {
Self {
client_buffer_capacity,
..self
}
}

/// Sets how frequently updates are published to clients.
///
/// A shorter duration will allow clients to update more frequently, but may
/// result in the program spending more time preparing task data updates.
///
/// By default, this is [`TasksLayer::DEFAULT_PUBLISH_INTERVAL`].
pub fn publish_interval(self, publish_interval: Duration) -> Self {
Self {
publish_interval,
..self
}
}

/// Sets how long data is retained for completed tasks.
///
/// A longer duration will allow more historical data to be replayed by
/// clients, but will result in increased memory usage. A shorter duration
/// will reduce memory usage, but less historical data from completed tasks
/// will be retained.
///
/// By default, this is [`TasksLayer::DEFAULT_RETENTION`].
pub fn retention(self, retention: Duration) -> Self {
Self { retention, ..self }
}

/// Sets the socket address on which to serve the RPC server.
///
/// By default, the server is bound on the IP address [`Server::DEFAULT_IP`]
/// on port [`Server::DEFAULT_PORT`].
pub fn server_addr(self, server_addr: impl Into<SocketAddr>) -> Self {
Self {
server_addr: server_addr.into(),
..self
}
}

/// Completes the builder, returning a [`TasksLayer`] and [`Server`] task.
pub fn build(self) -> (TasksLayer, Server) {
TasksLayer::build(self)
}
}
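
For reference, a sketch chaining every option the builder exposes (all
values are arbitrary examples, not recommendations):

    use std::time::Duration;

    let (layer, server) = console_subscriber::TasksLayer::builder()
        // allow more events to buffer between the subscriber and the aggregator
        .event_buffer_capacity(1024 * 100)
        // keep up to 8192 unread updates per subscribed client
        .client_buffer_capacity(1024 * 8)
        // publish updates to clients twice per second
        .publish_interval(Duration::from_millis(500))
        // retain data for completed tasks for five minutes
        .retention(Duration::from_secs(5 * 60))
        // serve the RPC server on a non-default port
        .server_addr(([127, 0, 0, 1], 5555))
        .build();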
45 changes: 31 additions & 14 deletions console-subscriber/src/lib.rs
@@ -2,7 +2,7 @@ use console_api as proto;
use tokio::sync::mpsc;

use std::{
-net::SocketAddr,
+net::{IpAddr, Ipv4Addr, SocketAddr},
ptr,
sync::{
atomic::{AtomicPtr, Ordering::*},
@@ -20,6 +20,8 @@ use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

mod aggregator;
use aggregator::Aggregator;
+mod builder;
+pub use builder::Builder;

pub struct TasksLayer {
task_meta: AtomicPtr<Metadata<'static>>,
@@ -66,25 +68,41 @@ enum Event {

impl TasksLayer {
pub fn new() -> (Self, Server) {
+Self::builder().build()
+}
+
+/// Returns a [`Builder`] for configuring a `TasksLayer`.
+pub fn builder() -> Builder {
+Builder::default()
+}
+
+fn build(config: Builder) -> (Self, Server) {
// The `cfg` value *appears* to be a constant to clippy, but it changes
// depending on the build-time configuration...
#![allow(clippy::assertions_on_constants)]
assert!(
cfg!(tokio_unstable),
"task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
);
-// TODO(eliza): builder
-let (tx, events) = mpsc::channel(Self::DEFAULT_EVENT_BUFFER_CAPACITY);
+tracing::debug!(
+config.event_buffer_capacity,
+config.client_buffer_capacity,
+?config.publish_interval,
+?config.retention,
+?config.server_addr,
+"configured console subscriber"
+);
+
+let (tx, events) = mpsc::channel(config.event_buffer_capacity);
let (subscribe, rpcs) = mpsc::channel(256);

-let aggregator = Aggregator::new(events, rpcs, Self::DEFAULT_FLUSH_INTERVAL);
+let aggregator = Aggregator::new(events, rpcs, &config);
let flush = aggregator.flush().clone();
-let addr = SocketAddr::from(([127, 0, 0, 1], 6669));
let server = Server {
aggregator: Some(aggregator),
-addr,
+addr: config.server_addr,
subscribe,
-client_buffer: Self::DEFAULT_CLIENT_BUFFER_CAPACITY,
+client_buffer: config.client_buffer_capacity,
};
let layer = Self {
tx,
@@ -99,8 +117,10 @@ impl TasksLayer {
impl TasksLayer {
pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 10;
pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;
-pub const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
+pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);

+/// By default, completed spans are retained for one hour.
+pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);
// how much capacity should remain in the buffer before triggering a
// flush on capacity?
//
@@ -238,12 +258,9 @@ impl Server {
}

impl Server {
-pub fn with_addr(self, addr: impl Into<SocketAddr>) -> Self {
-Self {
-addr: addr.into(),
-..self
-}
-}
+// XXX(eliza): why is `SocketAddr::new` not `const`???
+pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
+pub const DEFAULT_PORT: u16 = 6669;

pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
self.serve_with(tonic::transport::Server::default()).await
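
Putting the pieces together, one way an application might wire the new
builder into a subscriber and run the server (a sketch mirroring the
app.rs example above; error handling is simplified):

    use tracing_subscriber::prelude::*;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let (layer, server) = console_subscriber::TasksLayer::builder().build();
        tracing_subscriber::registry().with(layer).init();
        // run the gRPC server that console clients connect to
        tokio::spawn(async move {
            server.serve().await.expect("console server failed");
        });
        // ... application logic here ...
        Ok(())
    }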
