From ff01077d0025c4351f67b69e274c7d0384fc4fac Mon Sep 17 00:00:00 2001 From: tekjar Date: Fri, 25 Nov 2022 20:31:49 +0530 Subject: [PATCH 1/6] Add send method to local link to publish any packet and add stress example --- rumqttd/examples/stress.rs | 88 ++++++++++++++++++++++++++++++++++++++ rumqttd/src/lib.rs | 2 +- rumqttd/src/link/local.rs | 14 ++++++ 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 rumqttd/examples/stress.rs diff --git a/rumqttd/examples/stress.rs b/rumqttd/examples/stress.rs new file mode 100644 index 000000000..4382317bb --- /dev/null +++ b/rumqttd/examples/stress.rs @@ -0,0 +1,88 @@ +use std::{ + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; + +use bytes::Bytes; +use rumqttd::{local::LinkRx, Broker, protocol::{Publish, QoS, Packet}}; +use tokio::{time::{self, Instant}, task}; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + // let router = Router::new(); // Router is not publically exposed! + tracing_subscriber::fmt::init(); + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let config = config::Config::builder() + .add_source(config::File::with_name(&format!( + "{manifest_dir}/demo.toml" + ))) + .build() + .unwrap(); // Config::default() doesn't have working values + + let config = config.try_deserialize().unwrap(); + let broker = Broker::new(config); + + const CONNECTIONS: usize = 10; + const MAX_MSG_PER_PUB: usize = 5; + + let (mut link_tx, mut link_rx) = broker + .link("the_subscriber") + .expect("New link should be made"); + link_tx + .subscribe("hello/+/world") + .expect("link should subscribe"); + + link_rx.recv().expect("Should recieve Ack"); + + for i in 0..1 { + let client_id = format!("client_{i}"); + let topic = format!("hello/{}/world", client_id); + let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB + let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made"); + + let topic: Bytes = topic.into(); + let payload: Bytes = payload.into(); + task::spawn(async move { + let mut interval = time::interval(Duration::from_secs(1)); + for _ in 0..MAX_MSG_PER_PUB { + dbg!(); + interval.tick().await; + dbg!(); + + let publish = Publish { + dup: false, + qos: QoS::AtMostOnce, + retain: false, + topic: topic.clone(), + pkid: 0, + payload: payload.clone(), + }; + + dbg!("publish"); + link_tx.send(Packet::Publish(publish, None)).await.unwrap(); + } + }); + } + + let count = Arc::new(AtomicU32::new(0)); + let instant = Instant::now(); + + tokio::spawn(keep_recv(link_rx, count.clone())); + + let eta = MAX_MSG_PER_PUB + 2; // 2 sec as buffer time / delay + + let mut interval = time::interval(Duration::from_secs(1)); + for _ in 0..eta { + interval.tick().await; + println!("TOTAL COUNT: {count:?}; TIME: {:?}", instant.elapsed()); + } +} + +async fn keep_recv(mut link_rx: LinkRx, count: Arc) { + loop { + let notification = link_rx.recv().unwrap(); + if notification.is_some() { + count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + } +} diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 31412db62..4fe9a0c27 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -16,8 +16,8 @@ use tracing_subscriber::{ use std::net::SocketAddr; +pub mod protocol; mod link; -mod protocol; mod router; mod segments; mod server; diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 15c8980cd..08e61ab86 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -170,6 +170,20 @@ impl LinkTx { Ok(len) } + /// Send raw device data + pub async fn send(&mut self, data: Packet) -> Result { + let len = { + let mut buffer = self.recv_buffer.lock(); + buffer.push_back(data); + buffer.len() + }; + + self.router_tx + .send((self.connection_id, Event::DeviceData))?; + + Ok(len) + } + fn try_push(&mut self, data: Packet) -> Result { let len = { let mut buffer = self.recv_buffer.lock(); From 13c1fe924274e60db7abc7e52f461bdc0318a3d6 Mon Sep 17 00:00:00 2001 From: tekjar Date: Sat, 26 Nov 2022 14:03:01 +0530 Subject: [PATCH 2/6] Simplify stress example --- rumqttd/examples/stress.rs | 63 +++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/rumqttd/examples/stress.rs b/rumqttd/examples/stress.rs index 4382317bb..533d21918 100644 --- a/rumqttd/examples/stress.rs +++ b/rumqttd/examples/stress.rs @@ -1,11 +1,19 @@ -use std::{ - sync::{atomic::AtomicU32, Arc}, - time::Duration, -}; +use std::time::Duration; use bytes::Bytes; -use rumqttd::{local::LinkRx, Broker, protocol::{Publish, QoS, Packet}}; -use tokio::{time::{self, Instant}, task}; +use rumqttd::{ + local::LinkRx, + protocol::{Packet, Publish, QoS}, + Broker, +}; + +use tokio::{ + select, task, + time::{self, Instant}, +}; + +const CONNECTIONS: usize = 1000; +const MAX_MSG_PER_PUB: usize = 5; #[tokio::main(flavor = "current_thread")] async fn main() { @@ -22,19 +30,17 @@ async fn main() { let config = config.try_deserialize().unwrap(); let broker = Broker::new(config); - const CONNECTIONS: usize = 10; - const MAX_MSG_PER_PUB: usize = 5; - let (mut link_tx, mut link_rx) = broker .link("the_subscriber") .expect("New link should be made"); + link_tx .subscribe("hello/+/world") .expect("link should subscribe"); link_rx.recv().expect("Should recieve Ack"); - for i in 0..1 { + for i in 0..CONNECTIONS { let client_id = format!("client_{i}"); let topic = format!("hello/{}/world", client_id); let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB @@ -43,11 +49,8 @@ async fn main() { let topic: Bytes = topic.into(); let payload: Bytes = payload.into(); task::spawn(async move { - let mut interval = time::interval(Duration::from_secs(1)); for _ in 0..MAX_MSG_PER_PUB { - dbg!(); - interval.tick().await; - dbg!(); + time::sleep(Duration::from_secs(1)).await; let publish = Publish { dup: false, @@ -58,31 +61,29 @@ async fn main() { payload: payload.clone(), }; - dbg!("publish"); link_tx.send(Packet::Publish(publish, None)).await.unwrap(); } }); } - let count = Arc::new(AtomicU32::new(0)); - let instant = Instant::now(); - - tokio::spawn(keep_recv(link_rx, count.clone())); - - let eta = MAX_MSG_PER_PUB + 2; // 2 sec as buffer time / delay - - let mut interval = time::interval(Duration::from_secs(1)); - for _ in 0..eta { - interval.tick().await; - println!("TOTAL COUNT: {count:?}; TIME: {:?}", instant.elapsed()); - } + consumer(link_rx).await } -async fn keep_recv(mut link_rx: LinkRx, count: Arc) { +async fn consumer(mut link_rx: LinkRx) { + let mut count = 0; + let mut interval = time::interval(Duration::from_secs(1)); + let instant = Instant::now(); loop { - let notification = link_rx.recv().unwrap(); - if notification.is_some() { - count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + select! { + _ = interval.tick() => { + println!("TOTAL COUNT: {count:?}; TIME: {:?}", instant.elapsed()); + } + notification = link_rx.next() => { + let notification = notification.unwrap(); + if notification.is_some() { + count += 1; + } + } } } } From 26f82cc9d8c8385925bd935d4de0e095719004e2 Mon Sep 17 00:00:00 2001 From: tekjar Date: Sat, 26 Nov 2022 14:52:06 +0530 Subject: [PATCH 3/6] Use env to filter traces --- rumqttd/examples/stress.rs | 24 +++++++++++++++++------- rumqttd/src/main.rs | 1 + 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/rumqttd/examples/stress.rs b/rumqttd/examples/stress.rs index 533d21918..5c1e2cf94 100644 --- a/rumqttd/examples/stress.rs +++ b/rumqttd/examples/stress.rs @@ -11,19 +11,29 @@ use tokio::{ select, task, time::{self, Instant}, }; +use tracing_subscriber::EnvFilter; -const CONNECTIONS: usize = 1000; +const CONNECTIONS: usize = 200; const MAX_MSG_PER_PUB: usize = 5; #[tokio::main(flavor = "current_thread")] async fn main() { - // let router = Router::new(); // Router is not publically exposed! - tracing_subscriber::fmt::init(); + // RUST_LOG=rumqttd[{client_id=consumer}]=debug cargo run --example stress + let builder = tracing_subscriber::fmt() + .pretty() + .with_line_number(false) + .with_file(false) + .with_thread_ids(false) + .with_thread_names(false) + .with_env_filter(EnvFilter::from_env("RUST_LOG")); + // .with_env_filter("rumqttd=debug"); + + builder.try_init().unwrap(); + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let config = format!("{manifest_dir}/demo.toml"); let config = config::Config::builder() - .add_source(config::File::with_name(&format!( - "{manifest_dir}/demo.toml" - ))) + .add_source(config::File::with_name(&config)) .build() .unwrap(); // Config::default() doesn't have working values @@ -31,7 +41,7 @@ async fn main() { let broker = Broker::new(config); let (mut link_tx, mut link_rx) = broker - .link("the_subscriber") + .link("consumer") .expect("New link should be made"); link_tx diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index cfc8b540b..6ad89c2b8 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -38,6 +38,7 @@ fn main() { _ => "rumqttd=trace", }; + // tracing syntac -> let builder = tracing_subscriber::fmt() .pretty() .with_line_number(false) From 2443cdadae75cb0f75821d3575d23676497be417 Mon Sep 17 00:00:00 2001 From: tekjar Date: Sat, 26 Nov 2022 17:19:27 +0530 Subject: [PATCH 4/6] Some comments to explain consumption in local link --- rumqttd/examples/stress.rs | 17 +++++------------ rumqttd/src/link/local.rs | 4 ++++ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/rumqttd/examples/stress.rs b/rumqttd/examples/stress.rs index 5c1e2cf94..e3bebd079 100644 --- a/rumqttd/examples/stress.rs +++ b/rumqttd/examples/stress.rs @@ -19,16 +19,11 @@ const MAX_MSG_PER_PUB: usize = 5; #[tokio::main(flavor = "current_thread")] async fn main() { // RUST_LOG=rumqttd[{client_id=consumer}]=debug cargo run --example stress - let builder = tracing_subscriber::fmt() - .pretty() - .with_line_number(false) - .with_file(false) - .with_thread_ids(false) - .with_thread_names(false) - .with_env_filter(EnvFilter::from_env("RUST_LOG")); + tracing_subscriber::fmt() // .with_env_filter("rumqttd=debug"); - - builder.try_init().unwrap(); + .with_env_filter(EnvFilter::from_env("RUST_LOG")) + .pretty() + .init(); let manifest_dir = env!("CARGO_MANIFEST_DIR"); let config = format!("{manifest_dir}/demo.toml"); @@ -40,9 +35,7 @@ async fn main() { let config = config.try_deserialize().unwrap(); let broker = Broker::new(config); - let (mut link_tx, mut link_rx) = broker - .link("consumer") - .expect("New link should be made"); + let (mut link_tx, mut link_rx) = broker.link("consumer").expect("New link should be made"); link_tx .subscribe("hello/+/world") diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 08e61ab86..ab6d23c4a 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -324,6 +324,8 @@ impl LinkRx { None => { // If cache is empty, check for router trigger and get fresh notifications self.router_rx.recv()?; + // Collect 'all' the data in the buffer after a notification. + // Notification means fresh data which isn't previously collected mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); Ok(self.cache.pop_front()) } @@ -354,6 +356,8 @@ impl LinkRx { None => { // If cache is empty, check for router trigger and get fresh notifications self.router_rx.recv_async().await?; + // Collect 'all' the data in the buffer after a notification. + // Notification means fresh data which isn't previously collected mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); Ok(self.cache.pop_front()) } From 71ae82860c3729bff6090c9ef1f7f1deb860faaa Mon Sep 17 00:00:00 2001 From: tekjar Date: Mon, 28 Nov 2022 22:30:39 +0530 Subject: [PATCH 5/6] Multiple consumers and minor clean up --- rumqttd/Cargo.toml | 1 + rumqttd/demo.toml | 2 +- rumqttd/examples/stress.rs | 72 ++++++++++++++++++++----------------- rumqttd/src/link/local.rs | 5 +-- rumqttd/src/protocol/mod.rs | 24 ++++++++++--- 5 files changed, 64 insertions(+), 40 deletions(-) diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 65e36b15b..41027f5ff 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -40,6 +40,7 @@ websockets = ["tokio-tungstenite", "websocket-codec", "tokio-util", "futures-uti validate-tenant-prefix = [] [dev-dependencies] +tokio = { version = "1.4.0", features = ["rt", "time", "net", "io-util", "macros", "sync"]} pretty_env_logger = "0.4.0" config = "0.13" pretty_assertions = "1.3.0" diff --git a/rumqttd/demo.toml b/rumqttd/demo.toml index c99c56bda..63ac1a25e 100644 --- a/rumqttd/demo.toml +++ b/rumqttd/demo.toml @@ -9,7 +9,7 @@ instant_ack = true max_segment_size = 104857600 max_segment_count = 10 max_read_len = 10240 -max_connections = 10001 +max_connections = 10010 # Configuration of server and connections that it accepts [v4.1] diff --git a/rumqttd/examples/stress.rs b/rumqttd/examples/stress.rs index e3bebd079..c500146c9 100644 --- a/rumqttd/examples/stress.rs +++ b/rumqttd/examples/stress.rs @@ -1,20 +1,24 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use bytes::Bytes; use rumqttd::{ local::LinkRx, - protocol::{Packet, Publish, QoS}, - Broker, + protocol::{Packet, Publish}, + Broker, Notification, }; use tokio::{ - select, task, + select, + sync::Barrier, + task, time::{self, Instant}, }; use tracing_subscriber::EnvFilter; -const CONNECTIONS: usize = 200; -const MAX_MSG_PER_PUB: usize = 5; +const PUBLISHERS: usize = 10000; +const MAX_MSG_PER_PUB: usize = 100; +const SLEEP_TIME_MS_BETWEEN_PUB: u64 = 100; +const CONSUMERS: usize = 2; #[tokio::main(flavor = "current_thread")] async fn main() { @@ -35,56 +39,60 @@ async fn main() { let config = config.try_deserialize().unwrap(); let broker = Broker::new(config); - let (mut link_tx, mut link_rx) = broker.link("consumer").expect("New link should be made"); + for i in 0..CONSUMERS { + let client_id = format!("consumer_{i}"); + let (mut link_tx, mut link_rx) = broker.link(&client_id).unwrap(); - link_tx - .subscribe("hello/+/world") - .expect("link should subscribe"); + link_tx.subscribe("hello/+/world").unwrap(); + link_rx.recv().unwrap(); - link_rx.recv().expect("Should recieve Ack"); + task::spawn(async move { consumer(&client_id, link_rx).await }); + } - for i in 0..CONNECTIONS { - let client_id = format!("client_{i}"); + let barrier = Arc::new(Barrier::new(PUBLISHERS + 1)); + for i in 0..PUBLISHERS { + let c = barrier.clone(); + let client_id = format!("publisher_{i}"); let topic = format!("hello/{}/world", client_id); let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB - let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made"); + let (mut link_tx, _link_rx) = broker.link(&client_id).unwrap(); let topic: Bytes = topic.into(); let payload: Bytes = payload.into(); task::spawn(async move { for _ in 0..MAX_MSG_PER_PUB { - time::sleep(Duration::from_secs(1)).await; - - let publish = Publish { - dup: false, - qos: QoS::AtMostOnce, - retain: false, - topic: topic.clone(), - pkid: 0, - payload: payload.clone(), - }; - + time::sleep(Duration::from_millis(SLEEP_TIME_MS_BETWEEN_PUB)).await; + let publish = Publish::new(topic.clone(), payload.clone(), false); link_tx.send(Packet::Publish(publish, None)).await.unwrap(); } + + c.wait().await; }); } - consumer(link_rx).await + barrier.wait().await; + time::sleep(Duration::from_secs(5)).await; } -async fn consumer(mut link_rx: LinkRx) { +async fn consumer(client_id: &str, mut link_rx: LinkRx) { let mut count = 0; - let mut interval = time::interval(Duration::from_secs(1)); + let mut interval = time::interval(Duration::from_millis(500)); let instant = Instant::now(); loop { select! { _ = interval.tick() => { - println!("TOTAL COUNT: {count:?}; TIME: {:?}", instant.elapsed()); + println!("{client_id:?}: total count: {count:<10}; time: {:?}", instant.elapsed()); } notification = link_rx.next() => { - let notification = notification.unwrap(); - if notification.is_some() { - count += 1; + let notification = match notification.unwrap() { + Some(v) => v, + None => continue + }; + + match notification { + Notification::Forward(_) => count += 1, + Notification::Unschedule => link_rx.wake().await.unwrap(), + _ => unreachable!() } } } diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index ab6d23c4a..6051350f0 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -324,7 +324,7 @@ impl LinkRx { None => { // If cache is empty, check for router trigger and get fresh notifications self.router_rx.recv()?; - // Collect 'all' the data in the buffer after a notification. + // Collect 'all' the data in the buffer after a notification. // Notification means fresh data which isn't previously collected mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); Ok(self.cache.pop_front()) @@ -356,7 +356,7 @@ impl LinkRx { None => { // If cache is empty, check for router trigger and get fresh notifications self.router_rx.recv_async().await?; - // Collect 'all' the data in the buffer after a notification. + // Collect 'all' the data in the buffer after a notification. // Notification means fresh data which isn't previously collected mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); Ok(self.cache.pop_front()) @@ -382,6 +382,7 @@ impl LinkRx { self.router_tx .send_async((self.connection_id, Event::Ready)) .await?; + Ok(()) } } diff --git a/rumqttd/src/protocol/mod.rs b/rumqttd/src/protocol/mod.rs index 60d85c9d0..75f4ac3a9 100644 --- a/rumqttd/src/protocol/mod.rs +++ b/rumqttd/src/protocol/mod.rs @@ -170,17 +170,30 @@ pub struct ConnAckProperties { //--------------------------- Publish packet ------------------------------- /// Publish packet -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct Publish { - pub dup: bool, - pub qos: QoS, + pub(crate) dup: bool, + pub(crate) qos: QoS, + pub(crate) pkid: u16, pub retain: bool, pub topic: Bytes, - pub pkid: u16, pub payload: Bytes, } impl Publish { + // Constructor for publish. Used in local links as local links shouldn't + // send qos 1 or 2 packets + pub fn new>(topic: T, payload: T, retain: bool) -> Publish { + Publish { + dup: false, + qos: QoS::AtMostOnce, + pkid: 0, + retain, + topic: topic.into(), + payload: payload.into(), + } + } + /// Approximate length for meter pub fn len(&self) -> usize { let len = 2 + self.topic.len() + self.payload.len(); @@ -558,9 +571,10 @@ pub struct DisconnectProperties { /// Quality of service #[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd)] #[allow(clippy::enum_variant_names)] pub enum QoS { + #[default] AtMostOnce = 0, AtLeastOnce = 1, ExactlyOnce = 2, From 5057c1426ac732ac3c3900c6b9882f544c8611df Mon Sep 17 00:00:00 2001 From: tekjar Date: Sat, 3 Dec 2022 14:48:52 +0530 Subject: [PATCH 6/6] Fix some clippy warnings --- rumqttd/src/lib.rs | 2 +- rumqttd/src/main.rs | 2 +- rumqttd/src/protocol/mod.rs | 8 ++++++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 4fe9a0c27..1f217c204 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -16,8 +16,8 @@ use tracing_subscriber::{ use std::net::SocketAddr; -pub mod protocol; mod link; +pub mod protocol; mod router; mod segments; mod server; diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index 6ad89c2b8..b60f168e8 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -38,7 +38,7 @@ fn main() { _ => "rumqttd=trace", }; - // tracing syntac -> + // tracing syntac -> let builder = tracing_subscriber::fmt() .pretty() .with_line_number(false) diff --git a/rumqttd/src/protocol/mod.rs b/rumqttd/src/protocol/mod.rs index 75f4ac3a9..ba295d482 100644 --- a/rumqttd/src/protocol/mod.rs +++ b/rumqttd/src/protocol/mod.rs @@ -194,6 +194,10 @@ impl Publish { } } + pub fn is_empty(&self) -> bool { + false + } + /// Approximate length for meter pub fn len(&self) -> usize { let len = 2 + self.topic.len() + self.payload.len(); @@ -325,6 +329,10 @@ pub struct SubAck { } impl SubAck { + pub fn is_empty(&self) -> bool { + false + } + pub fn len(&self) -> usize { 2 + self.return_codes.len() }