diff --git a/rumqttd/Cargo.toml b/rumqttd/Cargo.toml index 65e36b15b..41027f5ff 100644 --- a/rumqttd/Cargo.toml +++ b/rumqttd/Cargo.toml @@ -40,6 +40,7 @@ websockets = ["tokio-tungstenite", "websocket-codec", "tokio-util", "futures-uti validate-tenant-prefix = [] [dev-dependencies] +tokio = { version = "1.4.0", features = ["rt", "time", "net", "io-util", "macros", "sync"]} pretty_env_logger = "0.4.0" config = "0.13" pretty_assertions = "1.3.0" diff --git a/rumqttd/demo.toml b/rumqttd/demo.toml index c99c56bda..63ac1a25e 100644 --- a/rumqttd/demo.toml +++ b/rumqttd/demo.toml @@ -9,7 +9,7 @@ instant_ack = true max_segment_size = 104857600 max_segment_count = 10 max_read_len = 10240 -max_connections = 10001 +max_connections = 10010 # Configuration of server and connections that it accepts [v4.1] diff --git a/rumqttd/examples/stress.rs b/rumqttd/examples/stress.rs new file mode 100644 index 000000000..c500146c9 --- /dev/null +++ b/rumqttd/examples/stress.rs @@ -0,0 +1,100 @@ +use std::{sync::Arc, time::Duration}; + +use bytes::Bytes; +use rumqttd::{ + local::LinkRx, + protocol::{Packet, Publish}, + Broker, Notification, +}; + +use tokio::{ + select, + sync::Barrier, + task, + time::{self, Instant}, +}; +use tracing_subscriber::EnvFilter; + +const PUBLISHERS: usize = 10000; +const MAX_MSG_PER_PUB: usize = 100; +const SLEEP_TIME_MS_BETWEEN_PUB: u64 = 100; +const CONSUMERS: usize = 2; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + // RUST_LOG=rumqttd[{client_id=consumer}]=debug cargo run --example stress + tracing_subscriber::fmt() + // .with_env_filter("rumqttd=debug"); + .with_env_filter(EnvFilter::from_env("RUST_LOG")) + .pretty() + .init(); + + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + let config = format!("{manifest_dir}/demo.toml"); + let config = config::Config::builder() + .add_source(config::File::with_name(&config)) + .build() + .unwrap(); // Config::default() doesn't have working values + + let config = config.try_deserialize().unwrap(); + let broker = Broker::new(config); + + for i in 0..CONSUMERS { + let client_id = format!("consumer_{i}"); + let (mut link_tx, mut link_rx) = broker.link(&client_id).unwrap(); + + link_tx.subscribe("hello/+/world").unwrap(); + link_rx.recv().unwrap(); + + task::spawn(async move { consumer(&client_id, link_rx).await }); + } + + let barrier = Arc::new(Barrier::new(PUBLISHERS + 1)); + for i in 0..PUBLISHERS { + let c = barrier.clone(); + let client_id = format!("publisher_{i}"); + let topic = format!("hello/{}/world", client_id); + let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB + let (mut link_tx, _link_rx) = broker.link(&client_id).unwrap(); + + let topic: Bytes = topic.into(); + let payload: Bytes = payload.into(); + task::spawn(async move { + for _ in 0..MAX_MSG_PER_PUB { + time::sleep(Duration::from_millis(SLEEP_TIME_MS_BETWEEN_PUB)).await; + let publish = Publish::new(topic.clone(), payload.clone(), false); + link_tx.send(Packet::Publish(publish, None)).await.unwrap(); + } + + c.wait().await; + }); + } + + barrier.wait().await; + time::sleep(Duration::from_secs(5)).await; +} + +async fn consumer(client_id: &str, mut link_rx: LinkRx) { + let mut count = 0; + let mut interval = time::interval(Duration::from_millis(500)); + let instant = Instant::now(); + loop { + select! { + _ = interval.tick() => { + println!("{client_id:?}: total count: {count:<10}; time: {:?}", instant.elapsed()); + } + notification = link_rx.next() => { + let notification = match notification.unwrap() { + Some(v) => v, + None => continue + }; + + match notification { + Notification::Forward(_) => count += 1, + Notification::Unschedule => link_rx.wake().await.unwrap(), + _ => unreachable!() + } + } + } + } +} diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 31412db62..1f217c204 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -17,7 +17,7 @@ use tracing_subscriber::{ use std::net::SocketAddr; mod link; -mod protocol; +pub mod protocol; mod router; mod segments; mod server; diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 15c8980cd..6051350f0 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -170,6 +170,20 @@ impl LinkTx { Ok(len) } + /// Send raw device data + pub async fn send(&mut self, data: Packet) -> Result { + let len = { + let mut buffer = self.recv_buffer.lock(); + buffer.push_back(data); + buffer.len() + }; + + self.router_tx + .send((self.connection_id, Event::DeviceData))?; + + Ok(len) + } + fn try_push(&mut self, data: Packet) -> Result { let len = { let mut buffer = self.recv_buffer.lock(); @@ -310,6 +324,8 @@ impl LinkRx { None => { // If cache is empty, check for router trigger and get fresh notifications self.router_rx.recv()?; + // Collect 'all' the data in the buffer after a notification. + // Notification means fresh data which isn't previously collected mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); Ok(self.cache.pop_front()) } @@ -340,6 +356,8 @@ impl LinkRx { None => { // If cache is empty, check for router trigger and get fresh notifications self.router_rx.recv_async().await?; + // Collect 'all' the data in the buffer after a notification. + // Notification means fresh data which isn't previously collected mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); Ok(self.cache.pop_front()) } @@ -364,6 +382,7 @@ impl LinkRx { self.router_tx .send_async((self.connection_id, Event::Ready)) .await?; + Ok(()) } } diff --git a/rumqttd/src/main.rs b/rumqttd/src/main.rs index cfc8b540b..b60f168e8 100644 --- a/rumqttd/src/main.rs +++ b/rumqttd/src/main.rs @@ -38,6 +38,7 @@ fn main() { _ => "rumqttd=trace", }; + // tracing syntac -> let builder = tracing_subscriber::fmt() .pretty() .with_line_number(false) diff --git a/rumqttd/src/protocol/mod.rs b/rumqttd/src/protocol/mod.rs index 60d85c9d0..ba295d482 100644 --- a/rumqttd/src/protocol/mod.rs +++ b/rumqttd/src/protocol/mod.rs @@ -170,17 +170,34 @@ pub struct ConnAckProperties { //--------------------------- Publish packet ------------------------------- /// Publish packet -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct Publish { - pub dup: bool, - pub qos: QoS, + pub(crate) dup: bool, + pub(crate) qos: QoS, + pub(crate) pkid: u16, pub retain: bool, pub topic: Bytes, - pub pkid: u16, pub payload: Bytes, } impl Publish { + // Constructor for publish. Used in local links as local links shouldn't + // send qos 1 or 2 packets + pub fn new>(topic: T, payload: T, retain: bool) -> Publish { + Publish { + dup: false, + qos: QoS::AtMostOnce, + pkid: 0, + retain, + topic: topic.into(), + payload: payload.into(), + } + } + + pub fn is_empty(&self) -> bool { + false + } + /// Approximate length for meter pub fn len(&self) -> usize { let len = 2 + self.topic.len() + self.payload.len(); @@ -312,6 +329,10 @@ pub struct SubAck { } impl SubAck { + pub fn is_empty(&self) -> bool { + false + } + pub fn len(&self) -> usize { 2 + self.return_codes.len() } @@ -558,9 +579,10 @@ pub struct DisconnectProperties { /// Quality of service #[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd)] #[allow(clippy::enum_variant_names)] pub enum QoS { + #[default] AtMostOnce = 0, AtLeastOnce = 1, ExactlyOnce = 2,