Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rumqttd): Add metering #508

Merged
merged 29 commits into from Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
51fb523
feat: use tracing crate instead of log
swanandx Nov 1, 2022
39850c6
tracing subscriber in progress
swanandx Nov 2, 2022
858d960
added client_idxx as span field and use FILTER env var
swanandx Nov 2, 2022
93c9d6f
manually creat spans
swanandx Nov 3, 2022
f01dd4e
initial poc for reloading filter
swanandx Nov 4, 2022
d4460b5
feat: dynamically reloading filter for logs
swanandx Nov 6, 2022
02235a4
feat: filter the log with rumqttd as target by default
swanandx Nov 7, 2022
8c65f0a
feat: consume span and removed unneccesary fields
swanandx Nov 7, 2022
c0061cc
return text instead of json if logs filter reloaded
swanandx Nov 7, 2022
fe0171a
feat: removed count in run span and created span for server
swanandx Nov 8, 2022
521fe40
feat: tracing
swanandx Nov 8, 2022
cc108ab
feat: pretty logs
swanandx Nov 8, 2022
15905e1
feat: use instrument combinator for attacing span to futures
swanandx Nov 8, 2022
63721a4
Add meters link
tekjar Nov 11, 2022
b3fe6c4
Merge branch 'main' into metrics
tekjar Nov 12, 2022
7fdfefd
Cleanup unused Meters
tekjar Nov 12, 2022
dd9ce3e
Merge main
tekjar Nov 12, 2022
7923b79
Add meters to singlenode
tekjar Nov 17, 2022
881d613
Add meters example
tekjar Nov 20, 2022
14a7815
Revert single node example
tekjar Nov 20, 2022
265acec
Improve meters example separate out meters and connection events
tekjar Nov 20, 2022
b4ee8e0
Merge branch 'main' into metrics
Nov 21, 2022
0e938dc
Remove metrics from local links
Nov 21, 2022
7ce6671
Use iobuf meters instead of connection events and improve meters example
tekjar Nov 21, 2022
cd96fb3
Remove unwrap while fetching connectiin meter
tekjar Nov 21, 2022
3c986d8
Fix rustc and clippy warnings to make CI happy
tekjar Nov 21, 2022
aa13f2e
Run clippy --all-targets and fix warnings
tekjar Nov 21, 2022
113ab55
Update changelog
tekjar Nov 21, 2022
a66d207
Fix clippy warnings in shadow link
tekjar Nov 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Expand Up @@ -8,9 +8,10 @@ rumqttc

rumqttd
-------
- Add meters related to router, subscriptions, and connections (#505)
- Allow multi-tenancy validation for mtls clients with `Org` set in certificates
- Add meters related to router, subscriptions, and connections (#508)
- Allow multi-tenancy validation for mtls clients with `Org` set in certificates (#505)
- Add `tracing` for structured, context-aware logging (#499, #503)
- Add the ablity to change log levels and filters dynamically at runtime (#499)
- Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480)
- Changed default segment size in demo config to 100MB (#484)
- Allow subscription on topic's starting with `test` (#494)
Expand Down
88 changes: 88 additions & 0 deletions rumqttd/examples/meters.rs
@@ -0,0 +1,88 @@
use rumqttd::{Broker, Config, GetMeter, Notification};
use std::{thread, time::Duration};

fn main() {
pretty_env_logger::init();

// As examples are compiled as seperate binary so this config is current path dependent. Run it
// from root of this crate
let config = config::Config::builder()
.add_source(config::File::with_name("demo.toml"))
.build()
.unwrap();

let config: Config = config.try_deserialize().unwrap();

dbg!(&config);

let broker = Broker::new(config);
let meters = broker.meters().unwrap();

let (mut link_tx, mut link_rx) = broker.link("consumer").unwrap();
link_tx.subscribe("hello/+/world").unwrap();
thread::spawn(move || {
let mut count = 0;
loop {
let notification = match link_rx.recv().unwrap() {
Some(v) => v,
None => continue,
};

match notification {
Notification::Forward(forward) => {
count += 1;
println!(
"Topic = {:?}, Count = {}, Payload = {} bytes",
forward.publish.topic,
count,
forward.publish.payload.len()
);
}
v => {
println!("{:?}", v);
}
}
}
});

for i in 0..5 {
let client_id = format!("client_{i}");
let topic = format!("hello/{}/world", client_id);
let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB
let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made");

thread::spawn(move || {
for _ in 0..100 {
thread::sleep(Duration::from_secs(1));
link_tx.publish(topic.clone(), payload.clone()).unwrap();
}
});
}

loop {
// Router meters
let request = GetMeter::Router;
let v = meters.get(request).unwrap();
println!("{:#?}", v);

// Publisher meters
for i in 0..5 {
let client_id = format!("client_{i}");
let request = GetMeter::Connection(client_id);
let v = meters.get(request).unwrap();
println!("{:#?}", v);
}

// Commitlog meters
let request = GetMeter::Subscription("hello/+/world".to_owned());
let v = meters.get(request).unwrap();
println!("{:#?}", v);

// Consumer meters
let request = GetMeter::Connection("consumer".to_owned());
let v = meters.get(request).unwrap();
println!("{:#?}", v);

thread::sleep(Duration::from_secs(5));
}
}
5 changes: 4 additions & 1 deletion rumqttd/src/lib.rs
Expand Up @@ -31,7 +31,10 @@ pub type TopicId = usize;
pub type Offset = (u64, u64);
pub type Cursor = (u64, u64);

pub use link::local::{Link, LinkError, LinkRx, LinkTx};
pub use link::local;
pub use link::meters;

pub use router::GetMeter;
pub use router::Notification;
pub use server::Broker;

Expand Down
22 changes: 8 additions & 14 deletions rumqttd/src/link/console.rs
Expand Up @@ -11,7 +11,7 @@ pub struct ConsoleLink {
config: ConsoleSettings,
connection_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
link_rx: LinkRx,
_link_rx: LinkRx,
}

impl ConsoleLink {
Expand All @@ -23,7 +23,7 @@ impl ConsoleLink {
ConsoleLink {
config,
router_tx,
link_rx,
_link_rx: link_rx,
connection_id,
}
}
Expand All @@ -48,8 +48,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/device/{id: String}) => {
let event = Event::Metrics(MetricsRequest::Connection(id));
Expand All @@ -58,8 +57,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/subscriptions) => {
let event = Event::Metrics(MetricsRequest::Subscriptions);
Expand All @@ -68,8 +66,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/subscription/{filter: String}) => {
let filter = filter.replace('.', "/");
Expand All @@ -79,8 +76,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/waiters/{filter: String}) => {
let filter = filter.replace('.', "/");
Expand All @@ -90,8 +86,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/readyqueue) => {
let event = Event::Metrics(MetricsRequest::ReadyQueue);
Expand All @@ -100,8 +95,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(POST) (/logs) => {
info!("Reloading tracing filter");
Expand Down
32 changes: 8 additions & 24 deletions rumqttd/src/link/local.rs
Expand Up @@ -4,7 +4,7 @@ use crate::protocol::{
use crate::router::Ack;
use crate::router::{
iobufs::{Incoming, Outgoing},
Connection, Event, MetricsReply, Notification, ShadowRequest,
Connection, Event, Notification, ShadowRequest,
};
use crate::ConnectionId;
use bytes::Bytes;
Expand All @@ -15,7 +15,7 @@ use parking_lot::{Mutex, RawMutex};
use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

#[derive(Debug, thiserror::Error)]
pub enum LinkError {
Expand Down Expand Up @@ -50,9 +50,8 @@ impl Link {
Arc<Mutex<VecDeque<Packet>>>,
Arc<Mutex<VecDeque<Notification>>>,
Receiver<()>,
Receiver<MetricsReply>,
) {
let (connection, metrics_rx) = Connection::new(
let connection = Connection::new(
tenant_id,
client_id.to_owned(),
clean,
Expand All @@ -70,13 +69,7 @@ impl Link {
outgoing,
};

(
event,
incoming_data_buffer,
outgoing_data_buffer,
link_rx,
metrics_rx,
)
(event, incoming_data_buffer, outgoing_data_buffer, link_rx)
}

#[allow(clippy::new_ret_no_self)]
Expand All @@ -91,7 +84,7 @@ impl Link {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx, metrics_rx) =
let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
router_tx.send((0, message))?;

Expand All @@ -106,7 +99,7 @@ impl Link {
};

let tx = LinkTx::new(id, router_tx.clone(), i);
let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o);
let rx = LinkRx::new(id, router_tx, link_rx, o);
Ok((tx, rx, notification))
}

Expand All @@ -121,7 +114,7 @@ impl Link {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx, metrics_rx) =
let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
router_tx.send_async((0, message)).await?;

Expand All @@ -135,7 +128,7 @@ impl Link {
};

let tx = LinkTx::new(id, router_tx.clone(), i);
let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o);
let rx = LinkRx::new(id, router_tx, link_rx, o);
Ok((tx, rx, ack))
}
}
Expand Down Expand Up @@ -284,7 +277,6 @@ pub struct LinkRx {
connection_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
router_rx: Receiver<()>,
metrics_rx: Receiver<MetricsReply>,
send_buffer: Arc<Mutex<VecDeque<Notification>>>,
cache: VecDeque<Notification>,
}
Expand All @@ -294,14 +286,12 @@ impl LinkRx {
connection_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
router_rx: Receiver<()>,
metrics_rx: Receiver<MetricsReply>,
outgoing_data_buffer: Arc<Mutex<VecDeque<Notification>>>,
) -> LinkRx {
LinkRx {
connection_id,
router_tx,
router_rx,
metrics_rx,
send_buffer: outgoing_data_buffer,
cache: VecDeque::with_capacity(100),
}
Expand Down Expand Up @@ -376,12 +366,6 @@ impl LinkRx {
.await?;
Ok(())
}

pub fn metrics(&self) -> Option<MetricsReply> {
self.metrics_rx
.recv_deadline(Instant::now() + Duration::from_secs(1))
.ok()
}
}

#[cfg(test)]
Expand Down
68 changes: 68 additions & 0 deletions rumqttd/src/link/meters.rs
@@ -0,0 +1,68 @@
use crate::router::{Event, GetMeter, Meter};
use crate::ConnectionId;
use flume::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TrySendError};

#[derive(Debug, thiserror::Error)]
pub enum LinkError {
#[error("Channel try send error")]
TrySend(#[from] TrySendError<(ConnectionId, Event)>),
#[error("Channel send error")]
Send(#[from] SendError<(ConnectionId, Event)>),
#[error("Channel recv error")]
Recv(#[from] RecvError),
#[error("Channel timeout recv error")]
RecvTimeout(#[from] RecvTimeoutError),
#[error("Timeout = {0}")]
Elapsed(#[from] tokio::time::error::Elapsed),
}

pub struct MetersLink {
pub(crate) meter_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
router_rx: Receiver<(ConnectionId, Meter)>,
}

impl MetersLink {
pub fn new(router_tx: Sender<(ConnectionId, Event)>) -> Result<MetersLink, LinkError> {
let (tx, rx) = flume::bounded(5);
router_tx.send((0, Event::NewMeter(tx)))?;
let (meter_id, _meter) = rx.recv()?;

let link = MetersLink {
meter_id,
router_tx,
router_rx: rx,
};

Ok(link)
}

pub async fn init(router_tx: Sender<(ConnectionId, Event)>) -> Result<MetersLink, LinkError> {
let (tx, rx) = flume::bounded(5);
router_tx.send((0, Event::NewMeter(tx)))?;
let (meter_id, _meter) = rx.recv_async().await?;

let link = MetersLink {
meter_id,
router_tx,
router_rx: rx,
};

Ok(link)
}

pub fn get(&self, meter: GetMeter) -> Result<Meter, LinkError> {
self.router_tx
.send((self.meter_id, Event::GetMeter(meter)))?;
let (_meter_id, meter) = self.router_rx.recv()?;
Ok(meter)
}

pub async fn fetch(&self, meter: GetMeter) -> Result<Meter, LinkError> {
self.router_tx
.send_async((self.meter_id, Event::GetMeter(meter)))
.await?;
let (_meter_id, meter) = self.router_rx.recv_async().await?;
Ok(meter)
}
}
1 change: 1 addition & 0 deletions rumqttd/src/link/mod.rs
@@ -1,6 +1,7 @@
// pub mod bridge;
pub mod console;
pub mod local;
pub mod meters;
pub mod network;
pub mod remote;
#[cfg(feature = "websockets")]
Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/link/remote.rs
@@ -1,9 +1,9 @@
use crate::link::local::{LinkError, LinkRx, LinkTx};
use crate::link::local::{Link, LinkError, LinkRx, LinkTx};
use crate::link::network;
use crate::link::network::Network;
use crate::protocol::{Connect, Packet, Protocol};
use crate::router::{Event, Notification};
use crate::{ConnectionId, ConnectionSettings, Link};
use crate::{ConnectionId, ConnectionSettings};

use flume::{RecvError, SendError, Sender, TrySendError};
use std::collections::VecDeque;
Expand Down