Skip to content

Commit

Permalink
feat: tracing for structured, context-aware logs (bytebeamio#499)
Browse files Browse the repository at this point in the history
* feat: use tracing crate instead of log

* tracing subscriber in progress

* added client_idx as span field and use FILTER env var

* manually create spans

* initial POC for reloading filter

* feat: dynamically reloading filter for logs

* feat: filter the log with rumqttd as target by default

* feat: consume span and removed unnecessary fields

* return text instead of json if logs filter reloaded

* feat: removed count in run span and created span for server

* feat: tracing

* feat: pretty logs

* feat: use instrument combinator for attaching span to futures

* feat: stop logging run

* fix: merge issues

* fix: clippy warning

* feat: removed unnecessary client_id from log

Co-authored-by: Devdutt Shenoi <devdutt@bytebeam.io>
  • Loading branch information
swanandx and de-sh committed Nov 12, 2022
1 parent cc855aa commit fa355dd
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 227 deletions.
124 changes: 103 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rumqttd/Cargo.toml
Expand Up @@ -16,7 +16,6 @@ serde_json = "1.0.66"
bytes = { version = "1", features = ["serde"] }
flume = "0.10.9"
slab = "0.4.3"
log = "0.4.14"
thiserror = "1.0.24"
tokio-util = { version = "0.7", features = ["codec"], optional = true }
tokio-rustls = { version = "0.23.0", optional = true }
Expand All @@ -29,8 +28,9 @@ rouille = "3.1.1"
futures-util = { version = "0.3.16", optional = true}
parking_lot = "0.11.2"
config = "0.13"
simplelog = "0.12.0"
structopt = "0.3.26"
tracing = { version="0.1", features=["log"] }
tracing-subscriber = { version="0.3.16", features=["env-filter"] }

[features]
default = ["use-rustls"]
Expand Down
26 changes: 20 additions & 6 deletions rumqttd/src/lib.rs
@@ -1,14 +1,18 @@
#[macro_use]
extern crate log;

#[macro_use]
extern crate rouille;

use std::collections::HashMap;
use std::path::PathBuf;

use segments::Storage;
use serde::{Deserialize, Serialize};
use tracing_subscriber::{
filter::EnvFilter,
fmt::{
format::{Format, Pretty},
Layer,
},
layer::Layered,
reload::Handle,
Registry,
};

use std::net::SocketAddr;

Expand Down Expand Up @@ -99,9 +103,19 @@ pub struct RouterConfig {
pub initialized_filters: Option<Vec<Filter>>,
}

type ReloadHandle = Handle<EnvFilter, Layered<Layer<Registry, Pretty, Format<Pretty>>, Registry>>;

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ConsoleSettings {
pub listen: String,
#[serde(skip)]
filter_handle: Option<ReloadHandle>,
}

impl ConsoleSettings {
pub fn set_filter_reload_handle(&mut self, handle: ReloadHandle) {
self.filter_handle.replace(handle);
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
11 changes: 11 additions & 0 deletions rumqttd/src/link/console.rs
Expand Up @@ -2,6 +2,7 @@ use crate::link::local::{Link, LinkRx};
use crate::router::{Event, MetricsRequest};
use crate::{ConnectionId, ConsoleSettings};
use flume::Sender;
use rouille::{router, try_or_400};
use std::sync::Arc;

pub struct ConsoleLink {
Expand Down Expand Up @@ -99,6 +100,16 @@ pub fn start(console: Arc<ConsoleLink>) {

let v = console.link_rx.metrics();
rouille::Response::json(&v)
},
(POST) (/logs) => {
let data = try_or_400!(rouille::input::plain_text_body(request));
if let Some(handle) = &console.config.filter_handle {
if handle.reload(&data).is_err() {
return rouille::Response::empty_400();
}
return rouille::Response::text(data);
}
rouille::Response::empty_404()
},
_ => rouille::Response::empty_404()
)
Expand Down
3 changes: 2 additions & 1 deletion rumqttd/src/link/remote.rs
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::time::error::Elapsed;
use tokio::{select, time};
use tracing::debug;

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -128,7 +129,7 @@ impl<P: Protocol> RemoteLink<P> {
buffer.len()
};

debug!("{:15.15}[I] {:20} buffercount = {}", self.client_id, "packets", len);
debug!(buffercount=len, "packets");
self.link_tx.notify().await?;
}
// Receive from router when previous when state isn't in collision
Expand Down

0 comments on commit fa355dd

Please sign in to comment.