diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eff3ac95..2c4585f87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,10 @@ rumqttc rumqttd ------- -- Add meters related to router, subscriptions, and connections (#505) -- Allow multi-tenancy validation for mtls clients with `Org` set in certificates +- Add meters related to router, subscriptions, and connections (#508) +- Allow multi-tenancy validation for mtls clients with `Org` set in certificates (#505) - Add `tracing` for structured, context-aware logging (#499, #503) +- Add the ablity to change log levels and filters dynamically at runtime (#499) - Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480) - Changed default segment size in demo config to 100MB (#484) - Allow subscription on topic's starting with `test` (#494) diff --git a/rumqttd/examples/meters.rs b/rumqttd/examples/meters.rs new file mode 100644 index 000000000..b0787cd57 --- /dev/null +++ b/rumqttd/examples/meters.rs @@ -0,0 +1,88 @@ +use rumqttd::{Broker, Config, GetMeter, Notification}; +use std::{thread, time::Duration}; + +fn main() { + pretty_env_logger::init(); + + // As examples are compiled as seperate binary so this config is current path dependent. Run it + // from root of this crate + let config = config::Config::builder() + .add_source(config::File::with_name("demo.toml")) + .build() + .unwrap(); + + let config: Config = config.try_deserialize().unwrap(); + + dbg!(&config); + + let broker = Broker::new(config); + let meters = broker.meters().unwrap(); + + let (mut link_tx, mut link_rx) = broker.link("consumer").unwrap(); + link_tx.subscribe("hello/+/world").unwrap(); + thread::spawn(move || { + let mut count = 0; + loop { + let notification = match link_rx.recv().unwrap() { + Some(v) => v, + None => continue, + }; + + match notification { + Notification::Forward(forward) => { + count += 1; + println!( + "Topic = {:?}, Count = {}, Payload = {} bytes", + forward.publish.topic, + count, + forward.publish.payload.len() + ); + } + v => { + println!("{:?}", v); + } + } + } + }); + + for i in 0..5 { + let client_id = format!("client_{i}"); + let topic = format!("hello/{}/world", client_id); + let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB + let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made"); + + thread::spawn(move || { + for _ in 0..100 { + thread::sleep(Duration::from_secs(1)); + link_tx.publish(topic.clone(), payload.clone()).unwrap(); + } + }); + } + + loop { + // Router meters + let request = GetMeter::Router; + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + + // Publisher meters + for i in 0..5 { + let client_id = format!("client_{i}"); + let request = GetMeter::Connection(client_id); + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + } + + // Commitlog meters + let request = GetMeter::Subscription("hello/+/world".to_owned()); + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + + // Consumer meters + let request = GetMeter::Connection("consumer".to_owned()); + let v = meters.get(request).unwrap(); + println!("{:#?}", v); + + thread::sleep(Duration::from_secs(5)); + } +} diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index a1676174e..31412db62 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -31,7 +31,10 @@ pub type TopicId = usize; pub type Offset = (u64, u64); pub type Cursor = (u64, u64); -pub use link::local::{Link, LinkError, LinkRx, LinkTx}; +pub use link::local; +pub use link::meters; + +pub use router::GetMeter; pub use router::Notification; pub use server::Broker; diff --git a/rumqttd/src/link/console.rs b/rumqttd/src/link/console.rs index f81d9c88f..c2cbaa667 100644 --- a/rumqttd/src/link/console.rs +++ b/rumqttd/src/link/console.rs @@ -11,7 +11,7 @@ pub struct ConsoleLink { config: ConsoleSettings, connection_id: ConnectionId, router_tx: Sender<(ConnectionId, Event)>, - link_rx: LinkRx, + _link_rx: LinkRx, } impl ConsoleLink { @@ -23,7 +23,7 @@ impl ConsoleLink { ConsoleLink { config, router_tx, - link_rx, + _link_rx: link_rx, connection_id, } } @@ -48,8 +48,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/device/{id: String}) => { let event = Event::Metrics(MetricsRequest::Connection(id)); @@ -58,8 +57,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/subscriptions) => { let event = Event::Metrics(MetricsRequest::Subscriptions); @@ -68,8 +66,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/subscription/{filter: String}) => { let filter = filter.replace('.', "/"); @@ -79,8 +76,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/waiters/{filter: String}) => { let filter = filter.replace('.', "/"); @@ -90,8 +86,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (GET) (/readyqueue) => { let event = Event::Metrics(MetricsRequest::ReadyQueue); @@ -100,8 +95,7 @@ pub fn start(console: Arc) { return rouille::Response::empty_404() } - let v = console.link_rx.metrics(); - rouille::Response::json(&v) + rouille::Response::text("OK").with_status_code(200) }, (POST) (/logs) => { info!("Reloading tracing filter"); diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 76f91079e..15c8980cd 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -4,7 +4,7 @@ use crate::protocol::{ use crate::router::Ack; use crate::router::{ iobufs::{Incoming, Outgoing}, - Connection, Event, MetricsReply, Notification, ShadowRequest, + Connection, Event, Notification, ShadowRequest, }; use crate::ConnectionId; use bytes::Bytes; @@ -15,7 +15,7 @@ use parking_lot::{Mutex, RawMutex}; use std::collections::VecDeque; use std::mem; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; #[derive(Debug, thiserror::Error)] pub enum LinkError { @@ -50,9 +50,8 @@ impl Link { Arc>>, Arc>>, Receiver<()>, - Receiver, ) { - let (connection, metrics_rx) = Connection::new( + let connection = Connection::new( tenant_id, client_id.to_owned(), clean, @@ -70,13 +69,7 @@ impl Link { outgoing, }; - ( - event, - incoming_data_buffer, - outgoing_data_buffer, - link_rx, - metrics_rx, - ) + (event, incoming_data_buffer, outgoing_data_buffer, link_rx) } #[allow(clippy::new_ret_no_self)] @@ -91,7 +84,7 @@ impl Link { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx, metrics_rx) = + let (message, i, o, link_rx) = Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); router_tx.send((0, message))?; @@ -106,7 +99,7 @@ impl Link { }; let tx = LinkTx::new(id, router_tx.clone(), i); - let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o); + let rx = LinkRx::new(id, router_tx, link_rx, o); Ok((tx, rx, notification)) } @@ -121,7 +114,7 @@ impl Link { // Connect to router // Local connections to the router shall have access to all subscriptions - let (message, i, o, link_rx, metrics_rx) = + let (message, i, o, link_rx) = Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters); router_tx.send_async((0, message)).await?; @@ -135,7 +128,7 @@ impl Link { }; let tx = LinkTx::new(id, router_tx.clone(), i); - let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o); + let rx = LinkRx::new(id, router_tx, link_rx, o); Ok((tx, rx, ack)) } } @@ -284,7 +277,6 @@ pub struct LinkRx { connection_id: ConnectionId, router_tx: Sender<(ConnectionId, Event)>, router_rx: Receiver<()>, - metrics_rx: Receiver, send_buffer: Arc>>, cache: VecDeque, } @@ -294,14 +286,12 @@ impl LinkRx { connection_id: ConnectionId, router_tx: Sender<(ConnectionId, Event)>, router_rx: Receiver<()>, - metrics_rx: Receiver, outgoing_data_buffer: Arc>>, ) -> LinkRx { LinkRx { connection_id, router_tx, router_rx, - metrics_rx, send_buffer: outgoing_data_buffer, cache: VecDeque::with_capacity(100), } @@ -376,12 +366,6 @@ impl LinkRx { .await?; Ok(()) } - - pub fn metrics(&self) -> Option { - self.metrics_rx - .recv_deadline(Instant::now() + Duration::from_secs(1)) - .ok() - } } #[cfg(test)] diff --git a/rumqttd/src/link/meters.rs b/rumqttd/src/link/meters.rs new file mode 100644 index 000000000..e19103184 --- /dev/null +++ b/rumqttd/src/link/meters.rs @@ -0,0 +1,68 @@ +use crate::router::{Event, GetMeter, Meter}; +use crate::ConnectionId; +use flume::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TrySendError}; + +#[derive(Debug, thiserror::Error)] +pub enum LinkError { + #[error("Channel try send error")] + TrySend(#[from] TrySendError<(ConnectionId, Event)>), + #[error("Channel send error")] + Send(#[from] SendError<(ConnectionId, Event)>), + #[error("Channel recv error")] + Recv(#[from] RecvError), + #[error("Channel timeout recv error")] + RecvTimeout(#[from] RecvTimeoutError), + #[error("Timeout = {0}")] + Elapsed(#[from] tokio::time::error::Elapsed), +} + +pub struct MetersLink { + pub(crate) meter_id: ConnectionId, + router_tx: Sender<(ConnectionId, Event)>, + router_rx: Receiver<(ConnectionId, Meter)>, +} + +impl MetersLink { + pub fn new(router_tx: Sender<(ConnectionId, Event)>) -> Result { + let (tx, rx) = flume::bounded(5); + router_tx.send((0, Event::NewMeter(tx)))?; + let (meter_id, _meter) = rx.recv()?; + + let link = MetersLink { + meter_id, + router_tx, + router_rx: rx, + }; + + Ok(link) + } + + pub async fn init(router_tx: Sender<(ConnectionId, Event)>) -> Result { + let (tx, rx) = flume::bounded(5); + router_tx.send((0, Event::NewMeter(tx)))?; + let (meter_id, _meter) = rx.recv_async().await?; + + let link = MetersLink { + meter_id, + router_tx, + router_rx: rx, + }; + + Ok(link) + } + + pub fn get(&self, meter: GetMeter) -> Result { + self.router_tx + .send((self.meter_id, Event::GetMeter(meter)))?; + let (_meter_id, meter) = self.router_rx.recv()?; + Ok(meter) + } + + pub async fn fetch(&self, meter: GetMeter) -> Result { + self.router_tx + .send_async((self.meter_id, Event::GetMeter(meter))) + .await?; + let (_meter_id, meter) = self.router_rx.recv_async().await?; + Ok(meter) + } +} diff --git a/rumqttd/src/link/mod.rs b/rumqttd/src/link/mod.rs index 6b03476a3..2a14408a8 100644 --- a/rumqttd/src/link/mod.rs +++ b/rumqttd/src/link/mod.rs @@ -1,6 +1,7 @@ // pub mod bridge; pub mod console; pub mod local; +pub mod meters; pub mod network; pub mod remote; #[cfg(feature = "websockets")] diff --git a/rumqttd/src/link/remote.rs b/rumqttd/src/link/remote.rs index bb1632a05..dcfc49540 100644 --- a/rumqttd/src/link/remote.rs +++ b/rumqttd/src/link/remote.rs @@ -1,9 +1,9 @@ -use crate::link::local::{LinkError, LinkRx, LinkTx}; +use crate::link::local::{Link, LinkError, LinkRx, LinkTx}; use crate::link::network; use crate::link::network::Network; use crate::protocol::{Connect, Packet, Protocol}; use crate::router::{Event, Notification}; -use crate::{ConnectionId, ConnectionSettings, Link}; +use crate::{ConnectionId, ConnectionSettings}; use flume::{RecvError, SendError, Sender, TrySendError}; use std::collections::VecDeque; diff --git a/rumqttd/src/link/shadow.rs b/rumqttd/src/link/shadow.rs index 87569cb46..5b7a03a4f 100644 --- a/rumqttd/src/link/shadow.rs +++ b/rumqttd/src/link/shadow.rs @@ -1,6 +1,7 @@ use crate::link::local::{LinkError, LinkRx, LinkTx}; +use crate::local::Link; use crate::router::{Event, Notification}; -use crate::{ConnectionId, ConnectionSettings, Filter, Link}; +use crate::{ConnectionId, ConnectionSettings, Filter}; use bytes::Bytes; use flume::{RecvError, SendError, Sender, TrySendError}; use futures_util::{SinkExt, StreamExt}; diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index 3da0fa0a5..cfc8b540b 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -59,7 +59,6 @@ fn main() { .unwrap(); let mut config: Config = config.try_deserialize().unwrap(); - config.console.set_filter_reload_handle(reload_handle); // println!("{:#?}", config); diff --git a/rumqttd/src/router/connection.rs b/rumqttd/src/router/connection.rs index a6387c338..419c03a84 100644 --- a/rumqttd/src/router/connection.rs +++ b/rumqttd/src/router/connection.rs @@ -1,9 +1,8 @@ use crate::protocol::LastWill; use crate::Filter; -use flume::{bounded, Receiver, Sender}; use std::collections::HashSet; -use super::{ConnectionMeter, MetricsReply}; +use super::ConnectionEvents; /// Used to register a new connection with the router /// Connection messages encompasses a handle for router to @@ -19,11 +18,10 @@ pub struct Connection { pub clean: bool, /// Subscriptions pub subscriptions: HashSet, - /// Handle to send metrics reply - pub metrics: Sender, - /// Connection metrics - pub meter: ConnectionMeter, + /// Last will of this connection pub last_will: Option, + /// Connection events + pub events: ConnectionEvents, } impl Connection { @@ -34,9 +32,7 @@ impl Connection { clean: bool, last_will: Option, dynamic_filters: bool, - ) -> (Connection, Receiver) { - let (metrics_tx, metrics_rx) = bounded(1); - + ) -> Connection { // Change client id to -> tenant_id.client_id and derive topic path prefix // to validate topics let (client_id, tenant_prefix) = match tenant_id { @@ -48,17 +44,14 @@ impl Connection { None => (client_id, None), }; - let connection = Connection { + Connection { client_id, tenant_prefix, dynamic_filters, clean, subscriptions: HashSet::default(), - metrics: metrics_tx, - meter: ConnectionMeter::default(), last_will, - }; - - (connection, metrics_rx) + events: ConnectionEvents::default(), + } } } diff --git a/rumqttd/src/router/graveyard.rs b/rumqttd/src/router/graveyard.rs index 485d38c13..96c6e4b98 100644 --- a/rumqttd/src/router/graveyard.rs +++ b/rumqttd/src/router/graveyard.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use super::{ scheduler::{PauseReason, Tracker}, - ConnectionMeter, + ConnectionEvents, }; pub struct Graveyard { @@ -27,7 +27,7 @@ impl Graveyard { &mut self, mut tracker: Tracker, subscriptions: HashSet, - metrics: ConnectionMeter, + metrics: ConnectionEvents, ) { tracker.pause(PauseReason::Busy); let id = tracker.id.clone(); @@ -47,7 +47,7 @@ impl Graveyard { pub struct SavedState { pub tracker: Tracker, pub subscriptions: HashSet, - pub metrics: ConnectionMeter, + pub metrics: ConnectionEvents, } impl SavedState { @@ -55,7 +55,7 @@ impl SavedState { SavedState { tracker: Tracker::new(client_id), subscriptions: HashSet::new(), - metrics: ConnectionMeter::default(), + metrics: ConnectionEvents::default(), } } } diff --git a/rumqttd/src/router/iobufs.rs b/rumqttd/src/router/iobufs.rs index de186070c..b3b90b4d2 100644 --- a/rumqttd/src/router/iobufs.rs +++ b/rumqttd/src/router/iobufs.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, sync::Arc, time::Instant}; +use std::{collections::VecDeque, sync::Arc}; use flume::{Receiver, Sender}; use parking_lot::Mutex; @@ -10,7 +10,7 @@ use crate::{ Cursor, Notification, }; -use super::Forward; +use super::{Forward, IncomingMeter, OutgoingMeter}; const MAX_INFLIGHT: usize = 100; const MAX_PKID: u16 = MAX_INFLIGHT as u16; @@ -173,71 +173,6 @@ impl Outgoing { } } -#[derive(Debug)] -#[allow(dead_code)] -pub struct IncomingMeter { - pub(crate) publish_count: usize, - pub(crate) subscribe_count: usize, - pub(crate) total_size: usize, - pub(crate) last_timestamp: Instant, - start: Instant, - pub(crate) data_rate: usize, -} - -impl Default for IncomingMeter { - fn default() -> Self { - Self { - publish_count: 0, - subscribe_count: 0, - total_size: 0, - last_timestamp: Instant::now(), - start: Instant::now(), - data_rate: 0, - } - } -} - -impl IncomingMeter { - //TODO: Fix this - #[allow(dead_code)] - pub(crate) fn average_last_data_rate(&mut self) { - let now = Instant::now(); - self.data_rate = self.total_size / now.duration_since(self.start).as_micros() as usize; - self.last_timestamp = now; - } -} - -#[derive(Debug)] -#[allow(dead_code)] -pub struct OutgoingMeter { - pub(crate) publish_count: usize, - pub(crate) total_size: usize, - pub(crate) last_timestamp: Instant, - start: Instant, - pub(crate) data_rate: usize, -} - -impl Default for OutgoingMeter { - fn default() -> Self { - Self { - publish_count: 0, - total_size: 0, - last_timestamp: Instant::now(), - start: Instant::now(), - data_rate: 0, - } - } -} - -impl OutgoingMeter { - #[allow(dead_code)] - pub(crate) fn average_last_data_rate(&mut self) { - let now = Instant::now(); - self.data_rate = self.total_size / now.duration_since(self.start).as_micros() as usize; - self.last_timestamp = now; - } -} - #[cfg(test)] mod test { // use super::{Outgoing, MAX_INFLIGHT}; diff --git a/rumqttd/src/router/mod.rs b/rumqttd/src/router/mod.rs index 2fdc3a248..16e591ce8 100644 --- a/rumqttd/src/router/mod.rs +++ b/rumqttd/src/router/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, VecDeque}, fmt, }; @@ -43,6 +43,10 @@ pub enum Event { incoming: iobufs::Incoming, outgoing: iobufs::Outgoing, }, + /// New meter link + NewMeter(flume::Sender<(ConnectionId, Meter)>), + /// Request for meter + GetMeter(GetMeter), /// Connection ready to receive more data Ready, /// Data for native commitlog @@ -215,7 +219,7 @@ pub struct ShadowReply { } #[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct RouterMetrics { +pub struct RouterMeter { pub router_id: RouterId, pub total_connections: usize, pub total_subscriptions: usize, @@ -232,41 +236,36 @@ pub struct SubscriptionMeter { pub read_offset: usize, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct ConnectionMeter { - publish_count: usize, - publish_size: usize, - subscriptions: HashSet, - events: VecDeque, +#[derive(Debug, Default, Clone)] +pub struct IncomingMeter { + pub publish_count: usize, + pub subscribe_count: usize, + pub total_size: usize, } -impl ConnectionMeter { - pub fn increment_publish_count(&mut self) { - self.publish_count += 1 - } - - pub fn add_publish_size(&mut self, size: usize) { - self.publish_size += size; - } - - pub fn push_subscription(&mut self, filter: Filter) { - self.subscriptions.insert(filter); - } +#[derive(Debug, Default, Clone)] +pub struct OutgoingMeter { + pub publish_count: usize, + pub total_size: usize, +} - pub fn push_subscriptions(&mut self, filters: HashSet) { - self.subscriptions.extend(filters); - } +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ConnectionEvents { + events: VecDeque, +} - pub fn push_event(&mut self, event: String) { - self.events.push_back(event); - if self.events.len() > 10 { - self.events.pop_front(); - } - } +#[derive(Debug, Clone)] +pub enum GetMeter { + Router, + Connection(String), + Subscription(String), +} - pub fn remove_subscription(&mut self, filter: Filter) { - self.subscriptions.remove(&filter); - } +#[derive(Debug, Clone)] +pub enum Meter { + Router(usize, RouterMeter), + Connection(String, Option, Option), + Subscription(String, Option), } #[derive(Debug, Clone)] @@ -284,8 +283,8 @@ pub enum MetricsRequest { #[serde(rename_all = "lowercase")] pub enum MetricsReply { Config(RouterConfig), - Router(RouterMetrics), - Connection(Option<(ConnectionMeter, Tracker)>), + Router(RouterMeter), + Connection(Option<(ConnectionEvents, Tracker)>), Subscriptions(HashMap>), Subscription(Option), Waiters(Option>), diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index 3207df688..3061d6b18 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -22,8 +22,9 @@ use super::iobufs::{Incoming, Outgoing}; use super::logs::{AckLog, DataLog}; use super::scheduler::{ScheduleReason, Scheduler}; use super::{ - packetid, Connection, DataRequest, Event, FilterIdx, MetricsReply, MetricsRequest, - Notification, RouterMetrics, ShadowRequest, MAX_CHANNEL_CAPACITY, MAX_SCHEDULE_ITERATIONS, + packetid, Connection, DataRequest, Event, FilterIdx, GetMeter, Meter, MetricsReply, + MetricsRequest, Notification, RouterMeter, ShadowRequest, MAX_CHANNEL_CAPACITY, + MAX_SCHEDULE_ITERATIONS, }; #[derive(Error, Debug)] @@ -55,6 +56,8 @@ pub struct Router { /// Saved state of dead persistent connections graveyard: Graveyard, /// List of connections + meters: Slab>, + /// List of connections connections: Slab, /// Connection map from device id to connection id connection_map: HashMap, @@ -81,7 +84,7 @@ pub struct Router { /// with this router router_tx: Sender<(ConnectionId, Event)>, /// Router metrics - router_metrics: RouterMetrics, + router_meters: RouterMeter, /// Buffer for cache exchange of incoming packets cache: Option>, } @@ -90,14 +93,15 @@ impl Router { pub fn new(router_id: RouterId, config: RouterConfig) -> Router { let (router_tx, router_rx) = bounded(1000); + let meters = Slab::with_capacity(10); let connections = Slab::with_capacity(config.max_connections); let ibufs = Slab::with_capacity(config.max_connections); let obufs = Slab::with_capacity(config.max_connections); let ackslog = Slab::with_capacity(config.max_connections); - let router_metrics = RouterMetrics { + let router_metrics = RouterMeter { router_id, - ..RouterMetrics::default() + ..RouterMeter::default() }; let max_connections = config.max_connections; @@ -105,6 +109,7 @@ impl Router { id: router_id, config: config.clone(), graveyard: Graveyard::new(), + meters, connections, connection_map: Default::default(), subscription_map: Default::default(), @@ -116,7 +121,7 @@ impl Router { notifications: VecDeque::with_capacity(1024), router_rx, router_tx, - router_metrics, + router_meters: router_metrics, cache: Some(VecDeque::with_capacity(MAX_CHANNEL_CAPACITY)), } } @@ -151,6 +156,7 @@ impl Router { /// Waits on incoming events when ready queue is empty. /// After pulling 1 event, tries to pull 500 more events /// before polling ready queue 100 times (connections) + #[tracing::instrument(skip_all)] fn run(&mut self, count: usize) -> Result<(), RouterError> { match count { 0 => loop { @@ -208,13 +214,15 @@ impl Router { incoming, outgoing, } => self.handle_new_connection(connection, incoming, outgoing), + Event::NewMeter(tx) => self.handle_new_meter(tx), + Event::GetMeter(meter) => self.handle_get_meter(id, meter), Event::DeviceData => self.handle_device_payload(id), Event::Disconnect(disconnect) => self.handle_disconnection(id, disconnect.execute_will), Event::Ready => self.scheduler.reschedule(id, ScheduleReason::Ready), Event::Shadow(request) => { retrieve_shadow(&mut self.datalog, &mut self.obufs[id], request) } - Event::Metrics(metrics) => retrieve_metrics(id, self, metrics), + Event::Metrics(metrics) => retrieve_metrics(self, metrics), } } @@ -243,13 +251,12 @@ impl Router { let tracker = if !clean_session { let saved = saved.map_or(SavedState::new(client_id.clone()), |s| s); connection.subscriptions = saved.subscriptions; - connection.meter = saved.metrics; + connection.events = saved.metrics; saved.tracker } else { // Only retrieve metrics in clean session let saved = saved.map_or(SavedState::new(client_id.clone()), |s| s); - connection.meter = saved.metrics; - connection.meter.subscriptions.clear(); + connection.events = saved.metrics; Tracker::new(client_id.clone()) }; let ackslog = AckLog::new(); @@ -260,10 +267,11 @@ impl Router { }; let event = "connection at ".to_owned() + &time + ", clean = " + &clean_session.to_string(); - connection.meter.push_event(event); - connection - .meter - .push_subscriptions(connection.subscriptions.clone()); + connection.events.events.push_back(event); + + if connection.events.events.len() > 10 { + connection.events.events.pop_front(); + } let connection_id = self.connections.insert(connection); assert_eq!(self.ibufs.insert(incoming), connection_id); @@ -290,6 +298,12 @@ impl Router { .reschedule(connection_id, ScheduleReason::Init); } + fn handle_new_meter(&mut self, tx: Sender<(ConnectionId, Meter)>) { + let meter_id = self.meters.insert(tx); + let tx = &self.meters[meter_id]; + let _ = tx.try_send((meter_id, Meter::Router(self.id, self.router_meters.clone()))); + } + fn handle_disconnection(&mut self, id: ConnectionId, execute_last_will: bool) { // Some clients can choose to send Disconnect packet before network disconnection. // This will lead to double Disconnect packets in router `events` @@ -339,7 +353,11 @@ impl Router { }; let event = "disconnection at ".to_owned() + &time; - connection.meter.push_event(event); + connection.events.events.push_back(event); + + if connection.events.events.len() > 10 { + connection.events.events.pop_front(); + } // Save state for persistent sessions if !connection.clean { @@ -349,12 +367,11 @@ impl Router { .for_each(|r| tracker.register_data_request(r)); self.graveyard - .save(tracker, connection.subscriptions, connection.meter); + .save(tracker, connection.subscriptions, connection.events); } else { // Only save metrics in clean session - connection.meter.subscriptions.clear(); self.graveyard - .save(Tracker::new(client_id), HashSet::new(), connection.meter); + .save(Tracker::new(client_id), HashSet::new(), connection.events); } } @@ -435,7 +452,7 @@ impl Router { } }; - self.router_metrics.total_publishes += 1; + self.router_meters.total_publishes += 1; // Try to append publish to commitlog match append_to_commitlog( @@ -456,18 +473,12 @@ impl Router { error!( reason = ?e, "Failed to append to commitlog" ); - self.router_metrics.failed_publishes += 1; + self.router_meters.failed_publishes += 1; disconnect = true; break; } }; - // Update metrics - if let Some(metrics) = self.connections.get_mut(id).map(|v| &mut v.meter) { - metrics.increment_publish_count(); - metrics.add_publish_size(size); - } - let meter = &mut self.ibufs.get_mut(id).unwrap().meter; meter.publish_count += 1; meter.total_size += size; @@ -496,9 +507,6 @@ impl Router { let filter = f.path; let qos = f.qos; - // Update metrics - connection.meter.push_subscription(filter.clone()); - let (idx, cursor) = self.datalog.next_native_offset(&filter); self.prepare_filter(id, cursor, idx, filter.clone(), qos as u8); self.datalog @@ -535,7 +543,6 @@ impl Router { continue; } - connection.meter.remove_subscription(filter.clone()); let meter = &mut self.ibufs.get_mut(id).unwrap().meter; meter.subscribe_count -= 1; @@ -632,7 +639,7 @@ impl Router { error!( reason = ?e, "Failed to append to commitlog" ); - self.router_metrics.failed_publishes += 1; + self.router_meters.failed_publishes += 1; disconnect = true; break; } @@ -836,11 +843,42 @@ impl Router { error!( reason = ?e, "Failed to append to commitlog" ); - self.router_metrics.failed_publishes += 1; + self.router_meters.failed_publishes += 1; // Removed disconnect = true from here because we disconnect anyways } }; } + + fn handle_get_meter(&self, meter_id: ConnectionId, meter: router::GetMeter) { + let meter_tx = &self.meters[meter_id]; + match meter { + GetMeter::Router => { + let router_meters = Meter::Router(self.id, self.router_meters.clone()); + let _ = meter_tx.try_send((meter_id, router_meters)); + } + GetMeter::Connection(client_id) => { + let connection_id = match self.connection_map.get(&client_id) { + Some(val) => val, + None => { + let meter = Meter::Connection("".to_owned(), None, None); + let _ = meter_tx.try_send((meter_id, meter)); + return; + } + }; + + // Update metrics + let incoming_meter = self.ibufs.get(*connection_id).map(|v| v.meter.clone()); + let outgoing_meter = self.obufs.get(*connection_id).map(|v| v.meter.clone()); + let meter = Meter::Connection(client_id, incoming_meter, outgoing_meter); + let _ = meter_tx.try_send((meter_id, meter)); + } + GetMeter::Subscription(filter) => { + let subscription_meter = self.datalog.meter(&filter); + let meter = Meter::Subscription(filter, subscription_meter); + let _ = meter_tx.try_send((meter_id, meter)); + } + }; + } } fn append_to_commitlog( @@ -1069,13 +1107,17 @@ fn retrieve_shadow(datalog: &mut DataLog, outgoing: &mut Outgoing, shadow: Shado } } -fn retrieve_metrics(id: ConnectionId, router: &mut Router, metrics: MetricsRequest) { +fn retrieve_metrics(router: &mut Router, metrics: MetricsRequest) { let message = match metrics { MetricsRequest::Config => MetricsReply::Config(router.config.clone()), - MetricsRequest::Router => MetricsReply::Router(router.router_metrics.clone()), + MetricsRequest::Router => MetricsReply::Router(router.router_meters.clone()), MetricsRequest::Connection(id) => { let metrics = router.connection_map.get(&id).map(|v| { - let c = router.connections.get(*v).map(|v| v.meter.clone()).unwrap(); + let c = router + .connections + .get(*v) + .map(|v| v.events.clone()) + .unwrap(); let t = router.scheduler.trackers.get(*v).cloned().unwrap(); (c, t) }); @@ -1127,8 +1169,7 @@ fn retrieve_metrics(id: ConnectionId, router: &mut Router, metrics: MetricsReque } }; - let connection = router.connections.get_mut(id).unwrap(); - connection.metrics.try_send(message).ok(); + println!("{:#?}", message); } fn validate_subscription( diff --git a/rumqttd/src/server/broker.rs b/rumqttd/src/server/broker.rs index 2be046860..91668753f 100644 --- a/rumqttd/src/server/broker.rs +++ b/rumqttd/src/server/broker.rs @@ -11,7 +11,7 @@ use crate::protocol::ws::Ws; use crate::protocol::Protocol; #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] use crate::server::tls::{self, TLSAcceptor}; -use crate::ConnectionSettings; +use crate::{meters, ConnectionSettings}; use flume::{RecvError, SendError, Sender}; use std::sync::Arc; use tracing::{error, field, info, Instrument, Span}; @@ -120,6 +120,12 @@ impl Broker { // } // } + // Link to get meters + pub fn meters(&self) -> Result { + let link = meters::MetersLink::new(self.router_tx.clone())?; + Ok(link) + } + pub fn link(&self, client_id: &str) -> Result<(LinkTx, LinkRx), local::LinkError> { // Register this connection with the router. Router replies with ack which if ok will // start the link. Router can sometimes reject the connection (ex max connection limit)