Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stress test the router #511

Merged
merged 6 commits into from Dec 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions rumqttd/Cargo.toml
Expand Up @@ -40,6 +40,7 @@ websockets = ["tokio-tungstenite", "websocket-codec", "tokio-util", "futures-uti
validate-tenant-prefix = []

[dev-dependencies]
tokio = { version = "1.4.0", features = ["rt", "time", "net", "io-util", "macros", "sync"]}
pretty_env_logger = "0.4.0"
config = "0.13"
pretty_assertions = "1.3.0"
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/demo.toml
Expand Up @@ -9,7 +9,7 @@ instant_ack = true
max_segment_size = 104857600
max_segment_count = 10
max_read_len = 10240
max_connections = 10001
max_connections = 10010

# Configuration of server and connections that it accepts
[v4.1]
Expand Down
100 changes: 100 additions & 0 deletions rumqttd/examples/stress.rs
@@ -0,0 +1,100 @@
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use rumqttd::{
local::LinkRx,
protocol::{Packet, Publish},
Broker, Notification,
};

use tokio::{
select,
sync::Barrier,
task,
time::{self, Instant},
};
use tracing_subscriber::EnvFilter;

const PUBLISHERS: usize = 10000;
const MAX_MSG_PER_PUB: usize = 100;
const SLEEP_TIME_MS_BETWEEN_PUB: u64 = 100;
const CONSUMERS: usize = 2;

#[tokio::main(flavor = "current_thread")]
async fn main() {
// RUST_LOG=rumqttd[{client_id=consumer}]=debug cargo run --example stress
tracing_subscriber::fmt()
// .with_env_filter("rumqttd=debug");
.with_env_filter(EnvFilter::from_env("RUST_LOG"))
.pretty()
.init();

let manifest_dir = env!("CARGO_MANIFEST_DIR");
let config = format!("{manifest_dir}/demo.toml");
let config = config::Config::builder()
.add_source(config::File::with_name(&config))
.build()
.unwrap(); // Config::default() doesn't have working values

let config = config.try_deserialize().unwrap();
let broker = Broker::new(config);

for i in 0..CONSUMERS {
let client_id = format!("consumer_{i}");
let (mut link_tx, mut link_rx) = broker.link(&client_id).unwrap();

link_tx.subscribe("hello/+/world").unwrap();
link_rx.recv().unwrap();

task::spawn(async move { consumer(&client_id, link_rx).await });
}

let barrier = Arc::new(Barrier::new(PUBLISHERS + 1));
for i in 0..PUBLISHERS {
let c = barrier.clone();
let client_id = format!("publisher_{i}");
let topic = format!("hello/{}/world", client_id);
let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB
let (mut link_tx, _link_rx) = broker.link(&client_id).unwrap();

let topic: Bytes = topic.into();
let payload: Bytes = payload.into();
task::spawn(async move {
for _ in 0..MAX_MSG_PER_PUB {
time::sleep(Duration::from_millis(SLEEP_TIME_MS_BETWEEN_PUB)).await;
let publish = Publish::new(topic.clone(), payload.clone(), false);
link_tx.send(Packet::Publish(publish, None)).await.unwrap();
}

c.wait().await;
});
}

barrier.wait().await;
time::sleep(Duration::from_secs(5)).await;
}

async fn consumer(client_id: &str, mut link_rx: LinkRx) {
let mut count = 0;
let mut interval = time::interval(Duration::from_millis(500));
let instant = Instant::now();
loop {
select! {
_ = interval.tick() => {
println!("{client_id:?}: total count: {count:<10}; time: {:?}", instant.elapsed());
}
notification = link_rx.next() => {
let notification = match notification.unwrap() {
Some(v) => v,
None => continue
};

match notification {
Notification::Forward(_) => count += 1,
Notification::Unschedule => link_rx.wake().await.unwrap(),
_ => unreachable!()
}
}
}
}
}
2 changes: 1 addition & 1 deletion rumqttd/src/lib.rs
Expand Up @@ -17,7 +17,7 @@ use tracing_subscriber::{
use std::net::SocketAddr;

mod link;
mod protocol;
pub mod protocol;
mod router;
mod segments;
mod server;
Expand Down
19 changes: 19 additions & 0 deletions rumqttd/src/link/local.rs
Expand Up @@ -170,6 +170,20 @@ impl LinkTx {
Ok(len)
}

/// Send raw device data
pub async fn send(&mut self, data: Packet) -> Result<usize, LinkError> {
let len = {
let mut buffer = self.recv_buffer.lock();
buffer.push_back(data);
buffer.len()
};

self.router_tx
.send((self.connection_id, Event::DeviceData))?;

Ok(len)
}

fn try_push(&mut self, data: Packet) -> Result<usize, LinkError> {
let len = {
let mut buffer = self.recv_buffer.lock();
Expand Down Expand Up @@ -310,6 +324,8 @@ impl LinkRx {
None => {
// If cache is empty, check for router trigger and get fresh notifications
self.router_rx.recv()?;
// Collect 'all' the data in the buffer after a notification.
// Notification means fresh data which isn't previously collected
mem::swap(&mut *self.send_buffer.lock(), &mut self.cache);
Ok(self.cache.pop_front())
}
Expand Down Expand Up @@ -340,6 +356,8 @@ impl LinkRx {
None => {
// If cache is empty, check for router trigger and get fresh notifications
self.router_rx.recv_async().await?;
// Collect 'all' the data in the buffer after a notification.
// Notification means fresh data which isn't previously collected
mem::swap(&mut *self.send_buffer.lock(), &mut self.cache);
Ok(self.cache.pop_front())
}
Expand All @@ -364,6 +382,7 @@ impl LinkRx {
self.router_tx
.send_async((self.connection_id, Event::Ready))
.await?;

Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions rumqttd/src/main.rs
Expand Up @@ -38,6 +38,7 @@ fn main() {
_ => "rumqttd=trace",
};

// tracing syntac ->
let builder = tracing_subscriber::fmt()
.pretty()
.with_line_number(false)
Expand Down
32 changes: 27 additions & 5 deletions rumqttd/src/protocol/mod.rs
Expand Up @@ -170,17 +170,34 @@ pub struct ConnAckProperties {
//--------------------------- Publish packet -------------------------------

/// Publish packet
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Publish {
pub dup: bool,
pub qos: QoS,
pub(crate) dup: bool,
pub(crate) qos: QoS,
pub(crate) pkid: u16,
pub retain: bool,
pub topic: Bytes,
pub pkid: u16,
pub payload: Bytes,
}

impl Publish {
// Constructor for publish. Used in local links as local links shouldn't
// send qos 1 or 2 packets
pub fn new<T: Into<Bytes>>(topic: T, payload: T, retain: bool) -> Publish {
Publish {
dup: false,
qos: QoS::AtMostOnce,
pkid: 0,
retain,
topic: topic.into(),
payload: payload.into(),
}
}

pub fn is_empty(&self) -> bool {
false
}

/// Approximate length for meter
pub fn len(&self) -> usize {
let len = 2 + self.topic.len() + self.payload.len();
Expand Down Expand Up @@ -312,6 +329,10 @@ pub struct SubAck {
}

impl SubAck {
pub fn is_empty(&self) -> bool {
false
}

pub fn len(&self) -> usize {
2 + self.return_codes.len()
}
Expand Down Expand Up @@ -558,9 +579,10 @@ pub struct DisconnectProperties {

/// Quality of service
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd)]
#[allow(clippy::enum_variant_names)]
pub enum QoS {
#[default]
AtMostOnce = 0,
AtLeastOnce = 1,
ExactlyOnce = 2,
Expand Down