From 51fb5230cbb922f2dc7370868649979bbb3e58cc Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 1 Nov 2022 12:04:16 +0530 Subject: [PATCH 01/26] feat: use tracing crate instead of log --- Cargo.lock | 14 +++++++++++++- rumqttd/Cargo.toml | 4 ++-- rumqttd/src/lib.rs | 3 --- rumqttd/src/link/remote.rs | 1 + rumqttd/src/main.rs | 1 + rumqttd/src/router/iobufs.rs | 1 + rumqttd/src/router/logs.rs | 1 + rumqttd/src/router/routing.rs | 2 +- rumqttd/src/router/scheduler.rs | 1 + rumqttd/src/segments/mod.rs | 2 +- rumqttd/src/server/broker.rs | 2 +- 11 files changed, 23 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 52d2699de..b4588c580 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2018,7 +2018,6 @@ dependencies = [ "flume", "futures-util", "jemallocator", - "log", "parking_lot 0.11.2", "pretty_assertions 1.3.0", "pretty_env_logger", @@ -2035,6 +2034,7 @@ dependencies = [ "tokio-rustls", "tokio-tungstenite 0.15.0", "tokio-util", + "tracing", "vergen", "websocket-codec", ] @@ -2717,9 +2717,21 @@ dependencies = [ "cfg-if 1.0.0", "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.29" diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 312007e68..0faa7878b 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -16,7 +16,6 @@ serde_json = "1.0.66" bytes = { version = "1", features = ["serde"] } flume = "0.10.9" slab = "0.4.3" -log = "0.4.14" thiserror = "1.0.24" tokio-util = { version = "0.7", features = ["codec"], optional = true } tokio-rustls = { version = "0.23.0", optional = true } @@ -29,8 +28,9 @@ rouille = "3.1.1" futures-util = { version = "0.3.16", optional = true} parking_lot = "0.11.2" config = "0.13" -simplelog = "0.12.0" structopt = "0.3.26" +tracing = { version="0.1", features=["log"] } +simplelog = "0.12" [features] default = ["use-rustls"] diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 04990b2c5..336793cc8 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -1,6 +1,3 @@ -#[macro_use] -extern crate log; - #[macro_use] extern crate rouille; diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index f9b68d9a1..81430f3ee 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -6,6 +6,7 @@ use crate::router::{Event, Notification}; use crate::{ConnectionId, ConnectionSettings, Link}; use flume::{RecvError, SendError, Sender, TrySendError}; +use tracing::debug; use std::collections::VecDeque; use std::io; use std::sync::Arc; diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index a2c99743f..c79f2edac 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -4,6 +4,7 @@ use simplelog::{ Color, ColorChoice, CombinedLogger, Level, LevelFilter, LevelPadding, TargetPadding, TermLogger, TerminalMode, ThreadPadding, }; + use structopt::StructOpt; #[derive(StructOpt)] diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 671a3cba7..40342a78a 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -2,6 +2,7 @@ use std::{collections::VecDeque, sync::Arc, time::Instant}; use flume::{Receiver, Sender}; use parking_lot::Mutex; +use tracing::error; use crate::{ protocol::Packet, diff --git a/rumqttd/src/router/logs.rs b/rumqttd/src/router/logs.rs index 1f81005a5..8845a825d 100644 --- a/rumqttd/src/router/logs.rs +++ b/rumqttd/src/router/logs.rs @@ -1,5 +1,6 @@ use super::Ack; use slab::Slab; +use tracing::trace; use crate::protocol::{ matches, ConnAck, PingResp, PubAck, PubComp, PubRec, PubRel, Publish, SubAck, UnsubAck, diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index cc3c9b804..f1d5b298b 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -9,8 +9,8 @@ use crate::router::Forward; use crate::segments::Position; use crate::*; use flume::{bounded, Receiver, RecvError, Sender, TryRecvError}; -use log::*; use slab::Slab; +use tracing::{error, info, debug, trace, warn}; use std::collections::{HashMap, HashSet, VecDeque}; use std::str::Utf8Error; use std::thread; diff --git a/rumqttd/src/router/scheduler.rs b/rumqttd/src/router/scheduler.rs index f7082da59..0b571d644 100644 --- a/rumqttd/src/router/scheduler.rs +++ b/rumqttd/src/router/scheduler.rs @@ -5,6 +5,7 @@ use std::{ use serde::{Deserialize, Serialize}; use slab::Slab; +use tracing::trace; use super::DataRequest; use crate::{ConnectionId, Filter}; diff --git a/rumqttd/src/segments/mod.rs b/rumqttd/src/segments/mod.rs index bfc82400c..17126c3f3 100644 --- a/rumqttd/src/segments/mod.rs +++ b/rumqttd/src/segments/mod.rs @@ -1,4 +1,3 @@ -use log::warn; use std::usize; use std::{collections::VecDeque, io}; @@ -6,6 +5,7 @@ mod segment; pub mod utils; use segment::{Segment, SegmentPosition}; +use tracing::warn; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub enum Position { diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 51959d364..7cd504cf3 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -13,7 +13,7 @@ use crate::protocol::Protocol; use crate::server::tls::{self, TLSAcceptor}; use crate::ConnectionSettings; use flume::{RecvError, SendError, Sender}; -use log::*; +use tracing::{error, info}; use std::sync::Arc; #[cfg(feature = "websockets")] use websocket_codec::MessageCodec; From 39850c6fe448f78a383aedcf5895c5c32c9c747e Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Wed, 2 Nov 2022 13:16:17 +0530 Subject: [PATCH 02/26] tracing subscriber in progress --- Cargo.lock | 124 ++++++++++++++++---- rumqttd/Cargo.toml | 3 +- rumqttd/src/link/remote.rs | 4 +- rumqttd/src/main.rs | 73 ++++++------ rumqttd/src/protocol/v5/disconnect.rs | 3 +- rumqttd/src/router/iobufs.rs | 7 +- rumqttd/src/router/logs.rs | 2 +- rumqttd/src/router/routing.rs | 157 +++++++++++--------------- rumqttd/src/router/scheduler.rs | 13 ++- rumqttd/src/server/broker.rs | 31 ++--- 10 files changed, 236 insertions(+), 181 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4588c580..765efa162 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1209,6 +1209,15 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" @@ -1370,6 +1379,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-format" version = "0.4.0" @@ -1503,6 +1522,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1908,6 +1933,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.27" @@ -2025,7 +2059,6 @@ dependencies = [ "rustls-pemfile 0.3.0", "serde", "serde_json", - "simplelog", "slab", "structopt", "thiserror", @@ -2035,6 +2068,8 @@ dependencies = [ "tokio-tungstenite 0.15.0", "tokio-util", "tracing", + "tracing-subscriber", + "tracing-tree", "vergen", "websocket-codec", ] @@ -2324,23 +2359,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] -name = "signal-hook-registry" -version = "1.4.0" +name = "sharded-slab" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" dependencies = [ - "libc", + "lazy_static", ] [[package]] -name = "simplelog" -version = "0.12.0" +name = "signal-hook-registry" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" dependencies = [ - "log", - "termcolor", - "time", + "libc", ] [[package]] @@ -2536,6 +2569,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +dependencies = [ + "once_cell", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -2554,15 +2596,8 @@ dependencies = [ "itoa 1.0.3", "libc", "num_threads", - "time-macros", ] -[[package]] -name = "time-macros" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" - [[package]] name = "tiny_http" version = "0.8.2" @@ -2734,11 +2769,54 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ + "matchers", + "nu-ansi-term", "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "tracing-tree" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07e90b329c621ade432823988574e820212648aa40e7a2497777d58de0fb453" +dependencies = [ + "ansi_term 0.12.1", + "atty", + "tracing-core", + "tracing-log", + "tracing-subscriber", ] [[package]] @@ -2898,6 +2976,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 0faa7878b..24e941687 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -30,7 +30,8 @@ parking_lot = "0.11.2" config = "0.13" structopt = "0.3.26" tracing = { version="0.1", features=["log"] } -simplelog = "0.12" +tracing-subscriber = { version="0.3.16", features=["env-filter"] } +tracing-tree = "0" [features] default = ["use-rustls"] diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index 81430f3ee..0ead9a988 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -6,13 +6,13 @@ use crate::router::{Event, Notification}; use crate::{ConnectionId, ConnectionSettings, Link}; use flume::{RecvError, SendError, Sender, TrySendError}; -use tracing::debug; use std::collections::VecDeque; use std::io; use std::sync::Arc; use std::time::Duration; use tokio::time::error::Elapsed; use tokio::{select, time}; +use tracing::debug; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -129,7 +129,7 @@ impl RemoteLink

{ buffer.len() }; - debug!("{:15.15}[I] {:20} buffercount = {}", self.client_id, "packets", len); + debug!(client_id=self.client_id, info="packets", buffercount=len); self.link_tx.notify().await?; } // Receive from router when previous when state isn't in collision diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index c79f2edac..857926854 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -1,9 +1,8 @@ use rumqttd::Broker; -use simplelog::{ - Color, ColorChoice, CombinedLogger, Level, LevelFilter, LevelPadding, TargetPadding, - TermLogger, TerminalMode, ThreadPadding, -}; +use tracing::metadata::LevelFilter; +// use tracing_subscriber::{self, fmt, prelude::*, EnvFilter}; +use tracing_subscriber::{filter::filter_fn, fmt, prelude::*, EnvFilter, Registry}; use structopt::StructOpt; @@ -36,7 +35,39 @@ fn main() { let commandline: CommandLine = CommandLine::from_args(); banner(&commandline); - initialize_logging(&commandline); + let _level = match commandline.verbose { + 0 => LevelFilter::WARN, + 1 => LevelFilter::INFO, + 2 => LevelFilter::DEBUG, + _ => LevelFilter::TRACE, + }; + + /* FOR TREE VIEW + let layer = tracing_tree::HierarchicalLayer::default() + .with_writer(std::io::stdout) + .with_indent_lines(true) + .with_indent_amount(2) + .with_thread_names(false) + .with_thread_ids(false) + .with_verbose_exit(false) + .with_verbose_entry(false) + .with_targets(true); + + let subscriber = Registry::default().with(layer); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + */ + + + // let filter = filter_fn(|meta| meta.target().contains("rumqttd")); + + // with filter layers + tracing_subscriber::registry() + .with(fmt::layer().compact()) + // .with(filter) + .with(EnvFilter::from_default_env()) + .init(); + let config = config::Config::builder() .add_source(config::File::with_name(&commandline.config)) @@ -45,42 +76,12 @@ fn main() { let config = config.try_deserialize().unwrap(); - println!("{:#?}", config); + // println!("{:#?}", config); let mut broker = Broker::new(config); broker.start().unwrap(); } -fn initialize_logging(commandline: &CommandLine) { - let mut config = simplelog::ConfigBuilder::new(); - - let level = match commandline.verbose { - 0 => LevelFilter::Warn, - 1 => LevelFilter::Info, - 2 => LevelFilter::Debug, - _ => LevelFilter::Trace, - }; - - config - .set_location_level(LevelFilter::Off) - .set_target_level(LevelFilter::Error) - .set_target_padding(TargetPadding::Right(25)) - .set_thread_level(LevelFilter::Error) - .set_thread_padding(ThreadPadding::Right(2)) - .set_level_color(Level::Trace, Some(Color::Cyan)) - .set_level_padding(LevelPadding::Right); - - config.add_filter_allow_str("rumqttd"); - - let loggers = TermLogger::new( - level, - config.build(), - TerminalMode::Mixed, - ColorChoice::Always, - ); - CombinedLogger::init(vec![loggers]).unwrap(); -} - fn banner(commandline: &CommandLine) { const B: &str = r#" ██████╗ ██╗ ██╗███╗ ███╗ ██████╗ ████████╗████████╗██████╗ diff --git a/rumqttd/src/protocol/v5/disconnect.rs b/rumqttd/src/protocol/v5/disconnect.rs index bc13d07cd..27f37528d 100644 --- a/rumqttd/src/protocol/v5/disconnect.rs +++ b/rumqttd/src/protocol/v5/disconnect.rs @@ -75,9 +75,8 @@ pub fn write( } let len_len = write_remaining_length(buffer, length)?; - - buffer.put_u8(code(disconnect.reason_code)); + buffer.put_u8(code(disconnect.reason_code)); if let Some(properties) = &properties { properties::write(properties, buffer)?; diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 40342a78a..11ebe954d 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -146,10 +146,7 @@ impl Outgoing { let inflight_count = self.inflight_buffer.len(); if inflight_count > MAX_INFLIGHT { - error!( - "inflight_count = {:<2} MAX_INFLIGHT = {:<2}", - inflight_count, MAX_INFLIGHT - ); + error!(inflight_count, MAX_INFLIGHT); } (buffer_count, inflight_count) @@ -165,7 +162,7 @@ impl Outgoing { // We don't support out of order acks if pkid != head { - error!("out of order ack. pkid = {}, head = {}", pkid, head); + error!(error = "out of order ack.", pkid, head); return None; } diff --git a/rumqttd/src/router/logs.rs b/rumqttd/src/router/logs.rs index 8845a825d..0970d86dc 100644 --- a/rumqttd/src/router/logs.rs +++ b/rumqttd/src/router/logs.rs @@ -201,7 +201,7 @@ impl DataLog { filter: &str, notifications: &mut VecDeque<(ConnectionId, DataRequest)>, ) { - trace!("{:15.15}[S] for filter: {:?}", "retain-msg", &filter); + trace!(info = "retain-msg", filter = &filter); let idx = self.filter_indexes.get(filter).unwrap(); diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index f1d5b298b..a13f78273 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -10,12 +10,12 @@ use crate::segments::Position; use crate::*; use flume::{bounded, Receiver, RecvError, Sender, TryRecvError}; use slab::Slab; -use tracing::{error, info, debug, trace, warn}; use std::collections::{HashMap, HashSet, VecDeque}; use std::str::Utf8Error; use std::thread; use std::time::SystemTime; use thiserror::Error; +use tracing::{debug, error, info, trace, warn}; use super::graveyard::Graveyard; use super::iobufs::{Incoming, Outgoing}; @@ -134,13 +134,14 @@ impl Router { /// to communicate with router should only be returned only after it starts. /// For that reason, all the public methods should start the router in the /// background + #[tracing::instrument(skip(self))] pub fn spawn(mut self) -> Sender<(ConnectionId, Event)> { let router = thread::Builder::new().name(format!("router-{}", self.id)); let link = self.link(); router .spawn(move || { let e = self.run(0); - error!("Router done! Reason = {:?}", e); + error!(reason=?e, "Router done!"); }) .unwrap(); link @@ -149,6 +150,7 @@ impl Router { /// Waits on incoming events when ready queue is empty. /// After pulling 1 event, tries to pull 500 more events /// before polling ready queue 100 times (connections) + #[tracing::instrument(skip(self), name = "run router")] fn run(&mut self, count: usize) -> Result<(), RouterError> { match count { 0 => loop { @@ -196,6 +198,7 @@ impl Router { Ok(()) } + #[tracing::instrument(skip(self))] fn events(&mut self, id: ConnectionId, data: Event) { match data { Event::Connect { @@ -213,6 +216,7 @@ impl Router { } } + #[tracing::instrument(skip_all, fields(client_id))] fn handle_new_connection( &mut self, mut connection: Connection, @@ -222,10 +226,7 @@ impl Router { let client_id = outgoing.client_id.clone(); if self.connections.len() >= self.config.max_connections { - error!( - "{:15.15}[E] {:20}", - client_id, "no space for new connection" - ); + error!(client_id, "no space for new connection"); // let ack = ConnectionAck::Failure("No space for new connection".to_owned()); // let message = Notification::ConnectionAck(ack); return; @@ -265,10 +266,7 @@ impl Router { assert_eq!(self.obufs.insert(outgoing), connection_id); self.connection_map.insert(client_id.clone(), connection_id); - info!( - "{:15.15}[I] {:20} id = {}", - client_id, "connect", connection_id - ); + info!(client_id, info = "connect", connection_id,); assert_eq!(self.ackslog.insert(ackslog), connection_id); assert_eq!(self.scheduler.add(tracker), connection_id); @@ -288,24 +286,23 @@ impl Router { .reschedule(connection_id, ScheduleReason::Init); } + #[tracing::instrument(skip(self))] fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { // Some clients can choose to send Disconnect packet before network disconnection. // This will lead to double Disconnect packets in router `events` let client_id = match &self.obufs.get(id) { Some(v) => v.client_id.clone(), None => { - error!( - "{:15.15}[E] {:20} id {} is already gone", - "", "no-connection", id - ); + error!("no-connection id {} is already gone", id); return; } }; + if execute_last_will { self.handle_last_will(id, client_id.clone()); } - info!("{:15.15}[I] {:20} id = {}", client_id, "disconnect", id); + info!(client_id, info = "disconnect", id); // Remove connection from router let mut connection = self.connections.remove(id); @@ -356,15 +353,13 @@ impl Router { } /// Handles new incoming data on a topic + #[tracing::instrument(skip(self))] fn handle_device_payload(&mut self, id: ConnectionId) { // TODO: Retun errors and move error handling to the caller let incoming = match self.ibufs.get_mut(id) { Some(v) => v, None => { - debug!( - "{:15.15}[E] {:20} id {} is already gone", - "", "no-connection", id - ); + error!("no-connection id {} is already gone", id); return; } }; @@ -384,10 +379,9 @@ impl Router { match packet { Packet::Publish(publish, _) => { trace!( - "{:15.15}[I] {:20} {:?}", client_id, - "publish", - publish.topic + packet="publish", + topic=?publish.topic ); let size = publish.len(); @@ -454,8 +448,7 @@ impl Router { Err(e) => { // Disconnect on bad publishes error!( - "{:15.15}[E] {:20} error = {:?}", - client_id, "append-fail", e + client_id, error="append-fail", reason=?e ); self.router_metrics.failed_publishes += 1; disconnect = true; @@ -481,15 +474,12 @@ impl Router { // let len = s.len(); for f in s.filters { - info!( - "{:15.15}[I] {:20} filter = {}", - client_id, "subscribe", f.path - ); + info!(client_id, packet = "subscribe", filter = f.path); let connection = self.connections.get_mut(id).unwrap(); if let Err(e) = validate_subscription(/*connection,*/ &f) { let id = &self.ibufs[id].client_id; - error!("{:15.15}[E] {:20} error = {:?}", id, "bad-subscription", e); + error!(id, error="bad-subscription", reason=?e); disconnect = true; break; } @@ -524,8 +514,9 @@ impl Router { } Packet::Unsubscribe(unsubscribe, _) => { debug!( - "{:11} {:14} Id = {} Filters = {:?}", - "data", "unsubscribe", id, unsubscribe.filters + id, + packet="unsubscribe", + filters=?unsubscribe.filters ); let connection = self.connections.get_mut(id).unwrap(); let pkid = unsubscribe.pkid; @@ -545,14 +536,12 @@ impl Router { if connection.subscriptions.contains(&filter) { connection.subscriptions.remove(&filter); debug!( - "{:15.15}[I] {:20} filter = {}", - outgoing.client_id, "unsubscribe", filter + client_id = outgoing.client_id, + info = "unsubscribe", + filter ); } else { - error!( - "{:15.15}[E] {:20} pkid = {:?}", - id, "unsubscribe-failed", unsubscribe.pkid - ); + error!(id, error = "unsubscribe-failed", pkid = unsubscribe.pkid); continue; } let unsuback = UnsubAck { @@ -572,10 +561,7 @@ impl Router { let outgoing = self.obufs.get_mut(id).unwrap(); let pkid = puback.pkid; if outgoing.register_ack(pkid).is_none() { - error!( - "{:15.15}[E] {:20} pkid = {:?}", - id, "unsolicited/ooo ack", pkid - ); + error!(id, error = "unsolicited/ooo ack", pkid); disconnect = true; break; } @@ -586,10 +572,7 @@ impl Router { let outgoing = self.obufs.get_mut(id).unwrap(); let pkid = pubrec.pkid; if outgoing.register_ack(pkid).is_none() { - error!( - "{:15.15}[E] {:20} pkid = {:?}", - id, "unsolicited/ooo ack", pkid - ); + error!(id, error = "unsolicited/ooo ack", pkid); disconnect = true; break; } @@ -635,8 +618,7 @@ impl Router { Err(e) => { // Disconnect on bad publishes error!( - "{:15.15}[E] {:20} error = {:?}", - client_id, "append-fail", e + client_id, error="append-fail", reason=?e ); self.router_metrics.failed_publishes += 1; disconnect = true; @@ -657,7 +639,7 @@ impl Router { break; } incoming => { - warn!("Packet = {:?} not supported by router yet", incoming); + warn!(packet=?incoming, "Packet not supported by router yet" ); } } } @@ -737,16 +719,14 @@ impl Router { /// send data and notifications to consumer. /// To activate a connection, first connection's tracker is fetched and /// all the requests are handled. + // #[tracing::instrument(skip(self))] fn consume(&mut self) -> Option<()> { let (id, mut requests) = self.scheduler.poll()?; let outgoing = match self.obufs.get_mut(id) { Some(v) => v, None => { - debug!( - "{:15.15}[E] {:20} id {} is already gone", - "", "no-connection", id - ); + error!("no-connection id {} is already gone", id); return Some(()); } }; @@ -754,16 +734,11 @@ impl Router { let ackslog = self.ackslog.get_mut(id).unwrap(); let datalog = &mut self.datalog; - trace!( - "{:15.15}[S] {:20} id = {}", - outgoing.client_id, - "consume", - id - ); + trace!(client_id = outgoing.client_id, info = "consume", id); // We always try to ack when ever a connection is scheduled if ack_device_data(ackslog, outgoing) { - trace!("{:15.15}[T] {:20}", outgoing.client_id, "acks-done"); + trace!(client_id = outgoing.client_id, "acks-done"); } // A new connection's tracker is always initialized with acks request. @@ -796,9 +771,9 @@ impl Router { ConsumeStatus::FilterCaughtup => { let filter = &request.filter; trace!( - "{:15.15}[S] {:20} f = {filter}", - outgoing.client_id, - "caughtup-park" + client_id = outgoing.client_id, + info = "caughtup-park", + filter ); // When all the data in the log is caught up, current request is @@ -849,8 +824,7 @@ impl Router { Err(e) => { // Disconnect on bad publishes error!( - "{:15.15}[E] {:20} error = {:?}", - client_id, "append-fail", e + client_id, error="append-fail", reason=?e ); self.router_metrics.failed_publishes += 1; // Removed disconnect = true from here because we disconnect anyways @@ -859,6 +833,7 @@ impl Router { } } +#[tracing::instrument(skip_all)] fn append_to_commitlog( id: ConnectionId, mut publish: Publish, @@ -906,14 +881,14 @@ fn append_to_commitlog( let datalog = datalog.native.get_mut(filter_idx).unwrap(); let (offset, filter) = datalog.append(publish.clone(), notifications); debug!( - "{:15.15}[I] {:20} append = {}[{}, {}), pkid = {}", // map client id from connection id - connections[id].client_id, - "publish", + client_id = connections[id].client_id, + info = "publish", + pkid, + "append = {}[{}, {})", filter, offset.0, offset.1, - pkid ); o = offset; @@ -926,6 +901,7 @@ fn append_to_commitlog( /// Sweep ackslog for all the pending acks. /// We write everything to outgoing buf with out worrying about buffer size /// because acks most certainly won't cause memory bloat +#[tracing::instrument(skip_all)] fn ack_device_data(ackslog: &mut AckLog, outgoing: &mut Outgoing) -> bool { let acks = ackslog.readv(); if acks.is_empty() { @@ -939,21 +915,13 @@ fn ack_device_data(ackslog: &mut AckLog, outgoing: &mut Outgoing) -> bool { // At any given point of time, there can be a max of connection's buffer size for ack in acks.drain(..) { let pkid = packetid(&ack); - trace!( - "{:15.15}[O] {:20} pkid = {:?}", - outgoing.client_id, - "ack", - pkid - ); + trace!(client_id = outgoing.client_id, packet = "ack", pkid); let message = Notification::DeviceAck(ack); buffer.push_back(message); count += 1; } - debug!( - "{:15.15}[O] {:20} count = {:?}", - outgoing.client_id, "acks", count - ); + debug!(client_id = outgoing.client_id, acks_count = count); outgoing.handle.try_send(()).ok(); true } @@ -971,15 +939,16 @@ enum ConsumeStatus { /// 1. `busy`: whether the data request was completed or not. /// 2. `done`: whether the connection was busy or not. /// 3. `inflight_full`: whether the inflight requests were completely filled +#[tracing::instrument(skip_all)] fn forward_device_data( request: &mut DataRequest, datalog: &DataLog, outgoing: &mut Outgoing, ) -> ConsumeStatus { trace!( - "{:15.15}[T] {:20} cursor = {}[{}, {}]", - outgoing.client_id, - "data-request", + client_id = outgoing.client_id, + info = "data-request", + "cursor = {}[{}, {}]", request.filter, request.cursor.0, request.cursor.1 @@ -1000,7 +969,7 @@ fn forward_device_data( match datalog.native_readv(request.filter_idx, request.cursor, inflight_slots) { Ok(v) => v, Err(e) => { - error!("Failed to read from commitlog. Error = {:?}", e); + error!(error=?e, "Failed to read from commitlog."); return ConsumeStatus::FilterCaughtup; } }; @@ -1012,15 +981,16 @@ fn forward_device_data( if start != request.cursor { error!( - "Read cursor jump. Cursor = {:?}, Start = {:?}", - request.cursor, start + cursor=?request.cursor, + ?start, + "Read cursor jump.", ); } trace!( - "{:15.15}[T] {:20} cursor = {}[{}, {})", - outgoing.client_id, - "data-response", + client_id = outgoing.client_id, + info = "data-response", + "cursor = {}[{}, {})", request.filter, next.0, next.1, @@ -1038,9 +1008,9 @@ fn forward_device_data( // Fill and notify device data debug!( - "{:15.15}[O] {:20} cursor = {}[{}, {}) count = {}", - outgoing.client_id, - "data-proxy", + client_id = outgoing.client_id, + info = "data-proxy", + "cursor = {}[{}, {}) count = {}", request.filter, request.cursor.0, request.cursor.1, @@ -1059,10 +1029,9 @@ fn forward_device_data( let (len, inflight) = outgoing.push_forwards(forwards, qos, filter_idx); trace!( - "{:15.15}[O] {:20} buffer = {}, inflight = {}", - outgoing.client_id, - "inflight", - len, + client_id = outgoing.client_id, + info = "inflight", + buffer = len, inflight ); diff --git a/rumqttd/src/router/scheduler.rs b/rumqttd/src/router/scheduler.rs index 0b571d644..9f56b0df7 100644 --- a/rumqttd/src/router/scheduler.rs +++ b/rumqttd/src/router/scheduler.rs @@ -34,6 +34,7 @@ impl Scheduler { } /// Next connection which is ready to make progress + // #[tracing::instrument(skip_all)] pub fn poll(&mut self) -> Option<(ConnectionId, VecDeque)> { let id = self.readyqueue.pop_front()?; let tracker = self.trackers.get_mut(id)?; @@ -64,9 +65,9 @@ impl Scheduler { let tracker = self.trackers.get_mut(id).unwrap(); if let Some(v) = tracker.try_ready(reason) { trace!( - "{:15.15}[S] {:20} {:?} -> Ready", - tracker.id, - "reschedule", + tracker_id = tracker.id, + info = "reschedule", + "{:?} -> Ready", v ); self.readyqueue.push_back(id); @@ -78,9 +79,9 @@ impl Scheduler { let tracker = self.trackers.get_mut(id).unwrap(); trace!( - "{:15.15}[S] {:20} {:?} -> {:?}", - tracker.id, - "pause", + tracker_id = tracker.id, + info = "pause", + "{:?} -> {:?}", tracker.status, reason ); diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 7cd504cf3..839cfe5b9 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -13,8 +13,8 @@ use crate::protocol::Protocol; use crate::server::tls::{self, TLSAcceptor}; use crate::ConnectionSettings; use flume::{RecvError, SendError, Sender}; -use tracing::{error, info}; use std::sync::Arc; +use tracing::{error, info}; #[cfg(feature = "websockets")] use websocket_codec::MessageCodec; @@ -133,6 +133,7 @@ impl Broker { Ok((link_tx, link_rx)) } + #[tracing::instrument(skip(self))] pub fn start(&mut self) -> Result<(), Error> { // spawn bridge in a separate thread // if let Some(bridge_config) = self.config.bridge.clone() { @@ -162,7 +163,7 @@ impl Broker { runtime.block_on(async { if let Err(e) = server.start(false).await { - error!("{:15.15}[I] Remote link error = {:?}", "", e); + error!(error=?e, "Remote link error"); } }); })?; @@ -177,7 +178,7 @@ impl Broker { runtime.block_on(async { if let Err(e) = server.start(false).await { - error!("{:15.15}[I] Remote link error = {:?}", "", e); + error!(error=?e, "Remote link error"); } }); })?; @@ -199,7 +200,7 @@ impl Broker { runtime.block_on(async { if let Err(e) = server.start(true).await { - error!("{:15.15}[I] Remote link error = {:?}", "", e); + error!(error=?e, "Remote link error"); } }); })?; @@ -265,6 +266,7 @@ impl Server

{ Ok(/*(*/ Box::new(stream) /*, None)*/) } + #[tracing::instrument(skip(self))] async fn start(&self, shadow: bool) -> Result<(), Error> { let listener = TcpListener::bind(&self.config.listen).await?; let delay = Duration::from_millis(self.config.next_connection_delay_ms); @@ -272,15 +274,16 @@ impl Server

{ let config = Arc::new(self.config.connections.clone()); info!( - "{:15.15}[>] waiting for remote connections > {}", - self.config.name, self.config.listen + config=self.config.name, + "[>] waiting for remote connections > {}", + self.config.listen ); loop { // Await new network connection. let (stream, addr) = match listener.accept().await { Ok((s, r)) => (s, r), Err(e) => { - error!("Unable to accept socket. Error = {:?}", e); + error!(error=?e, "Unable to accept socket."); continue; } }; @@ -288,14 +291,13 @@ impl Server

{ let /*(*/network /*, tenant_id)*/ = match self.tls_accept(stream).await { Ok(o) => o, Err(e) => { - error!("Tls accept error = {:?}", e); + error!(error=?e, "Tls accept error"); continue; } }; info!( - "{:15.15}[I] {:20} addr = {} count {}", - self.config.name, "accept", addr, count + name=?self.config.name, info="accept", ?addr, count ); let config = config.clone(); @@ -321,6 +323,7 @@ impl Server

{ /// waiting for mqtt connect packet. Also this honours connection wait time as per config to prevent /// denial of service attacks (rogue clients which only does network connection without sending /// mqtt connection packet to make make the server reach its concurrent connection limit) +#[tracing::instrument(skip_all)] async fn remote( config: Arc, // tenant_id: Option, @@ -333,7 +336,7 @@ async fn remote( let mut link = match RemoteLink::new(config, router_tx.clone(), /*tenant_id,*/ network).await { Ok(l) => l, Err(e) => { - error!("{:15.15}[E] Remote link error = {:?}", "", e); + error!(error=?e, "Remote link error"); return; } }; @@ -344,16 +347,16 @@ async fn remote( match link.start().await { // Connection get close. This shouldn't usually happen - Ok(_) => error!("{:15.15}[E] connection-stop", client_id), + Ok(_) => error!(client_id, "connection-stop"), // No need to send a disconnect message when disconnetion // originated internally in the router Err(remote::Error::Link(e)) => { - info!("{:15.15}[E] {:20} {:?}", client_id, "router-drop", e); + error!(client_id, error=?e, "router-drop"); return; } // Any other error Err(e) => { - error!("{:15.15}[E] Disconnected!! {:?}", client_id, e); + error!(client_id, error=?e,"Disconnected!!"); execute_will = true; } }; From 858d9602ca8782611a9a44b65ac3df7ce2ed6788 Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Wed, 2 Nov 2022 14:14:54 +0530 Subject: [PATCH 03/26] added client_idxx as span field and use FILTER env var --- rumqttd/src/main.rs | 24 ++++++++++++------------ rumqttd/src/router/routing.rs | 6 ++++-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index 857926854..fdb43d4c7 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -1,9 +1,6 @@ use rumqttd::Broker; use tracing::metadata::LevelFilter; -// use tracing_subscriber::{self, fmt, prelude::*, EnvFilter}; -use tracing_subscriber::{filter::filter_fn, fmt, prelude::*, EnvFilter, Registry}; - use structopt::StructOpt; #[derive(StructOpt)] @@ -42,7 +39,7 @@ fn main() { _ => LevelFilter::TRACE, }; - /* FOR TREE VIEW + /* FOR TREE VIEW let layer = tracing_tree::HierarchicalLayer::default() .with_writer(std::io::stdout) .with_indent_lines(true) @@ -58,17 +55,20 @@ fn main() { */ + let env_filter = tracing_subscriber::EnvFilter::builder() + .with_regex(false) + .with_env_var("FILTER") + .from_env() + .unwrap(); - // let filter = filter_fn(|meta| meta.target().contains("rumqttd")); - - // with filter layers - tracing_subscriber::registry() - .with(fmt::layer().compact()) - // .with(filter) - .with(EnvFilter::from_default_env()) + tracing_subscriber::fmt() + .with_env_filter(env_filter) + // .with_file(false) + // .with_thread_ids(false) + // .with_thread_names(false) + // .pretty() .init(); - let config = config::Config::builder() .add_source(config::File::with_name(&commandline.config)) .build() diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index a13f78273..3bf89805d 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -216,7 +216,7 @@ impl Router { } } - #[tracing::instrument(skip_all, fields(client_id))] + #[tracing::instrument(skip_all, fields(client_idxx))] fn handle_new_connection( &mut self, mut connection: Connection, @@ -225,6 +225,8 @@ impl Router { ) { let client_id = outgoing.client_id.clone(); + tracing::Span::current().record("client_idxx", &client_id); + if self.connections.len() >= self.config.max_connections { error!(client_id, "no space for new connection"); // let ack = ConnectionAck::Failure("No space for new connection".to_owned()); @@ -297,7 +299,7 @@ impl Router { return; } }; - + if execute_last_will { self.handle_last_will(id, client_id.clone()); } From 93c9d6ff43694169c82f759e791e562e4a3dc538 Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Thu, 3 Nov 2022 13:02:06 +0530 Subject: [PATCH 04/26] manually creat spans --- rumqttd/src/main.rs | 21 +++++++++++++-------- rumqttd/src/router/routing.rs | 14 ++++++++++---- rumqttd/src/server/broker.rs | 5 ++--- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index fdb43d4c7..b75a69dd9 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -1,7 +1,8 @@ use rumqttd::Broker; -use tracing::metadata::LevelFilter; use structopt::StructOpt; +use tracing::metadata::LevelFilter; +use tracing_subscriber::prelude::*; #[derive(StructOpt)] #[structopt(name = "rumqttd")] @@ -56,17 +57,21 @@ fn main() { */ let env_filter = tracing_subscriber::EnvFilter::builder() - .with_regex(false) + .with_regex(false) // exactly match fmt::Debug output .with_env_var("FILTER") .from_env() .unwrap(); - tracing_subscriber::fmt() - .with_env_filter(env_filter) - // .with_file(false) - // .with_thread_ids(false) - // .with_thread_names(false) - // .pretty() + let fmt_layer = tracing_subscriber::fmt::layer() + .with_line_number(false) + .with_file(false) + .with_thread_ids(false) + .with_thread_names(false) + .compact(); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) .init(); let config = config::Config::builder() diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 3bf89805d..edc491fdb 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -216,7 +216,7 @@ impl Router { } } - #[tracing::instrument(skip_all, fields(client_idxx))] + // #[tracing::instrument(skip_all, fields(client_idxx))] fn handle_new_connection( &mut self, mut connection: Connection, @@ -225,7 +225,9 @@ impl Router { ) { let client_id = outgoing.client_id.clone(); - tracing::Span::current().record("client_idxx", &client_id); + let span = tracing::info_span!("handle_new_conn", client_id); + + let _guard = span.enter(); if self.connections.len() >= self.config.max_connections { error!(client_id, "no space for new connection"); @@ -288,7 +290,7 @@ impl Router { .reschedule(connection_id, ScheduleReason::Init); } - #[tracing::instrument(skip(self))] + // #[tracing::instrument(skip(self))] fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { // Some clients can choose to send Disconnect packet before network disconnection. // This will lead to double Disconnect packets in router `events` @@ -300,6 +302,8 @@ impl Router { } }; + let span = tracing::info_span!("handle_disconnection", client_id); + let _guard = span.enter(); if execute_last_will { self.handle_last_will(id, client_id.clone()); } @@ -355,7 +359,7 @@ impl Router { } /// Handles new incoming data on a topic - #[tracing::instrument(skip(self))] + // #[tracing::instrument(skip(self))] fn handle_device_payload(&mut self, id: ConnectionId) { // TODO: Retun errors and move error handling to the caller let incoming = match self.ibufs.get_mut(id) { @@ -367,6 +371,8 @@ impl Router { }; let client_id = incoming.client_id.clone(); + let span = tracing::info_span!("handle_payload", client_id); + let _guard = span.enter(); // Instead of exchanging, we should just append new incoming packets inside cache let mut packets = incoming.exchange(self.cache.take().unwrap()); diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 839cfe5b9..ed6823ea4 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -274,9 +274,8 @@ impl Server

{ let config = Arc::new(self.config.connections.clone()); info!( - config=self.config.name, - "[>] waiting for remote connections > {}", - self.config.listen + config = self.config.name, + "[>] waiting for remote connections > {}", self.config.listen ); loop { // Await new network connection. From f01dd4eb0b836b991828664301a3cf3bb5b3755f Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Fri, 4 Nov 2022 22:25:47 +0530 Subject: [PATCH 05/26] initial poc for reloading filter --- rumqttd/src/lib.rs | 6 +++--- rumqttd/src/link/console.rs | 12 +++++++++++ rumqttd/src/main.rs | 43 +++++++++++++++++++++---------------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 336793cc8..b961c8fce 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -1,11 +1,9 @@ -#[macro_use] -extern crate rouille; - use std::collections::HashMap; use std::path::PathBuf; use segments::Storage; use serde::{Deserialize, Serialize}; +use tracing_subscriber::{filter::EnvFilter, reload::Handle, fmt::Formatter}; use std::net::SocketAddr; @@ -99,6 +97,8 @@ pub struct RouterConfig { #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct ConsoleSettings { pub listen: String, + #[serde(skip)] + pub filter_handle: Option>, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index 40e345f4e..bae7b7d9b 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -2,6 +2,7 @@ use crate::link::local::{Link, LinkRx}; use crate::router::{Event, MetricsRequest}; use crate::{ConnectionId, ConsoleSettings}; use flume::Sender; +use rouille::{router, try_or_400}; use std::sync::Arc; pub struct ConsoleLink { @@ -99,6 +100,17 @@ pub fn start(console: Arc) { let v = console.link_rx.metrics(); rouille::Response::json(&v) + }, + (POST) (/logs) => { + let data = try_or_400!(rouille::input::plain_text_body(&request)); + if let Some(handle) = &console.config.filter_handle { + if handle.reload(data.clone()).is_err() { + return rouille::Response::empty_400(); + } + return rouille::Response::json(&data); + } + + rouille::Response::empty_404() }, _ => rouille::Response::empty_404() ) diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index b75a69dd9..3d0c1dc3c 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -1,8 +1,7 @@ -use rumqttd::Broker; +use rumqttd::{Broker, Config}; use structopt::StructOpt; use tracing::metadata::LevelFilter; -use tracing_subscriber::prelude::*; #[derive(StructOpt)] #[structopt(name = "rumqttd")] @@ -56,30 +55,38 @@ fn main() { */ - let env_filter = tracing_subscriber::EnvFilter::builder() - .with_regex(false) // exactly match fmt::Debug output - .with_env_var("FILTER") - .from_env() - .unwrap(); + // let env_filter = tracing_subscriber::EnvFilter::builder() + // .with_regex(false) // exactly match fmt::Debug output + // .with_env_var("FILTER") + // .from_env() + // .unwrap(); - let fmt_layer = tracing_subscriber::fmt::layer() - .with_line_number(false) - .with_file(false) - .with_thread_ids(false) - .with_thread_names(false) - .compact(); + // let fmt_layer = tracing_subscriber::fmt::layer() + // .with_line_number(false) + // .with_file(false) + // .with_thread_ids(false) + // .with_thread_names(false) + // .compact(); + + let builder = tracing_subscriber::fmt().with_env_filter("info").with_filter_reloading(); - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .init(); + let handle = builder.reload_handle(); + + builder.try_init().expect("initialized subscriber succesfully"); + + // tracing_subscriber::registry() + // .with(env_filter) + // .with(fmt_layer) + // .init(); let config = config::Config::builder() .add_source(config::File::with_name(&commandline.config)) .build() .unwrap(); - let config = config.try_deserialize().unwrap(); + let mut config: Config = config.try_deserialize().unwrap(); + + config.console.filter_handle = Some(handle); // println!("{:#?}", config); From d4460b5fac6ce8189edbe9ac35cb14ffa293d8ff Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Sun, 6 Nov 2022 18:48:15 +0530 Subject: [PATCH 06/26] feat: dynamically reloading filter for logs --- Cargo.lock | 14 --------- rumqttd/Cargo.toml | 1 - rumqttd/src/lib.rs | 10 +++++-- rumqttd/src/link/console.rs | 2 +- rumqttd/src/main.rs | 59 ++++++++++--------------------------- 5 files changed, 25 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 765efa162..dad18dc37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2069,7 +2069,6 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", - "tracing-tree", "vergen", "websocket-codec", ] @@ -2806,19 +2805,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "tracing-tree" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07e90b329c621ade432823988574e820212648aa40e7a2497777d58de0fb453" -dependencies = [ - "ansi_term 0.12.1", - "atty", - "tracing-core", - "tracing-log", - "tracing-subscriber", -] - [[package]] name = "try-lock" version = "0.2.3" diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 24e941687..205f442a5 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -31,7 +31,6 @@ config = "0.13" structopt = "0.3.26" tracing = { version="0.1", features=["log"] } tracing-subscriber = { version="0.3.16", features=["env-filter"] } -tracing-tree = "0" [features] default = ["use-rustls"] diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index b961c8fce..a9b4bd47f 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use segments::Storage; use serde::{Deserialize, Serialize}; -use tracing_subscriber::{filter::EnvFilter, reload::Handle, fmt::Formatter}; +use tracing_subscriber::{filter::EnvFilter, fmt::Formatter, reload::Handle}; use std::net::SocketAddr; @@ -98,7 +98,13 @@ pub struct RouterConfig { pub struct ConsoleSettings { pub listen: String, #[serde(skip)] - pub filter_handle: Option>, + filter_handle: Option>, +} + +impl ConsoleSettings { + pub fn set_filter_reload_handle(&mut self, handle: Handle) { + self.filter_handle.replace(handle); + } } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index bae7b7d9b..565803cae 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -109,7 +109,7 @@ pub fn start(console: Arc) { } return rouille::Response::json(&data); } - + rouille::Response::empty_404() }, _ => rouille::Response::empty_404() diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index 3d0c1dc3c..c2f42b074 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -1,7 +1,6 @@ use rumqttd::{Broker, Config}; use structopt::StructOpt; -use tracing::metadata::LevelFilter; #[derive(StructOpt)] #[structopt(name = "rumqttd")] @@ -32,52 +31,26 @@ fn main() { let commandline: CommandLine = CommandLine::from_args(); banner(&commandline); - let _level = match commandline.verbose { - 0 => LevelFilter::WARN, - 1 => LevelFilter::INFO, - 2 => LevelFilter::DEBUG, - _ => LevelFilter::TRACE, + let level = match commandline.verbose { + 0 => "warn", + 1 => "info", + 2 => "debug", + _ => "trace", }; - /* FOR TREE VIEW - let layer = tracing_tree::HierarchicalLayer::default() - .with_writer(std::io::stdout) - .with_indent_lines(true) - .with_indent_amount(2) - .with_thread_names(false) + let builder = tracing_subscriber::fmt() + .with_line_number(false) + .with_file(false) .with_thread_ids(false) - .with_verbose_exit(false) - .with_verbose_entry(false) - .with_targets(true); - - let subscriber = Registry::default().with(layer); - tracing::subscriber::set_global_default(subscriber).unwrap(); - - */ - - // let env_filter = tracing_subscriber::EnvFilter::builder() - // .with_regex(false) // exactly match fmt::Debug output - // .with_env_var("FILTER") - // .from_env() - // .unwrap(); - - // let fmt_layer = tracing_subscriber::fmt::layer() - // .with_line_number(false) - // .with_file(false) - // .with_thread_ids(false) - // .with_thread_names(false) - // .compact(); - - let builder = tracing_subscriber::fmt().with_env_filter("info").with_filter_reloading(); - - let handle = builder.reload_handle(); + .with_thread_names(false) + .with_env_filter(level) + .with_filter_reloading(); - builder.try_init().expect("initialized subscriber succesfully"); + let reload_handle = builder.reload_handle(); - // tracing_subscriber::registry() - // .with(env_filter) - // .with(fmt_layer) - // .init(); + builder + .try_init() + .expect("initialized subscriber succesfully"); let config = config::Config::builder() .add_source(config::File::with_name(&commandline.config)) @@ -86,7 +59,7 @@ fn main() { let mut config: Config = config.try_deserialize().unwrap(); - config.console.filter_handle = Some(handle); + config.console.set_filter_reload_handle(reload_handle); // println!("{:#?}", config); From 02235a478b4f2772002d31cf3cba13f789a3171c Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Mon, 7 Nov 2022 18:21:05 +0530 Subject: [PATCH 07/26] feat: filter the log with rumqttd as target by default --- rumqttd/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index c2f42b074..e94987b3c 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -32,10 +32,10 @@ fn main() { banner(&commandline); let level = match commandline.verbose { - 0 => "warn", - 1 => "info", - 2 => "debug", - _ => "trace", + 0 => "rumqttd=warn", + 1 => "rumqttd=info", + 2 => "rumqttd=debug", + _ => "rumqttd=trace", }; let builder = tracing_subscriber::fmt() From 8c65f0a625f4d4d594a82889c4c52071d83af95e Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Mon, 7 Nov 2022 23:50:39 +0530 Subject: [PATCH 08/26] feat: consume span and removed unneccesary fields --- rumqttd/src/router/routing.rs | 99 ++++++++++++--------------------- rumqttd/src/router/scheduler.rs | 1 - 2 files changed, 37 insertions(+), 63 deletions(-) diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index edc491fdb..59da7c86a 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -134,7 +134,7 @@ impl Router { /// to communicate with router should only be returned only after it starts. /// For that reason, all the public methods should start the router in the /// background - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip_all)] pub fn spawn(mut self) -> Sender<(ConnectionId, Event)> { let router = thread::Builder::new().name(format!("router-{}", self.id)); let link = self.link(); @@ -150,8 +150,9 @@ impl Router { /// Waits on incoming events when ready queue is empty. /// After pulling 1 event, tries to pull 500 more events /// before polling ready queue 100 times (connections) - #[tracing::instrument(skip(self), name = "run router")] fn run(&mut self, count: usize) -> Result<(), RouterError> { + let span = tracing::info_span!("run", count); + let _guard = span.enter(); match count { 0 => loop { self.run_inner()?; @@ -198,8 +199,9 @@ impl Router { Ok(()) } - #[tracing::instrument(skip(self))] fn events(&mut self, id: ConnectionId, data: Event) { + let span = tracing::info_span!("event", id); + let _guard = span.enter(); match data { Event::Connect { connection, @@ -216,7 +218,6 @@ impl Router { } } - // #[tracing::instrument(skip_all, fields(client_idxx))] fn handle_new_connection( &mut self, mut connection: Connection, @@ -226,11 +227,10 @@ impl Router { let client_id = outgoing.client_id.clone(); let span = tracing::info_span!("handle_new_conn", client_id); - let _guard = span.enter(); if self.connections.len() >= self.config.max_connections { - error!(client_id, "no space for new connection"); + error!("no space for new connection"); // let ack = ConnectionAck::Failure("No space for new connection".to_owned()); // let message = Notification::ConnectionAck(ack); return; @@ -270,7 +270,7 @@ impl Router { assert_eq!(self.obufs.insert(outgoing), connection_id); self.connection_map.insert(client_id.clone(), connection_id); - info!(client_id, info = "connect", connection_id,); + info!(connection_id, "connect"); assert_eq!(self.ackslog.insert(ackslog), connection_id); assert_eq!(self.scheduler.add(tracker), connection_id); @@ -290,7 +290,6 @@ impl Router { .reschedule(connection_id, ScheduleReason::Init); } - // #[tracing::instrument(skip(self))] fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { // Some clients can choose to send Disconnect packet before network disconnection. // This will lead to double Disconnect packets in router `events` @@ -305,10 +304,10 @@ impl Router { let span = tracing::info_span!("handle_disconnection", client_id); let _guard = span.enter(); if execute_last_will { - self.handle_last_will(id, client_id.clone()); + self.handle_last_will(id); } - info!(client_id, info = "disconnect", id); + info!(id, "disconnect"); // Remove connection from router let mut connection = self.connections.remove(id); @@ -359,7 +358,6 @@ impl Router { } /// Handles new incoming data on a topic - // #[tracing::instrument(skip(self))] fn handle_device_payload(&mut self, id: ConnectionId) { // TODO: Retun errors and move error handling to the caller let incoming = match self.ibufs.get_mut(id) { @@ -371,7 +369,7 @@ impl Router { }; let client_id = incoming.client_id.clone(); - let span = tracing::info_span!("handle_payload", client_id); + let span = tracing::info_span!("handle_payload", client_id, connection_id = id); let _guard = span.enter(); // Instead of exchanging, we should just append new incoming packets inside cache let mut packets = incoming.exchange(self.cache.take().unwrap()); @@ -387,9 +385,8 @@ impl Router { match packet { Packet::Publish(publish, _) => { trace!( - client_id, - packet="publish", - topic=?publish.topic + topic=?publish.topic, + "publish" ); let size = publish.len(); @@ -456,7 +453,7 @@ impl Router { Err(e) => { // Disconnect on bad publishes error!( - client_id, error="append-fail", reason=?e + reason=?e, "append-fail" ); self.router_metrics.failed_publishes += 1; disconnect = true; @@ -482,12 +479,12 @@ impl Router { // let len = s.len(); for f in s.filters { - info!(client_id, packet = "subscribe", filter = f.path); + info!(filter = f.path, "subscribe"); let connection = self.connections.get_mut(id).unwrap(); if let Err(e) = validate_subscription(/*connection,*/ &f) { let id = &self.ibufs[id].client_id; - error!(id, error="bad-subscription", reason=?e); + error!(id, reason=?e,"bad-subscription" ); disconnect = true; break; } @@ -522,9 +519,8 @@ impl Router { } Packet::Unsubscribe(unsubscribe, _) => { debug!( - id, - packet="unsubscribe", - filters=?unsubscribe.filters + filters=?unsubscribe.filters, + "unsubscribe", ); let connection = self.connections.get_mut(id).unwrap(); let pkid = unsubscribe.pkid; @@ -544,12 +540,11 @@ impl Router { if connection.subscriptions.contains(&filter) { connection.subscriptions.remove(&filter); debug!( - client_id = outgoing.client_id, - info = "unsubscribe", - filter + outgoing_client_id = outgoing.client_id, + filter, "unsubscribe" ); } else { - error!(id, error = "unsubscribe-failed", pkid = unsubscribe.pkid); + error!(pkid = unsubscribe.pkid, "unsubscribe-failed"); continue; } let unsuback = UnsubAck { @@ -569,7 +564,7 @@ impl Router { let outgoing = self.obufs.get_mut(id).unwrap(); let pkid = puback.pkid; if outgoing.register_ack(pkid).is_none() { - error!(id, error = "unsolicited/ooo ack", pkid); + error!(pkid, "unsolicited/ooo ack"); disconnect = true; break; } @@ -580,7 +575,7 @@ impl Router { let outgoing = self.obufs.get_mut(id).unwrap(); let pkid = pubrec.pkid; if outgoing.register_ack(pkid).is_none() { - error!(id, error = "unsolicited/ooo ack", pkid); + error!(pkid, "unsolicited/ooo ack"); disconnect = true; break; } @@ -626,7 +621,7 @@ impl Router { Err(e) => { // Disconnect on bad publishes error!( - client_id, error="append-fail", reason=?e + reason=?e,"append-fail" ); self.router_metrics.failed_publishes += 1; disconnect = true; @@ -727,7 +722,6 @@ impl Router { /// send data and notifications to consumer. /// To activate a connection, first connection's tracker is fetched and /// all the requests are handled. - // #[tracing::instrument(skip(self))] fn consume(&mut self) -> Option<()> { let (id, mut requests) = self.scheduler.poll()?; @@ -739,14 +733,16 @@ impl Router { } }; + let span = tracing::info_span!("consume", client_id = outgoing.client_id); + let _guard = span.enter(); let ackslog = self.ackslog.get_mut(id).unwrap(); let datalog = &mut self.datalog; - trace!(client_id = outgoing.client_id, info = "consume", id); + trace!(id, "consume"); // We always try to ack when ever a connection is scheduled if ack_device_data(ackslog, outgoing) { - trace!(client_id = outgoing.client_id, "acks-done"); + trace!("acks-done"); } // A new connection's tracker is always initialized with acks request. @@ -778,11 +774,7 @@ impl Router { } ConsumeStatus::FilterCaughtup => { let filter = &request.filter; - trace!( - client_id = outgoing.client_id, - info = "caughtup-park", - filter - ); + trace!(filter, "caughtup-park"); // When all the data in the log is caught up, current request is // registered in waiters and not added back to the tracker. This @@ -800,7 +792,7 @@ impl Router { Some(()) } - pub fn handle_last_will(&mut self, id: ConnectionId, client_id: String) { + pub fn handle_last_will(&mut self, id: ConnectionId) { let connection = self.connections.get_mut(id).unwrap(); let will = match connection.last_will.take() { Some(v) => v, @@ -832,7 +824,7 @@ impl Router { Err(e) => { // Disconnect on bad publishes error!( - client_id, error="append-fail", reason=?e + reason=?e,"append-fail" ); self.router_metrics.failed_publishes += 1; // Removed disconnect = true from here because we disconnect anyways @@ -888,16 +880,7 @@ fn append_to_commitlog( for filter_idx in filter_idxs { let datalog = datalog.native.get_mut(filter_idx).unwrap(); let (offset, filter) = datalog.append(publish.clone(), notifications); - debug!( - // map client id from connection id - client_id = connections[id].client_id, - info = "publish", - pkid, - "append = {}[{}, {})", - filter, - offset.0, - offset.1, - ); + debug!(pkid, "append = {}[{}, {})", filter, offset.0, offset.1,); o = offset; } @@ -923,13 +906,13 @@ fn ack_device_data(ackslog: &mut AckLog, outgoing: &mut Outgoing) -> bool { // At any given point of time, there can be a max of connection's buffer size for ack in acks.drain(..) { let pkid = packetid(&ack); - trace!(client_id = outgoing.client_id, packet = "ack", pkid); + trace!(pkid, "ack"); let message = Notification::DeviceAck(ack); buffer.push_back(message); count += 1; } - debug!(client_id = outgoing.client_id, acks_count = count); + debug!(acks_count = count); outgoing.handle.try_send(()).ok(); true } @@ -954,8 +937,7 @@ fn forward_device_data( outgoing: &mut Outgoing, ) -> ConsumeStatus { trace!( - client_id = outgoing.client_id, - info = "data-request", + message = "data-request", "cursor = {}[{}, {}]", request.filter, request.cursor.0, @@ -996,8 +978,7 @@ fn forward_device_data( } trace!( - client_id = outgoing.client_id, - info = "data-response", + message = "data-response", "cursor = {}[{}, {})", request.filter, next.0, @@ -1016,8 +997,7 @@ fn forward_device_data( // Fill and notify device data debug!( - client_id = outgoing.client_id, - info = "data-proxy", + message = "data-proxy", "cursor = {}[{}, {}) count = {}", request.filter, request.cursor.0, @@ -1036,12 +1016,7 @@ fn forward_device_data( let (len, inflight) = outgoing.push_forwards(forwards, qos, filter_idx); - trace!( - client_id = outgoing.client_id, - info = "inflight", - buffer = len, - inflight - ); + trace!(inflight, buffer = len, "inflight"); if len >= MAX_CHANNEL_CAPACITY - 1 { outgoing.push_notification(Notification::Unschedule); diff --git a/rumqttd/src/router/scheduler.rs b/rumqttd/src/router/scheduler.rs index 9f56b0df7..7820271a8 100644 --- a/rumqttd/src/router/scheduler.rs +++ b/rumqttd/src/router/scheduler.rs @@ -34,7 +34,6 @@ impl Scheduler { } /// Next connection which is ready to make progress - // #[tracing::instrument(skip_all)] pub fn poll(&mut self) -> Option<(ConnectionId, VecDeque)> { let id = self.readyqueue.pop_front()?; let tracker = self.trackers.get_mut(id)?; From c0061cc7ecbf1e2d69af2ff84562113183d5c88a Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 8 Nov 2022 00:02:48 +0530 Subject: [PATCH 09/26] return text instead of json if logs filter reloaded --- rumqttd/src/link/console.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index 565803cae..41404a785 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -102,14 +102,13 @@ pub fn start(console: Arc) { rouille::Response::json(&v) }, (POST) (/logs) => { - let data = try_or_400!(rouille::input::plain_text_body(&request)); + let data = try_or_400!(rouille::input::plain_text_body(request)); if let Some(handle) = &console.config.filter_handle { - if handle.reload(data.clone()).is_err() { + if handle.reload(&data).is_err() { return rouille::Response::empty_400(); } - return rouille::Response::json(&data); + return rouille::Response::text(data); } - rouille::Response::empty_404() }, _ => rouille::Response::empty_404() From fe0171a1acadcab34a7d79dda9e4082271248683 Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 8 Nov 2022 18:00:39 +0530 Subject: [PATCH 10/26] feat: removed count in run span and created span for server --- rumqttd/src/router/iobufs.rs | 2 +- rumqttd/src/router/routing.rs | 23 ++++++++--------------- rumqttd/src/router/scheduler.rs | 10 ++-------- rumqttd/src/server/broker.rs | 22 +++++++++++++--------- 4 files changed, 24 insertions(+), 33 deletions(-) diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 11ebe954d..9cc3c2495 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -162,7 +162,7 @@ impl Outgoing { // We don't support out of order acks if pkid != head { - error!(error = "out of order ack.", pkid, head); + error!(pkid, head, "out of order ack."); return None; } diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 59da7c86a..f4ede1516 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -150,9 +150,8 @@ impl Router { /// Waits on incoming events when ready queue is empty. /// After pulling 1 event, tries to pull 500 more events /// before polling ready queue 100 times (connections) + #[tracing::instrument(skip_all)] fn run(&mut self, count: usize) -> Result<(), RouterError> { - let span = tracing::info_span!("run", count); - let _guard = span.enter(); match count { 0 => loop { self.run_inner()?; @@ -301,13 +300,13 @@ impl Router { } }; - let span = tracing::info_span!("handle_disconnection", client_id); + let span = tracing::info_span!("handle_disconnection", client_id, id); let _guard = span.enter(); if execute_last_will { self.handle_last_will(id); } - info!(id, "disconnect"); + info!("disconnect"); // Remove connection from router let mut connection = self.connections.remove(id); @@ -369,7 +368,7 @@ impl Router { }; let client_id = incoming.client_id.clone(); - let span = tracing::info_span!("handle_payload", client_id, connection_id = id); + let span = tracing::info_span!("handle_payload", client_id, id); let _guard = span.enter(); // Instead of exchanging, we should just append new incoming packets inside cache let mut packets = incoming.exchange(self.cache.take().unwrap()); @@ -483,8 +482,7 @@ impl Router { let connection = self.connections.get_mut(id).unwrap(); if let Err(e) = validate_subscription(/*connection,*/ &f) { - let id = &self.ibufs[id].client_id; - error!(id, reason=?e,"bad-subscription" ); + error!(reason=?e,"bad-subscription" ); disconnect = true; break; } @@ -535,14 +533,9 @@ impl Router { let meter = &mut self.ibufs.get_mut(id).unwrap().meter; meter.subscribe_count -= 1; - let outgoing = self.obufs.get_mut(id).unwrap(); - if connection.subscriptions.contains(&filter) { connection.subscriptions.remove(&filter); - debug!( - outgoing_client_id = outgoing.client_id, - filter, "unsubscribe" - ); + debug!(filter, "unsubscribe"); } else { error!(pkid = unsubscribe.pkid, "unsubscribe-failed"); continue; @@ -1016,7 +1009,7 @@ fn forward_device_data( let (len, inflight) = outgoing.push_forwards(forwards, qos, filter_idx); - trace!(inflight, buffer = len, "inflight"); + trace!(inflight, buffer = len); if len >= MAX_CHANNEL_CAPACITY - 1 { outgoing.push_notification(Notification::Unschedule); @@ -1040,7 +1033,7 @@ fn retrieve_shadow(datalog: &mut DataLog, outgoing: &mut Outgoing, shadow: Shado payload: publish.payload, }; - // FIll notify shadow + // Fill notify shadow let message = Notification::Shadow(shadow_reply); let len = outgoing.push_notification(message); let _should_unschedule = if len >= MAX_CHANNEL_CAPACITY - 1 { diff --git a/rumqttd/src/router/scheduler.rs b/rumqttd/src/router/scheduler.rs index 7820271a8..dd302b28e 100644 --- a/rumqttd/src/router/scheduler.rs +++ b/rumqttd/src/router/scheduler.rs @@ -63,12 +63,7 @@ impl Scheduler { pub fn reschedule(&mut self, id: ConnectionId, reason: ScheduleReason) { let tracker = self.trackers.get_mut(id).unwrap(); if let Some(v) = tracker.try_ready(reason) { - trace!( - tracker_id = tracker.id, - info = "reschedule", - "{:?} -> Ready", - v - ); + trace!(tracker_id = tracker.id, "reschedule {:?} -> Ready", v); self.readyqueue.push_back(id); } } @@ -79,8 +74,7 @@ impl Scheduler { trace!( tracker_id = tracker.id, - info = "pause", - "{:?} -> {:?}", + "pause {:?} -> {:?}", tracker.status, reason ); diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index ed6823ea4..63dc0b15d 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -296,7 +296,7 @@ impl Server

{ }; info!( - name=?self.config.name, info="accept", ?addr, count + name=?self.config.name, ?addr, count,"accept" ); let config = config.clone(); @@ -343,19 +343,20 @@ async fn remote( let client_id = link.client_id.clone(); let connection_id = link.connection_id; let mut execute_will = false; - + let span = tracing::info_span!("remote", client_id, connection_id); + let _guard = span.enter(); match link.start().await { // Connection get close. This shouldn't usually happen - Ok(_) => error!(client_id, "connection-stop"), + Ok(_) => error!("connection-stop"), // No need to send a disconnect message when disconnetion // originated internally in the router Err(remote::Error::Link(e)) => { - error!(client_id, error=?e, "router-drop"); + error!(error=?e, "router-drop"); return; } // Any other error Err(e) => { - error!(client_id, error=?e,"Disconnected!!"); + error!(error=?e,"Disconnected!!"); execute_will = true; } }; @@ -381,7 +382,7 @@ async fn shadow_connection( let mut link = match ShadowLink::new(config, router_tx.clone(), stream).await { Ok(l) => l, Err(e) => { - error!("{:15.15}[E] Remote link error = {:?}", "", e); + error!(reason=?e, "Remote link error"); return; } }; @@ -389,16 +390,19 @@ async fn shadow_connection( let client_id = link.client_id.clone(); let connection_id = link.connection_id; + let span = tracing::info_span!("shadow_connection", client_id, connection_id); + let _guard = span.enter(); + match link.start().await { // Connection get close. This shouldn't usually happen - Ok(_) => error!("{:15.15}[E] connection-stop", client_id), + Ok(_) => error!("connection-stop"), // No need to send a disconnect message when disconnetion // originated internally in the router Err(shadow::Error::Link(e)) => { - error!("{:15.15}[E] {:20} {:?}", client_id, "router-drop", e); + error!(reason=?e, "router-drop"); return; } - Err(e) => error!("{:15.15}[E] {}", client_id, e.to_string()), + Err(e) => error!(?e), }; let disconnect = Disconnection { From 521fe403e790d26f66925297b27b8a479d7c1415 Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 8 Nov 2022 18:20:28 +0530 Subject: [PATCH 11/26] feat: tracing --- rumqttd/src/link/shadow.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/rumqttd/src/link/shadow.rs b/rumqttd/src/link/shadow.rs index 33cd0c2da..dfb262487 100644 --- a/rumqttd/src/link/shadow.rs +++ b/rumqttd/src/link/shadow.rs @@ -17,6 +17,7 @@ use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{accept_async, tungstenite, WebSocketStream}; use tungstenite::protocol::frame::coding::CloseCode; +use tracing::{error, debug, warn}; use super::network::N; @@ -95,13 +96,16 @@ impl ShadowLink { let mut ping = time::interval(Duration::from_secs(10)); let mut pong = true; + let span = tracing::info_span!("shadowlink_start", client_id=self.client_id); + let _guard = span.enter(); + // Note: // Shouldn't result in bounded queue deadlocks because of blocking n/w send loop { select! { o = self.network.read() => { let message = o?; - debug!("{:15.15} {:20} size = {} bytes", self.client_id, "read", message.len()); + debug!(size = message.len(), "read" ); match message { Message::Text(m) => { self.extract_message(&m).await?; @@ -136,7 +140,7 @@ impl ShadowLink { continue }, Notification::DeviceAck(ack) => { - warn!("Ignoring acks for shadow. Ack = {:?}", ack); + warn!(?ack, "Ignoring acks for shadow."); continue; } Notification::Shadow(shadow) => { @@ -147,9 +151,9 @@ impl ShadowLink { v => unreachable!("Expecting only data or device acks. Received = {:?}", v) }; - let write_len = message.len(); + let size = message.len(); self.network.write(message).await?; - debug!("{:15.15} {:20} size = {} bytes", self.client_id, "write", write_len); + debug!(size, "write"); } _ = interval.tick() => { for filter in self.subscriptions.iter() { @@ -158,7 +162,7 @@ impl ShadowLink { } _ = ping.tick() => { if !pong { - error!("{:15.15} {:20}", self.client_id, "no-pong"); + error!("no-pong"); break } @@ -173,6 +177,8 @@ impl ShadowLink { } async fn extract_message(&mut self, message: &str) -> Result<(), Error> { + let span = tracing::info_span!("extract_message", client_id=self.client_id); + let _guard = span.enter(); match serde_json::from_str(message)? { Incoming::Shadow { filter } => match validate_shadow(&self.client_id, &filter) { Ok(_) => { @@ -180,7 +186,7 @@ impl ShadowLink { self.link_tx.try_subscribe(filter)?; } Err(e) => { - error!("{:15.15} {:20}: {}", self.client_id, "validation error", e); + error!(?e, "validation error"); self.network.write(close(&e)).await?; return Err(e); } From cc108ab4aa3ac388a41d944b02a807f7b34ef0ab Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 8 Nov 2022 21:37:54 +0530 Subject: [PATCH 12/26] feat: pretty logs --- rumqttd/src/lib.rs | 17 ++++++++++++++--- rumqttd/src/main.rs | 1 + 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index a9b4bd47f..a1676174e 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -3,7 +3,16 @@ use std::path::PathBuf; use segments::Storage; use serde::{Deserialize, Serialize}; -use tracing_subscriber::{filter::EnvFilter, fmt::Formatter, reload::Handle}; +use tracing_subscriber::{ + filter::EnvFilter, + fmt::{ + format::{Format, Pretty}, + Layer, + }, + layer::Layered, + reload::Handle, + Registry, +}; use std::net::SocketAddr; @@ -94,15 +103,17 @@ pub struct RouterConfig { pub initialized_filters: Option>, } +type ReloadHandle = Handle>, Registry>>; + #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct ConsoleSettings { pub listen: String, #[serde(skip)] - filter_handle: Option>, + filter_handle: Option, } impl ConsoleSettings { - pub fn set_filter_reload_handle(&mut self, handle: Handle) { + pub fn set_filter_reload_handle(&mut self, handle: ReloadHandle) { self.filter_handle.replace(handle); } } diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index e94987b3c..3da0fa0a5 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -39,6 +39,7 @@ fn main() { }; let builder = tracing_subscriber::fmt() + .pretty() .with_line_number(false) .with_file(false) .with_thread_ids(false) From 15905e195d3e29bea7006e32475963c2bb2215fc Mon Sep 17 00:00:00 2001 From: swanandx <73115739+swanandx@users.noreply.github.com> Date: Tue, 8 Nov 2022 22:11:35 +0530 Subject: [PATCH 13/26] feat: use instrument combinator for attacing span to futures --- rumqttd/src/link/remote.rs | 2 +- rumqttd/src/link/shadow.rs | 9 +++------ rumqttd/src/server/broker.rs | 31 ++++++++++++++++++++++--------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index 0ead9a988..80382dfe0 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -129,7 +129,7 @@ impl RemoteLink

{ buffer.len() }; - debug!(client_id=self.client_id, info="packets", buffercount=len); + debug!(buffercount=len, "packets"); self.link_tx.notify().await?; } // Receive from router when previous when state isn't in collision diff --git a/rumqttd/src/link/shadow.rs b/rumqttd/src/link/shadow.rs index dfb262487..930caec60 100644 --- a/rumqttd/src/link/shadow.rs +++ b/rumqttd/src/link/shadow.rs @@ -16,8 +16,8 @@ use tokio::{select, time}; use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{accept_async, tungstenite, WebSocketStream}; +use tracing::{debug, error, warn}; use tungstenite::protocol::frame::coding::CloseCode; -use tracing::{error, debug, warn}; use super::network::N; @@ -91,14 +91,12 @@ impl ShadowLink { }) } + #[tracing::instrument(skip_all, fields(client_id=self.client_id))] pub async fn start(&mut self) -> Result<(), Error> { let mut interval = time::interval(Duration::from_secs(5)); let mut ping = time::interval(Duration::from_secs(10)); let mut pong = true; - let span = tracing::info_span!("shadowlink_start", client_id=self.client_id); - let _guard = span.enter(); - // Note: // Shouldn't result in bounded queue deadlocks because of blocking n/w send loop { @@ -176,9 +174,8 @@ impl ShadowLink { Ok(()) } + #[tracing::instrument(skip_all, fields(client_id=self.client_id))] async fn extract_message(&mut self, message: &str) -> Result<(), Error> { - let span = tracing::info_span!("extract_message", client_id=self.client_id); - let _guard = span.enter(); match serde_json::from_str(message)? { Incoming::Shadow { filter } => match validate_shadow(&self.client_id, &filter) { Ok(_) => { diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 63dc0b15d..5b79ba2d2 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -14,7 +14,7 @@ use crate::server::tls::{self, TLSAcceptor}; use crate::ConnectionSettings; use flume::{RecvError, SendError, Sender}; use std::sync::Arc; -use tracing::{error, info}; +use tracing::{error, field, info, Instrument, Span}; #[cfg(feature = "websockets")] use websocket_codec::MessageCodec; @@ -306,10 +306,22 @@ impl Server

{ let protocol = self.protocol.clone(); match shadow { #[cfg(feature = "websockets")] - true => task::spawn(shadow_connection(config, router_tx, network)), - _ => task::spawn(remote( - config, /*tenant_id,*/ router_tx, network, protocol, + true => task::spawn(shadow_connection(config, router_tx, network).instrument( + tracing::info_span!( + "shadow_connection", + client_id = field::Empty, + connection_id = field::Empty + ), )), + _ => task::spawn( + remote(config, /*tenant_id,*/ router_tx, network, protocol).instrument( + tracing::info_span!( + "remote", + client_id = field::Empty, + connection_id = field::Empty + ), + ), + ), }; time::sleep(delay).await; @@ -322,7 +334,6 @@ impl Server

{ /// waiting for mqtt connect packet. Also this honours connection wait time as per config to prevent /// denial of service attacks (rogue clients which only does network connection without sending /// mqtt connection packet to make make the server reach its concurrent connection limit) -#[tracing::instrument(skip_all)] async fn remote( config: Arc, // tenant_id: Option, @@ -343,8 +354,10 @@ async fn remote( let client_id = link.client_id.clone(); let connection_id = link.connection_id; let mut execute_will = false; - let span = tracing::info_span!("remote", client_id, connection_id); - let _guard = span.enter(); + + Span::current().record("client_id", &client_id); + Span::current().record("connection_id", &connection_id); + match link.start().await { // Connection get close. This shouldn't usually happen Ok(_) => error!("connection-stop"), @@ -390,8 +403,8 @@ async fn shadow_connection( let client_id = link.client_id.clone(); let connection_id = link.connection_id; - let span = tracing::info_span!("shadow_connection", client_id, connection_id); - let _guard = span.enter(); + Span::current().record("client_id", &client_id); + Span::current().record("connection_id", &connection_id); match link.start().await { // Connection get close. This shouldn't usually happen From 63721a47835382656f131d337e1d2befe42f86ef Mon Sep 17 00:00:00 2001 From: tekjar Date: Fri, 11 Nov 2022 19:54:47 +0530 Subject: [PATCH 14/26] Add meters link --- rumqttd/src/link/meters.rs | 65 +++++++++++++++++++++++++++++++++++ rumqttd/src/link/mod.rs | 1 + rumqttd/src/router/iobufs.rs | 3 +- rumqttd/src/router/meter.rs | 8 +++++ rumqttd/src/router/mod.rs | 24 +++++++++++-- rumqttd/src/router/routing.rs | 53 +++++++++++++++++++++++++--- 6 files changed, 146 insertions(+), 8 deletions(-) create mode 100644 rumqttd/src/link/meters.rs create mode 100644 rumqttd/src/router/meter.rs diff --git a/rumqttd/src/link/meters.rs b/rumqttd/src/link/meters.rs new file mode 100644 index 000000000..966b1140a --- /dev/null +++ b/rumqttd/src/link/meters.rs @@ -0,0 +1,65 @@ +use crate::router::{Event, GetMeter, Meter}; +use crate::ConnectionId; +use flume::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TrySendError}; + +#[derive(Debug, thiserror::Error)] +pub enum LinkError { + #[error("Channel try send error")] + TrySend(#[from] TrySendError<(ConnectionId, Event)>), + #[error("Channel send error")] + Send(#[from] SendError<(ConnectionId, Event)>), + #[error("Channel recv error")] + Recv(#[from] RecvError), + #[error("Channel timeout recv error")] + RecvTimeout(#[from] RecvTimeoutError), + #[error("Timeout = {0}")] + Elapsed(#[from] tokio::time::error::Elapsed), +} + +pub struct MetersLink { + pub(crate) meter_id: ConnectionId, + router_tx: Sender<(ConnectionId, Event)>, + router_rx: Receiver<(ConnectionId, Meter)>, +} + +impl MetersLink { + pub fn new(router_tx: Sender<(ConnectionId, Event)>) -> Result { + let (tx, rx) = flume::bounded(5); + router_tx.send((0, Event::NewMeter(tx)))?; + let (meter_id, _meter) = rx.recv()?; + + let link = MetersLink { + meter_id, + router_tx, + router_rx: rx, + }; + + Ok(link) + } + + pub async fn init(router_tx: Sender<(ConnectionId, Event)>) -> Result { + let (tx, rx) = flume::bounded(5); + router_tx.send((0, Event::NewMeter(tx)))?; + let (meter_id, _meter) = rx.recv_async().await?; + + let link = MetersLink { + meter_id, + router_tx, + router_rx: rx, + }; + + Ok(link) + } + + pub fn get(&self, meter: GetMeter) -> Result { + self.router_tx.send((self.meter_id, Event::GetMeter(meter)))?; + let (_meter_id, meter) = self.router_rx.recv()?; + Ok(meter) + } + + pub async fn fetch(&self, meter: GetMeter) -> Result { + self.router_tx.send_async((self.meter_id, Event::GetMeter(meter))).await?; + let (_meter_id, meter) = self.router_rx.recv_async().await?; + Ok(meter) + } +} diff --git a/rumqttd/src/link/mod.rs b/rumqttd/src/link/mod.rs index 6b03476a3..2a14408a8 100644 --- a/rumqttd/src/link/mod.rs +++ b/rumqttd/src/link/mod.rs @@ -1,6 +1,7 @@ // pub mod bridge; pub mod console; pub mod local; +pub mod meters; pub mod network; pub mod remote; #[cfg(feature = "websockets")] diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 9cc3c2495..e23f3927d 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -10,7 +10,7 @@ use crate::{ Cursor, Notification, }; -use super::Forward; +use super::{Forward, Meter}; const MAX_INFLIGHT: usize = 100; const MAX_PKID: u16 = MAX_INFLIGHT as u16; @@ -235,6 +235,7 @@ impl OutgoingMeter { } } + #[cfg(test)] mod test { // use super::{Outgoing, MAX_INFLIGHT}; diff --git a/rumqttd/src/router/meter.rs b/rumqttd/src/router/meter.rs new file mode 100644 index 000000000..a95e44886 --- /dev/null +++ b/rumqttd/src/router/meter.rs @@ -0,0 +1,8 @@ +use super::{RouterMeter, SubscriptionMeter, ConnectionMeter}; + + +pub struct Meter { + router: RouterMeter, + subscriptions: SubscriptionMeter, + connections: ConnectionMeter +} diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index 2fdc3a248..2e8556dd1 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -19,6 +19,7 @@ mod connection; mod graveyard; pub mod iobufs; mod logs; +mod meter; mod routing; mod scheduler; mod waiters; @@ -43,6 +44,10 @@ pub enum Event { incoming: iobufs::Incoming, outgoing: iobufs::Outgoing, }, + /// New meter link + NewMeter(flume::Sender<(ConnectionId, Meter)>), + /// Request for meter + GetMeter(GetMeter), /// Connection ready to receive more data Ready, /// Data for native commitlog @@ -215,7 +220,7 @@ pub struct ShadowReply { } #[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct RouterMetrics { +pub struct RouterMeter { pub router_id: RouterId, pub total_connections: usize, pub total_subscriptions: usize, @@ -269,6 +274,21 @@ impl ConnectionMeter { } } + +#[derive(Debug, Clone)] +pub enum GetMeter { + Router, + Connection(String), + Subscription(String) +} + +#[derive(Debug, Clone)] +pub enum Meter { + Router(usize, RouterMeter), + Connection(String, ConnectionMeter), + Subscription(String, SubscriptionMeter), +} + #[derive(Debug, Clone)] pub enum MetricsRequest { Config, @@ -284,7 +304,7 @@ pub enum MetricsRequest { #[serde(rename_all = "lowercase")] pub enum MetricsReply { Config(RouterConfig), - Router(RouterMetrics), + Router(RouterMeter), Connection(Option<(ConnectionMeter, Tracker)>), Subscriptions(HashMap>), Subscription(Option), diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index f4ede1516..2ccb1eda1 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -22,8 +22,9 @@ use super::iobufs::{Incoming, Outgoing}; use super::logs::{AckLog, DataLog}; use super::scheduler::{ScheduleReason, Scheduler}; use super::{ - packetid, Connection, DataRequest, Event, FilterIdx, MetricsReply, MetricsRequest, - Notification, RouterMetrics, ShadowRequest, MAX_CHANNEL_CAPACITY, MAX_SCHEDULE_ITERATIONS, + packetid, Connection, DataRequest, Event, FilterIdx, GetMeter, Meter, MetricsReply, + MetricsRequest, Notification, RouterMeter, ShadowRequest, MAX_CHANNEL_CAPACITY, + MAX_SCHEDULE_ITERATIONS, }; #[derive(Error, Debug)] @@ -54,6 +55,8 @@ pub struct Router { /// Saved state of dead persistent connections graveyard: Graveyard, /// List of connections + meters: Slab>, + /// List of connections connections: Slab, /// Connection map from device id to connection id connection_map: HashMap, @@ -80,7 +83,7 @@ pub struct Router { /// with this router router_tx: Sender<(ConnectionId, Event)>, /// Router metrics - router_metrics: RouterMetrics, + router_metrics: RouterMeter, /// Buffer for cache exchange of incoming packets cache: Option>, } @@ -89,14 +92,15 @@ impl Router { pub fn new(router_id: RouterId, config: RouterConfig) -> Router { let (router_tx, router_rx) = bounded(1000); + let meters = Slab::with_capacity(10); let connections = Slab::with_capacity(config.max_connections); let ibufs = Slab::with_capacity(config.max_connections); let obufs = Slab::with_capacity(config.max_connections); let ackslog = Slab::with_capacity(config.max_connections); - let router_metrics = RouterMetrics { + let router_metrics = RouterMeter { router_id, - ..RouterMetrics::default() + ..RouterMeter::default() }; let max_connections = config.max_connections; @@ -104,6 +108,7 @@ impl Router { id: router_id, config: config.clone(), graveyard: Graveyard::new(), + meters, connections, connection_map: Default::default(), subscription_map: Default::default(), @@ -207,6 +212,8 @@ impl Router { incoming, outgoing, } => self.handle_new_connection(connection, incoming, outgoing), + Event::NewMeter(tx) => self.handle_new_meter(tx), + Event::GetMeter(meter) => self.handle_get_meter(id, meter), Event::DeviceData => self.handle_device_payload(id), Event::Disconnect(disconnect) => self.handle_disconnection(id, disconnect.execute_will), Event::Ready => self.scheduler.reschedule(id, ScheduleReason::Ready), @@ -289,6 +296,15 @@ impl Router { .reschedule(connection_id, ScheduleReason::Init); } + fn handle_new_meter(&mut self, tx: Sender<(ConnectionId, Meter)>) { + let meter_id = self.meters.insert(tx); + let tx = &self.meters[meter_id]; + let _ = tx.try_send(( + meter_id, + Meter::Router(self.id, self.router_metrics.clone()), + )); + } + fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { // Some clients can choose to send Disconnect packet before network disconnection. // This will lead to double Disconnect packets in router `events` @@ -824,6 +840,33 @@ impl Router { } }; } + + fn handle_get_meter(&self, meter_id: ConnectionId, meter: router::GetMeter) { + let meter_tx = &self.meters[meter_id]; + match meter { + GetMeter::Router => { + let _ = meter_tx.try_send(( + meter_id, + Meter::Router(self.id, self.router_metrics.clone()), + )); + } + GetMeter::Connection(client_id) => { + let connection_id = self.connection_map.get(&client_id).unwrap(); + + // Update metrics + if let Some(meter) = self.connections.get(*connection_id).map(|v| &v.meter) { + let meter = Meter::Connection(client_id, meter.clone()); + let _ = meter_tx.try_send((meter_id, meter)); + } + } + GetMeter::Subscription(filter) => { + if let Some(meter) = self.datalog.meter(&filter) { + let meter = Meter::Subscription(filter.clone(), meter.clone()); + let _ = meter_tx.try_send((meter_id, meter)); + } + } + }; + } } #[tracing::instrument(skip_all)] From 7fdfefd9499d0d6945813e5a0cc47aeedd37e706 Mon Sep 17 00:00:00 2001 From: tekjar Date: Sat, 12 Nov 2022 18:10:13 +0530 Subject: [PATCH 15/26] Cleanup unused Meters --- rumqttd/src/lib.rs | 4 +++- rumqttd/src/link/remote.rs | 4 ++-- rumqttd/src/router/iobufs.rs | 3 +-- rumqttd/src/router/meter.rs | 8 -------- rumqttd/src/router/mod.rs | 1 - 5 files changed, 6 insertions(+), 14 deletions(-) delete mode 100644 rumqttd/src/router/meter.rs diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index a1676174e..c3dd74ea2 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -31,7 +31,9 @@ pub type TopicId = usize; pub type Offset = (u64, u64); pub type Cursor = (u64, u64); -pub use link::local::{Link, LinkError, LinkRx, LinkTx}; +pub use link::local; +pub use link::meters; + pub use router::Notification; pub use server::Broker; diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index 80382dfe0..b15607404 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -1,9 +1,9 @@ -use crate::link::local::{LinkError, LinkRx, LinkTx}; +use crate::link::local::{Link, LinkError, LinkRx, LinkTx}; use crate::link::network; use crate::link::network::Network; use crate::protocol::{Connect, Packet, Protocol}; use crate::router::{Event, Notification}; -use crate::{ConnectionId, ConnectionSettings, Link}; +use crate::{ConnectionId, ConnectionSettings}; use flume::{RecvError, SendError, Sender, TrySendError}; use std::collections::VecDeque; diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index e23f3927d..9cc3c2495 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -10,7 +10,7 @@ use crate::{ Cursor, Notification, }; -use super::{Forward, Meter}; +use super::Forward; const MAX_INFLIGHT: usize = 100; const MAX_PKID: u16 = MAX_INFLIGHT as u16; @@ -235,7 +235,6 @@ impl OutgoingMeter { } } - #[cfg(test)] mod test { // use super::{Outgoing, MAX_INFLIGHT}; diff --git a/rumqttd/src/router/meter.rs b/rumqttd/src/router/meter.rs deleted file mode 100644 index a95e44886..000000000 --- a/rumqttd/src/router/meter.rs +++ /dev/null @@ -1,8 +0,0 @@ -use super::{RouterMeter, SubscriptionMeter, ConnectionMeter}; - - -pub struct Meter { - router: RouterMeter, - subscriptions: SubscriptionMeter, - connections: ConnectionMeter -} diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index 2e8556dd1..4e01d36c0 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -19,7 +19,6 @@ mod connection; mod graveyard; pub mod iobufs; mod logs; -mod meter; mod routing; mod scheduler; mod waiters; From 7923b79f7d11a5bd47027c455d10e7524ffb0cd3 Mon Sep 17 00:00:00 2001 From: tekjar Date: Fri, 18 Nov 2022 01:00:48 +0530 Subject: [PATCH 16/26] Add meters to singlenode --- rumqttd/examples/singlenode.rs | 17 +++++++++++++++-- rumqttd/src/lib.rs | 1 + rumqttd/src/server/broker.rs | 12 +++++++----- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/rumqttd/examples/singlenode.rs b/rumqttd/examples/singlenode.rs index b0472c49c..88da2987d 100644 --- a/rumqttd/examples/singlenode.rs +++ b/rumqttd/examples/singlenode.rs @@ -1,6 +1,6 @@ -use rumqttd::{Broker, Config, Notification}; +use rumqttd::{Broker, Config, GetMeter, Notification}; -use std::thread; +use std::{thread, time::Duration}; #[cfg(not(target_env = "msvc"))] #[global_allocator] @@ -21,11 +21,24 @@ fn main() { dbg!(&config); let mut broker = Broker::new(config); + + let meters = broker.meters().unwrap(); let (_link_tx, mut link_rx) = broker.link("singlenode").unwrap(); + thread::spawn(move || { broker.start().unwrap(); }); + thread::spawn(move || -> ! { + loop { + let v = meters + .get(GetMeter::Subscription("hello/world".to_owned())) + .unwrap(); + println!("{:?}", v); + thread::sleep(Duration::from_secs(1)); + } + }); + let mut count = 0; loop { let notification = match link_rx.recv().unwrap() { diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index c3dd74ea2..31412db62 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -34,6 +34,7 @@ pub type Cursor = (u64, u64); pub use link::local; pub use link::meters; +pub use router::GetMeter; pub use router::Notification; pub use server::Broker; diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 374525b17..b7325249c 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -11,7 +11,7 @@ use crate::protocol::ws::Ws; use crate::protocol::Protocol; #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] use crate::server::tls::{self, TLSAcceptor}; -use crate::ConnectionSettings; +use crate::{ConnectionSettings, meters}; use flume::{RecvError, SendError, Sender}; use std::sync::Arc; use tracing::{error, field, info, Instrument, Span}; @@ -120,6 +120,12 @@ impl Broker { // } // } + // Link to get meters + pub fn meters(&self) -> Result { + let link = meters::MetersLink::new(self.router_tx.clone())?; + Ok(link) + } + pub fn link(&self, client_id: &str) -> Result<(LinkTx, LinkRx), local::LinkError> { // Register this connection with the router. Router replies with ack which if ok will // start the link. Router can sometimes reject the connection (ex max connection limit) @@ -404,11 +410,7 @@ async fn shadow_connection( let connection_id = link.connection_id; Span::current().record("client_id", &client_id); -<<<<<<< HEAD Span::current().record("connection_id", &connection_id); -======= - Span::current().record("connection_id", connection_id); ->>>>>>> main match link.start().await { // Connection get close. This shouldn't usually happen From 881d6131111d0007327f2676fd0bd1a4348a537d Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 00:57:23 +0530 Subject: [PATCH 17/26] Add meters example --- rumqttd/examples/meters.rs | 64 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 rumqttd/examples/meters.rs diff --git a/rumqttd/examples/meters.rs b/rumqttd/examples/meters.rs new file mode 100644 index 000000000..88da2987d --- /dev/null +++ b/rumqttd/examples/meters.rs @@ -0,0 +1,64 @@ +use rumqttd::{Broker, Config, GetMeter, Notification}; + +use std::{thread, time::Duration}; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +fn main() { + pretty_env_logger::init(); + + // As examples are compiled as seperate binary so this config is current path dependent. Run it + // from root of this crate + let config = config::Config::builder() + .add_source(config::File::with_name("demo.toml")) + .build() + .unwrap(); + + let config: Config = config.try_deserialize().unwrap(); + + dbg!(&config); + + let mut broker = Broker::new(config); + + let meters = broker.meters().unwrap(); + let (_link_tx, mut link_rx) = broker.link("singlenode").unwrap(); + + thread::spawn(move || { + broker.start().unwrap(); + }); + + thread::spawn(move || -> ! { + loop { + let v = meters + .get(GetMeter::Subscription("hello/world".to_owned())) + .unwrap(); + println!("{:?}", v); + thread::sleep(Duration::from_secs(1)); + } + }); + + let mut count = 0; + loop { + let notification = match link_rx.recv().unwrap() { + Some(v) => v, + None => continue, + }; + + match notification { + Notification::Forward(forward) => { + count += 1; + println!( + "Topic = {:?}, Count = {}, Payload = {} bytes", + forward.publish.topic, + count, + forward.publish.payload.len() + ); + } + v => { + println!("{:?}", v); + } + } + } +} From 14a7815f9a2dc430d1c853f3d633af061f359668 Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 00:58:23 +0530 Subject: [PATCH 18/26] Revert single node example --- rumqttd/examples/singlenode.rs | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/rumqttd/examples/singlenode.rs b/rumqttd/examples/singlenode.rs index 88da2987d..b0472c49c 100644 --- a/rumqttd/examples/singlenode.rs +++ b/rumqttd/examples/singlenode.rs @@ -1,6 +1,6 @@ -use rumqttd::{Broker, Config, GetMeter, Notification}; +use rumqttd::{Broker, Config, Notification}; -use std::{thread, time::Duration}; +use std::thread; #[cfg(not(target_env = "msvc"))] #[global_allocator] @@ -21,24 +21,11 @@ fn main() { dbg!(&config); let mut broker = Broker::new(config); - - let meters = broker.meters().unwrap(); let (_link_tx, mut link_rx) = broker.link("singlenode").unwrap(); - thread::spawn(move || { broker.start().unwrap(); }); - thread::spawn(move || -> ! { - loop { - let v = meters - .get(GetMeter::Subscription("hello/world".to_owned())) - .unwrap(); - println!("{:?}", v); - thread::sleep(Duration::from_secs(1)); - } - }); - let mut count = 0; loop { let notification = match link_rx.recv().unwrap() { From 265acec7062baf60194ea698b6b1c74fb639873c Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 02:40:35 +0530 Subject: [PATCH 19/26] Improve meters example separate out meters and connection events --- rumqttd/examples/meters.rs | 76 +++++++++++++++++--------------- rumqttd/src/router/connection.rs | 8 ++-- rumqttd/src/router/graveyard.rs | 8 ++-- rumqttd/src/router/iobufs.rs | 69 +---------------------------- rumqttd/src/router/mod.rs | 55 ++++++++--------------- rumqttd/src/router/routing.rs | 44 +++++++++--------- 6 files changed, 88 insertions(+), 172 deletions(-) diff --git a/rumqttd/examples/meters.rs b/rumqttd/examples/meters.rs index 88da2987d..feda8d1aa 100644 --- a/rumqttd/examples/meters.rs +++ b/rumqttd/examples/meters.rs @@ -1,11 +1,6 @@ use rumqttd::{Broker, Config, GetMeter, Notification}; - use std::{thread, time::Duration}; -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; - fn main() { pretty_env_logger::init(); @@ -20,45 +15,54 @@ fn main() { dbg!(&config); - let mut broker = Broker::new(config); - + let broker = Broker::new(config); let meters = broker.meters().unwrap(); - let (_link_tx, mut link_rx) = broker.link("singlenode").unwrap(); + let (mut link_tx, mut link_rx) = broker.link("consumer").unwrap(); + link_tx.subscribe("hello/+/world").unwrap(); thread::spawn(move || { - broker.start().unwrap(); - }); - - thread::spawn(move || -> ! { + let mut count = 0; loop { - let v = meters - .get(GetMeter::Subscription("hello/world".to_owned())) - .unwrap(); - println!("{:?}", v); - thread::sleep(Duration::from_secs(1)); + let notification = match link_rx.recv().unwrap() { + Some(v) => v, + None => continue, + }; + + match notification { + Notification::Forward(forward) => { + count += 1; + println!( + "Topic = {:?}, Count = {}, Payload = {} bytes", + forward.publish.topic, + count, + forward.publish.payload.len() + ); + } + v => { + println!("{:?}", v); + } + } } }); - let mut count = 0; - loop { - let notification = match link_rx.recv().unwrap() { - Some(v) => v, - None => continue, - }; + for i in 0..10 { + let client_id = format!("client_{i}"); + let topic = format!("hello/{}/world", client_id); + let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB + let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made"); - match notification { - Notification::Forward(forward) => { - count += 1; - println!( - "Topic = {:?}, Count = {}, Payload = {} bytes", - forward.publish.topic, - count, - forward.publish.payload.len() - ); - } - v => { - println!("{:?}", v); + thread::spawn(move || { + for _ in 0..10 { + thread::sleep(Duration::from_secs(1)); + link_tx.publish(topic.clone(), payload.clone()).unwrap(); } - } + }); + } + + loop { + let request = GetMeter::Subscription("hello/+/world".to_owned()); + let v = meters.get(request).unwrap(); + println!("{:?}", v); + thread::sleep(Duration::from_secs(1)); } } diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index 202eab815..2a33f04be 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -3,7 +3,7 @@ use crate::Filter; use flume::{bounded, Receiver, Sender}; use std::collections::HashSet; -use super::{ConnectionMeter, MetricsReply}; +use super::{ConnectionEvents, MetricsReply}; /// Used to register a new connection with the router /// Connection messages encompasses a handle for router to @@ -21,8 +21,8 @@ pub struct Connection { pub subscriptions: HashSet, /// Handle to send metrics reply pub metrics: Sender, - /// Connection metrics - pub meter: ConnectionMeter, + /// Connection events + pub events: ConnectionEvents, pub last_will: Option, } @@ -55,7 +55,7 @@ impl Connection { clean, subscriptions: HashSet::default(), metrics: metrics_tx, - meter: ConnectionMeter::default(), + events: ConnectionEvents::default(), last_will, }; diff --git a/rumqttd/src/router/graveyard.rs b/rumqttd/src/router/graveyard.rs index 485d38c13..96c6e4b98 100644 --- a/rumqttd/src/router/graveyard.rs +++ b/rumqttd/src/router/graveyard.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use super::{ scheduler::{PauseReason, Tracker}, - ConnectionMeter, + ConnectionEvents, }; pub struct Graveyard { @@ -27,7 +27,7 @@ impl Graveyard { &mut self, mut tracker: Tracker, subscriptions: HashSet, - metrics: ConnectionMeter, + metrics: ConnectionEvents, ) { tracker.pause(PauseReason::Busy); let id = tracker.id.clone(); @@ -47,7 +47,7 @@ impl Graveyard { pub struct SavedState { pub tracker: Tracker, pub subscriptions: HashSet, - pub metrics: ConnectionMeter, + pub metrics: ConnectionEvents, } impl SavedState { @@ -55,7 +55,7 @@ impl SavedState { SavedState { tracker: Tracker::new(client_id), subscriptions: HashSet::new(), - metrics: ConnectionMeter::default(), + metrics: ConnectionEvents::default(), } } } diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index 9cc3c2495..d176bf8c9 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, sync::Arc, time::Instant}; +use std::{collections::VecDeque, sync::Arc}; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -10,7 +10,7 @@ use crate::{ Cursor, Notification, }; -use super::Forward; +use super::{Forward, IncomingMeter, OutgoingMeter}; const MAX_INFLIGHT: usize = 100; const MAX_PKID: u16 = MAX_INFLIGHT as u16; @@ -170,71 +170,6 @@ impl Outgoing { } } -#[derive(Debug)] -#[allow(dead_code)] -pub struct IncomingMeter { - pub(crate) publish_count: usize, - pub(crate) subscribe_count: usize, - pub(crate) total_size: usize, - pub(crate) last_timestamp: Instant, - start: Instant, - pub(crate) data_rate: usize, -} - -impl Default for IncomingMeter { - fn default() -> Self { - Self { - publish_count: 0, - subscribe_count: 0, - total_size: 0, - last_timestamp: Instant::now(), - start: Instant::now(), - data_rate: 0, - } - } -} - -impl IncomingMeter { - //TODO: Fix this - #[allow(dead_code)] - pub(crate) fn average_last_data_rate(&mut self) { - let now = Instant::now(); - self.data_rate = self.total_size / now.duration_since(self.start).as_micros() as usize; - self.last_timestamp = now; - } -} - -#[derive(Debug)] -#[allow(dead_code)] -pub struct OutgoingMeter { - pub(crate) publish_count: usize, - pub(crate) total_size: usize, - pub(crate) last_timestamp: Instant, - start: Instant, - pub(crate) data_rate: usize, -} - -impl Default for OutgoingMeter { - fn default() -> Self { - Self { - publish_count: 0, - total_size: 0, - last_timestamp: Instant::now(), - start: Instant::now(), - data_rate: 0, - } - } -} - -impl OutgoingMeter { - #[allow(dead_code)] - pub(crate) fn average_last_data_rate(&mut self) { - let now = Instant::now(); - self.data_rate = self.total_size / now.duration_since(self.start).as_micros() as usize; - self.last_timestamp = now; - } -} - #[cfg(test)] mod test { // use super::{Outgoing, MAX_INFLIGHT}; diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index 4e01d36c0..581d37448 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, VecDeque}, fmt, }; @@ -236,55 +236,36 @@ pub struct SubscriptionMeter { pub read_offset: usize, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct ConnectionMeter { - publish_count: usize, - publish_size: usize, - subscriptions: HashSet, - events: VecDeque, +#[derive(Debug, Default)] +pub struct IncomingMeter { + pub publish_count: usize, + pub subscribe_count: usize, + pub total_size: usize, } -impl ConnectionMeter { - pub fn increment_publish_count(&mut self) { - self.publish_count += 1 - } - - pub fn add_publish_size(&mut self, size: usize) { - self.publish_size += size; - } - - pub fn push_subscription(&mut self, filter: Filter) { - self.subscriptions.insert(filter); - } - - pub fn push_subscriptions(&mut self, filters: HashSet) { - self.subscriptions.extend(filters); - } - - pub fn push_event(&mut self, event: String) { - self.events.push_back(event); - if self.events.len() > 10 { - self.events.pop_front(); - } - } - - pub fn remove_subscription(&mut self, filter: Filter) { - self.subscriptions.remove(&filter); - } +#[derive(Debug, Default)] +#[allow(dead_code)] +pub struct OutgoingMeter { + pub publish_count: usize, + pub total_size: usize, } +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ConnectionEvents { + events: VecDeque, +} #[derive(Debug, Clone)] pub enum GetMeter { Router, Connection(String), - Subscription(String) + Subscription(String), } #[derive(Debug, Clone)] pub enum Meter { Router(usize, RouterMeter), - Connection(String, ConnectionMeter), + Connection(String, ConnectionEvents), Subscription(String, SubscriptionMeter), } @@ -304,7 +285,7 @@ pub enum MetricsRequest { pub enum MetricsReply { Config(RouterConfig), Router(RouterMeter), - Connection(Option<(ConnectionMeter, Tracker)>), + Connection(Option<(ConnectionEvents, Tracker)>), Subscriptions(HashMap>), Subscription(Option), Waiters(Option>), diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 7f15663bf..1c367d2fd 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -249,13 +249,12 @@ impl Router { let tracker = if !clean_session { let saved = saved.map_or(SavedState::new(client_id.clone()), |s| s); connection.subscriptions = saved.subscriptions; - connection.meter = saved.metrics; + connection.events = saved.metrics; saved.tracker } else { // Only retrieve metrics in clean session let saved = saved.map_or(SavedState::new(client_id.clone()), |s| s); - connection.meter = saved.metrics; - connection.meter.subscriptions.clear(); + connection.events = saved.metrics; Tracker::new(client_id.clone()) }; let ackslog = AckLog::new(); @@ -266,10 +265,11 @@ impl Router { }; let event = "connection at ".to_owned() + &time + ", clean = " + &clean_session.to_string(); - connection.meter.push_event(event); - connection - .meter - .push_subscriptions(connection.subscriptions.clone()); + connection.events.events.push_back(event); + + if connection.events.events.len() > 10 { + connection.events.events.pop_front(); + } let connection_id = self.connections.insert(connection); assert_eq!(self.ibufs.insert(incoming), connection_id); @@ -353,7 +353,11 @@ impl Router { }; let event = "disconnection at ".to_owned() + &time; - connection.meter.push_event(event); + connection.events.events.push_back(event); + + if connection.events.events.len() > 10 { + connection.events.events.pop_front(); + } // Save state for persistent sessions if !connection.clean { @@ -363,12 +367,11 @@ impl Router { .for_each(|r| tracker.register_data_request(r)); self.graveyard - .save(tracker, connection.subscriptions, connection.meter); + .save(tracker, connection.subscriptions, connection.events); } else { // Only save metrics in clean session - connection.meter.subscriptions.clear(); self.graveyard - .save(Tracker::new(client_id), HashSet::new(), connection.meter); + .save(Tracker::new(client_id), HashSet::new(), connection.events); } } @@ -476,12 +479,6 @@ impl Router { } }; - // Update metrics - if let Some(metrics) = self.connections.get_mut(id).map(|v| &mut v.meter) { - metrics.increment_publish_count(); - metrics.add_publish_size(size); - } - let meter = &mut self.ibufs.get_mut(id).unwrap().meter; meter.publish_count += 1; meter.total_size += size; @@ -495,7 +492,6 @@ impl Router { for f in s.filters { info!(filter = f.path, "subscribe"); - let connection = self.connections.get_mut(id).unwrap(); if let Err(e) = validate_subscription(/*connection,*/ &f) { error!(reason=?e,"bad-subscription" ); @@ -506,9 +502,6 @@ impl Router { let filter = f.path; let qos = f.qos; - // Update metrics - connection.meter.push_subscription(filter.clone()); - let (idx, cursor) = self.datalog.next_native_offset(&filter); self.prepare_filter(id, cursor, idx, filter.clone(), qos as u8); self.datalog @@ -545,7 +538,6 @@ impl Router { continue; } - connection.meter.remove_subscription(filter.clone()); let meter = &mut self.ibufs.get_mut(id).unwrap().meter; meter.subscribe_count -= 1; @@ -852,7 +844,7 @@ impl Router { let connection_id = self.connection_map.get(&client_id).unwrap(); // Update metrics - if let Some(meter) = self.connections.get(*connection_id).map(|v| &v.meter) { + if let Some(meter) = self.connections.get(*connection_id).map(|v| &v.events) { let meter = Meter::Connection(client_id, meter.clone()); let _ = meter_tx.try_send((meter_id, meter)); } @@ -1088,7 +1080,11 @@ fn retrieve_metrics(id: ConnectionId, router: &mut Router, metrics: MetricsReque MetricsRequest::Router => MetricsReply::Router(router.router_metrics.clone()), MetricsRequest::Connection(id) => { let metrics = router.connection_map.get(&id).map(|v| { - let c = router.connections.get(*v).map(|v| v.meter.clone()).unwrap(); + let c = router + .connections + .get(*v) + .map(|v| v.events.clone()) + .unwrap(); let t = router.scheduler.trackers.get(*v).cloned().unwrap(); (c, t) }); From 0e938dcd1eaf8704fbbb62a8a85774f80bfe0bc4 Mon Sep 17 00:00:00 2001 From: Ravi Date: Mon, 21 Nov 2022 18:12:59 +0530 Subject: [PATCH 20/26] Remove metrics from local links --- rumqttd/src/link/console.rs | 18 ++++++------------ rumqttd/src/link/local.rs | 32 ++++++++------------------------ rumqttd/src/main.rs | 1 - rumqttd/src/router/connection.rs | 21 +++++++-------------- rumqttd/src/router/routing.rs | 7 +++---- 5 files changed, 24 insertions(+), 55 deletions(-) diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index f81d9c88f..5ac277ecd 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -48,8 +48,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/device/{id: String}) => { let event = Event::Metrics(MetricsRequest::Connection(id)); @@ -58,8 +57,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/subscriptions) => { let event = Event::Metrics(MetricsRequest::Subscriptions); @@ -68,8 +66,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/subscription/{filter: String}) => { let filter = filter.replace('.', "/"); @@ -79,8 +76,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/waiters/{filter: String}) => { let filter = filter.replace('.', "/"); @@ -90,8 +86,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/readyqueue) => { let event = Event::Metrics(MetricsRequest::ReadyQueue); @@ -100,8 +95,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (POST) (/logs) => { info!("Reloading tracing filter"); diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 76f91079e..15c8980cd 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -4,7 +4,7 @@ use crate::protocol::{ use crate::router::Ack; use crate::router::{ iobufs::{Incoming, Outgoing}, - Connection, Event, MetricsReply, Notification, ShadowRequest, + Connection, Event, Notification, ShadowRequest, }; use crate::ConnectionId; use bytes::Bytes; @@ -15,7 +15,7 @@ use parking_lot::{Mutex, RawMutex}; use std::collections::VecDeque; use std::mem; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; #[derive(Debug, thiserror::Error)] pub enum LinkError { @@ -50,9 +50,8 @@ impl Link { Arc>>, Arc>>, Receiver<()>, - Receiver, ) { - let (connection, metrics_rx) = Connection::new( + let connection = Connection::new( tenant_id, client_id.to_owned(), clean, @@ -70,13 +69,7 @@ impl Link { outgoing, }; - ( - event, - incoming_data_buffer, - outgoing_data_buffer, - link_rx, - metrics_rx, - ) + (event, incoming_data_buffer, outgoing_data_buffer, link_rx) } #[allow(clippy::new_ret_no_self)] @@ -91,7 +84,7 @@ impl Link { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx, metrics_rx) = + let (message, i, o, link_rx) = Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); router_tx.send((0, message))?; @@ -106,7 +99,7 @@ impl Link { }; let tx = LinkTx::new(id, router_tx.clone(), i); - let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o); + let rx = LinkRx::new(id, router_tx, link_rx, o); Ok((tx, rx, notification)) } @@ -121,7 +114,7 @@ impl Link { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx, metrics_rx) = + let (message, i, o, link_rx) = Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); router_tx.send_async((0, message)).await?; @@ -135,7 +128,7 @@ impl Link { }; let tx = LinkTx::new(id, router_tx.clone(), i); - let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o); + let rx = LinkRx::new(id, router_tx, link_rx, o); Ok((tx, rx, ack)) } } @@ -284,7 +277,6 @@ pub struct LinkRx { connection_id: ConnectionId, router_tx: Sender<(ConnectionId, Event)>, router_rx: Receiver<()>, - metrics_rx: Receiver, send_buffer: Arc>>, cache: VecDeque, } @@ -294,14 +286,12 @@ impl LinkRx { connection_id: ConnectionId, router_tx: Sender<(ConnectionId, Event)>, router_rx: Receiver<()>, - metrics_rx: Receiver, outgoing_data_buffer: Arc>>, ) -> LinkRx { LinkRx { connection_id, router_tx, router_rx, - metrics_rx, send_buffer: outgoing_data_buffer, cache: VecDeque::with_capacity(100), } @@ -376,12 +366,6 @@ impl LinkRx { .await?; Ok(()) } - - pub fn metrics(&self) -> Option { - self.metrics_rx - .recv_deadline(Instant::now() + Duration::from_secs(1)) - .ok() - } } #[cfg(test)] diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index 3da0fa0a5..cfc8b540b 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -59,7 +59,6 @@ fn main() { .unwrap(); let mut config: Config = config.try_deserialize().unwrap(); - config.console.set_filter_reload_handle(reload_handle); // println!("{:#?}", config); diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index fe5778d64..419c03a84 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -1,9 +1,8 @@ use crate::protocol::LastWill; use crate::Filter; -use flume::{bounded, Receiver, Sender}; use std::collections::HashSet; -use super::{ConnectionEvents, MetricsReply}; +use super::ConnectionEvents; /// Used to register a new connection with the router /// Connection messages encompasses a handle for router to @@ -19,11 +18,10 @@ pub struct Connection { pub clean: bool, /// Subscriptions pub subscriptions: HashSet, - /// Handle to send metrics reply - pub metrics: Sender, + /// Last will of this connection + pub last_will: Option, /// Connection events pub events: ConnectionEvents, - pub last_will: Option, } impl Connection { @@ -34,9 +32,7 @@ impl Connection { clean: bool, last_will: Option, dynamic_filters: bool, - ) -> (Connection, Receiver) { - let (metrics_tx, metrics_rx) = bounded(1); - + ) -> Connection { // Change client id to -> tenant_id.client_id and derive topic path prefix // to validate topics let (client_id, tenant_prefix) = match tenant_id { @@ -48,17 +44,14 @@ impl Connection { None => (client_id, None), }; - let connection = Connection { + Connection { client_id, tenant_prefix, dynamic_filters, clean, subscriptions: HashSet::default(), - metrics: metrics_tx, - events: ConnectionEvents::default(), last_will, - }; - - (connection, metrics_rx) + events: ConnectionEvents::default(), + } } } diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 74ee29f9b..fdc0ac4a3 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -222,7 +222,7 @@ impl Router { Event::Shadow(request) => { retrieve_shadow(&mut self.datalog, &mut self.obufs[id], request) } - Event::Metrics(metrics) => retrieve_metrics(id, self, metrics), + Event::Metrics(metrics) => retrieve_metrics(self, metrics), } } @@ -1106,7 +1106,7 @@ fn retrieve_shadow(datalog: &mut DataLog, outgoing: &mut Outgoing, shadow: Shado } } -fn retrieve_metrics(id: ConnectionId, router: &mut Router, metrics: MetricsRequest) { +fn retrieve_metrics(router: &mut Router, metrics: MetricsRequest) { let message = match metrics { MetricsRequest::Config => MetricsReply::Config(router.config.clone()), MetricsRequest::Router => MetricsReply::Router(router.router_metrics.clone()), @@ -1168,8 +1168,7 @@ fn retrieve_metrics(id: ConnectionId, router: &mut Router, metrics: MetricsReque } }; - let connection = router.connections.get_mut(id).unwrap(); - connection.metrics.try_send(message).ok(); + println!("{:#?}", message); } fn validate_subscription( From 7ce6671fd2059bb84d68ee722ea7bc64ef390e8c Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 18:48:09 +0530 Subject: [PATCH 21/26] Use iobuf meters instead of connection events and improve meters example --- rumqttd/examples/meters.rs | 29 +++++++++++++++++++++++---- rumqttd/src/router/mod.rs | 9 ++++----- rumqttd/src/router/routing.rs | 37 ++++++++++++++++------------------- 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/rumqttd/examples/meters.rs b/rumqttd/examples/meters.rs index feda8d1aa..890464ce8 100644 --- a/rumqttd/examples/meters.rs +++ b/rumqttd/examples/meters.rs @@ -45,14 +45,14 @@ fn main() { } }); - for i in 0..10 { + for i in 0..5 { let client_id = format!("client_{i}"); let topic = format!("hello/{}/world", client_id); let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made"); thread::spawn(move || { - for _ in 0..10 { + for _ in 0..100 { thread::sleep(Duration::from_secs(1)); link_tx.publish(topic.clone(), payload.clone()).unwrap(); } @@ -60,9 +60,30 @@ fn main() { } loop { + // Router meters + let request = GetMeter::Router; + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + + // Publisher meters + for i in 0..5 { + let client_id = format!("client_{i}"); + let request = GetMeter::Connection(client_id); + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + } + + // Commitlog meters let request = GetMeter::Subscription("hello/+/world".to_owned()); let v = meters.get(request).unwrap(); - println!("{:?}", v); - thread::sleep(Duration::from_secs(1)); + println!("{:#?}", v); + + // Consumer meters + let client_id = format!("consumer"); + let request = GetMeter::Connection(client_id); + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + + thread::sleep(Duration::from_secs(5)); } } diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index 581d37448..16e591ce8 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -236,15 +236,14 @@ pub struct SubscriptionMeter { pub read_offset: usize, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct IncomingMeter { pub publish_count: usize, pub subscribe_count: usize, pub total_size: usize, } -#[derive(Debug, Default)] -#[allow(dead_code)] +#[derive(Debug, Default, Clone)] pub struct OutgoingMeter { pub publish_count: usize, pub total_size: usize, @@ -265,8 +264,8 @@ pub enum GetMeter { #[derive(Debug, Clone)] pub enum Meter { Router(usize, RouterMeter), - Connection(String, ConnectionEvents), - Subscription(String, SubscriptionMeter), + Connection(String, Option, Option), + Subscription(String, Option), } #[derive(Debug, Clone)] diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index fdc0ac4a3..fea112bf2 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -84,7 +84,7 @@ pub struct Router { /// with this router router_tx: Sender<(ConnectionId, Event)>, /// Router metrics - router_metrics: RouterMeter, + router_meters: RouterMeter, /// Buffer for cache exchange of incoming packets cache: Option>, } @@ -121,7 +121,7 @@ impl Router { notifications: VecDeque::with_capacity(1024), router_rx, router_tx, - router_metrics, + router_meters: router_metrics, cache: Some(VecDeque::with_capacity(MAX_CHANNEL_CAPACITY)), } } @@ -303,7 +303,7 @@ impl Router { let tx = &self.meters[meter_id]; let _ = tx.try_send(( meter_id, - Meter::Router(self.id, self.router_metrics.clone()), + Meter::Router(self.id, self.router_meters.clone()), )); } @@ -455,7 +455,7 @@ impl Router { } }; - self.router_metrics.total_publishes += 1; + self.router_meters.total_publishes += 1; // Try to append publish to commitlog match append_to_commitlog( @@ -476,7 +476,7 @@ impl Router { error!( reason = ?e, "Failed to append to commitlog" ); - self.router_metrics.failed_publishes += 1; + self.router_meters.failed_publishes += 1; disconnect = true; break; } @@ -642,7 +642,7 @@ impl Router { error!( reason = ?e, "Failed to append to commitlog" ); - self.router_metrics.failed_publishes += 1; + self.router_meters.failed_publishes += 1; disconnect = true; break; } @@ -846,7 +846,7 @@ impl Router { error!( reason = ?e, "Failed to append to commitlog" ); - self.router_metrics.failed_publishes += 1; + self.router_meters.failed_publishes += 1; // Removed disconnect = true from here because we disconnect anyways } }; @@ -856,25 +856,22 @@ impl Router { let meter_tx = &self.meters[meter_id]; match meter { GetMeter::Router => { - let _ = meter_tx.try_send(( - meter_id, - Meter::Router(self.id, self.router_metrics.clone()), - )); + let router_meters = Meter::Router(self.id, self.router_meters.clone()); + let _ = meter_tx.try_send((meter_id, router_meters)); } GetMeter::Connection(client_id) => { let connection_id = self.connection_map.get(&client_id).unwrap(); // Update metrics - if let Some(meter) = self.connections.get(*connection_id).map(|v| &v.events) { - let meter = Meter::Connection(client_id, meter.clone()); - let _ = meter_tx.try_send((meter_id, meter)); - } + let incoming_meter = self.ibufs.get(*connection_id).map(|v| v.meter.clone()); + let outgoing_meter = self.obufs.get(*connection_id).map(|v| v.meter.clone()); + let meter = Meter::Connection(client_id, incoming_meter, outgoing_meter); + let _ = meter_tx.try_send((meter_id, meter)); } GetMeter::Subscription(filter) => { - if let Some(meter) = self.datalog.meter(&filter) { - let meter = Meter::Subscription(filter.clone(), meter.clone()); - let _ = meter_tx.try_send((meter_id, meter)); - } + let subscription_meter = self.datalog.meter(&filter); + let meter = Meter::Subscription(filter.clone(),subscription_meter); + let _ = meter_tx.try_send((meter_id, meter)); } }; } @@ -1109,7 +1106,7 @@ fn retrieve_shadow(datalog: &mut DataLog, outgoing: &mut Outgoing, shadow: Shado fn retrieve_metrics(router: &mut Router, metrics: MetricsRequest) { let message = match metrics { MetricsRequest::Config => MetricsReply::Config(router.config.clone()), - MetricsRequest::Router => MetricsReply::Router(router.router_metrics.clone()), + MetricsRequest::Router => MetricsReply::Router(router.router_meters.clone()), MetricsRequest::Connection(id) => { let metrics = router.connection_map.get(&id).map(|v| { let c = router From cd96fb309970d62f2013348d986fead4c7156ac1 Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 18:55:21 +0530 Subject: [PATCH 22/26] Remove unwrap while fetching connectiin meter --- rumqttd/src/router/routing.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index fea112bf2..655b7d2bd 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -301,10 +301,7 @@ impl Router { fn handle_new_meter(&mut self, tx: Sender<(ConnectionId, Meter)>) { let meter_id = self.meters.insert(tx); let tx = &self.meters[meter_id]; - let _ = tx.try_send(( - meter_id, - Meter::Router(self.id, self.router_meters.clone()), - )); + let _ = tx.try_send((meter_id, Meter::Router(self.id, self.router_meters.clone()))); } fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { @@ -856,11 +853,18 @@ impl Router { let meter_tx = &self.meters[meter_id]; match meter { GetMeter::Router => { - let router_meters = Meter::Router(self.id, self.router_meters.clone()); + let router_meters = Meter::Router(self.id, self.router_meters.clone()); let _ = meter_tx.try_send((meter_id, router_meters)); } GetMeter::Connection(client_id) => { - let connection_id = self.connection_map.get(&client_id).unwrap(); + let connection_id = match self.connection_map.get(&client_id) { + Some(val) => val, + None => { + let meter = Meter::Connection("".to_owned(), None, None); + let _ = meter_tx.try_send((meter_id, meter)); + return; + } + }; // Update metrics let incoming_meter = self.ibufs.get(*connection_id).map(|v| v.meter.clone()); @@ -870,7 +874,7 @@ impl Router { } GetMeter::Subscription(filter) => { let subscription_meter = self.datalog.meter(&filter); - let meter = Meter::Subscription(filter.clone(),subscription_meter); + let meter = Meter::Subscription(filter.clone(), subscription_meter); let _ = meter_tx.try_send((meter_id, meter)); } }; From 3c986d8d5c06361b5c8244ce0924fdea6b17759b Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 20:17:19 +0530 Subject: [PATCH 23/26] Fix rustc and clippy warnings to make CI happy --- rumqttd/src/link/console.rs | 4 ++-- rumqttd/src/router/routing.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index 5ac277ecd..c2cbaa667 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -11,7 +11,7 @@ pub struct ConsoleLink { config: ConsoleSettings, connection_id: ConnectionId, router_tx: Sender<(ConnectionId, Event)>, - link_rx: LinkRx, + _link_rx: LinkRx, } impl ConsoleLink { @@ -23,7 +23,7 @@ impl ConsoleLink { ConsoleLink { config, router_tx, - link_rx, + _link_rx: link_rx, connection_id, } } diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 655b7d2bd..3061d6b18 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -874,7 +874,7 @@ impl Router { } GetMeter::Subscription(filter) => { let subscription_meter = self.datalog.meter(&filter); - let meter = Meter::Subscription(filter.clone(), subscription_meter); + let meter = Meter::Subscription(filter, subscription_meter); let _ = meter_tx.try_send((meter_id, meter)); } }; From aa13f2ebc0e37791116f335fbc83ea211378fa1b Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 20:27:38 +0530 Subject: [PATCH 24/26] Run clippy --all-targets and fix warnings --- rumqttd/examples/meters.rs | 3 +-- rumqttd/src/link/meters.rs | 7 +++++-- rumqttd/src/server/broker.rs | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/rumqttd/examples/meters.rs b/rumqttd/examples/meters.rs index 890464ce8..b0787cd57 100644 --- a/rumqttd/examples/meters.rs +++ b/rumqttd/examples/meters.rs @@ -79,8 +79,7 @@ fn main() { println!("{:#?}", v); // Consumer meters - let client_id = format!("consumer"); - let request = GetMeter::Connection(client_id); + let request = GetMeter::Connection("consumer".to_owned()); let v = meters.get(request).unwrap(); println!("{:#?}", v); diff --git a/rumqttd/src/link/meters.rs b/rumqttd/src/link/meters.rs index 966b1140a..e19103184 100644 --- a/rumqttd/src/link/meters.rs +++ b/rumqttd/src/link/meters.rs @@ -52,13 +52,16 @@ impl MetersLink { } pub fn get(&self, meter: GetMeter) -> Result { - self.router_tx.send((self.meter_id, Event::GetMeter(meter)))?; + self.router_tx + .send((self.meter_id, Event::GetMeter(meter)))?; let (_meter_id, meter) = self.router_rx.recv()?; Ok(meter) } pub async fn fetch(&self, meter: GetMeter) -> Result { - self.router_tx.send_async((self.meter_id, Event::GetMeter(meter))).await?; + self.router_tx + .send_async((self.meter_id, Event::GetMeter(meter))) + .await?; let (_meter_id, meter) = self.router_rx.recv_async().await?; Ok(meter) } diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 530b9d016..087e497b3 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -11,7 +11,7 @@ use crate::protocol::ws::Ws; use crate::protocol::Protocol; #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] use crate::server::tls::{self, TLSAcceptor}; -use crate::{ConnectionSettings, meters}; +use crate::{meters, ConnectionSettings}; use flume::{RecvError, SendError, Sender}; use std::sync::Arc; use tracing::{error, field, info, Instrument, Span}; From 113ab55b57e0b08b5d1fcf8ad18f04814c2ccf53 Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 20:34:58 +0530 Subject: [PATCH 25/26] Update changelog --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eff3ac95..2c4585f87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,10 @@ rumqttc rumqttd ------- -- Add meters related to router, subscriptions, and connections (#505) -- Allow multi-tenancy validation for mtls clients with `Org` set in certificates +- Add meters related to router, subscriptions, and connections (#508) +- Allow multi-tenancy validation for mtls clients with `Org` set in certificates (#505) - Add `tracing` for structured, context-aware logging (#499, #503) +- Add the ablity to change log levels and filters dynamically at runtime (#499) - Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480) - Changed default segment size in demo config to 100MB (#484) - Allow subscription on topic's starting with `test` (#494) From a66d20783106bac609e617f7c5e38749923999ad Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 21 Nov 2022 20:41:22 +0530 Subject: [PATCH 26/26] Fix clippy warnings in shadow link --- rumqttd/src/link/shadow.rs | 3 ++- rumqttd/src/server/broker.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rumqttd/src/link/shadow.rs b/rumqttd/src/link/shadow.rs index 87569cb46..5b7a03a4f 100644 --- a/rumqttd/src/link/shadow.rs +++ b/rumqttd/src/link/shadow.rs @@ -1,6 +1,7 @@ use crate::link::local::{LinkError, LinkRx, LinkTx}; +use crate::local::Link; use crate::router::{Event, Notification}; -use crate::{ConnectionId, ConnectionSettings, Filter, Link}; +use crate::{ConnectionId, ConnectionSettings, Filter}; use bytes::Bytes; use flume::{RecvError, SendError, Sender, TrySendError}; use futures_util::{SinkExt, StreamExt}; diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 087e497b3..91668753f 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -402,7 +402,7 @@ async fn shadow_connection( let connection_id = link.connection_id; Span::current().record("client_id", &client_id); - Span::current().record("connection_id", &connection_id); + Span::current().record("connection_id", connection_id); match link.start().await { // Connection get close. This shouldn't usually happen