Skip to content

Commit

Permalink
fix: ordering of events returned on poll() (#862)
Browse files Browse the repository at this point in the history
* fix: ordering of events as mentioned in #860

* fix: clippy suggestion

https://github.com/bytebeamio/rumqtt/actions/runs/9013022148/job/24763146160?pr=862#step:7:216

* chore: clippy suggestions https://github.com/bytebeamio/rumqtt/actions/runs/9028175682/job/24808221311?pr=862

* chore: update CHANGELOG
  • Loading branch information
de-sh committed May 10, 2024
1 parent c5bfdd7 commit 7f594f8
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 22 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes

### Security

Expand Down
2 changes: 1 addition & 1 deletion rumqttc/Cargo.toml
Expand Up @@ -23,7 +23,7 @@ websocket = ["dep:async-tungstenite", "dep:ws_stream_tungstenite", "dep:http"]
proxy = ["dep:async-http-proxy"]

[dependencies]
futures-util = { version = "0.3", default_features = false, features = ["std", "sink"] }
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
tokio = { version = "1.36", features = ["rt", "macros", "io-util", "net", "time"] }
tokio-util = { version = "0.7", features = ["codec"] }
bytes = "1.5"
Expand Down
5 changes: 3 additions & 2 deletions rumqttc/src/state.rs
Expand Up @@ -90,7 +90,7 @@ impl MqttState {
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; std::u16::MAX as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand Down Expand Up @@ -165,6 +165,8 @@ impl MqttState {
&mut self,
packet: Incoming,
) -> Result<Option<Packet>, StateError> {
self.events.push_back(Event::Incoming(packet.clone()));

let outgoing = match &packet {
Incoming::PingResp => self.handle_incoming_pingresp()?,
Incoming::Publish(publish) => self.handle_incoming_publish(publish)?,
Expand All @@ -179,7 +181,6 @@ impl MqttState {
return Err(StateError::WrongPacket);
}
};
self.events.push_back(Event::Incoming(packet));
self.last_incoming = Instant::now();

Ok(outgoing)
Expand Down
9 changes: 5 additions & 4 deletions rumqttc/src/v5/state.rs
Expand Up @@ -65,7 +65,7 @@ pub enum StateError {
#[error("Connection failed with reason '{reason:?}' ")]
ConnFail { reason: ConnectReturnCode },
#[error("Connection closed by peer abruptly")]
ConnectionAborted
ConnectionAborted,
}

impl From<mqttbytes::Error> for StateError {
Expand Down Expand Up @@ -138,7 +138,7 @@ impl MqttState {
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; std::u16::MAX as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand Down Expand Up @@ -217,6 +217,8 @@ impl MqttState {
&mut self,
mut packet: Incoming,
) -> Result<Option<Packet>, StateError> {
self.events.push_back(Event::Incoming(packet.to_owned()));

let outgoing = match &mut packet {
Incoming::PingResp(_) => self.handle_incoming_pingresp()?,
Incoming::Publish(publish) => self.handle_incoming_publish(publish)?,
Expand All @@ -234,7 +236,6 @@ impl MqttState {
}
};

self.events.push_back(Event::Incoming(packet));
self.last_incoming = Instant::now();
Ok(outgoing)
}
Expand Down Expand Up @@ -331,7 +332,7 @@ impl MqttState {
}
} else if let Some(alias) = topic_alias {
if let Some(topic) = self.topic_alises.get(&alias) {
publish.topic = topic.to_owned();
topic.clone_into(&mut publish.topic);
} else {
self.handle_protocol_error()?;
};
Expand Down
6 changes: 3 additions & 3 deletions rumqttd/src/router/alertlog.rs
Expand Up @@ -71,14 +71,14 @@ pub mod alert {
pub use alert::*;

pub struct AlertLog {
pub config: RouterConfig,
pub _config: RouterConfig,
pub alerts: VecDeque<Alert>,
}

impl AlertLog {
pub fn new(config: RouterConfig) -> AlertLog {
pub fn new(_config: RouterConfig) -> AlertLog {
AlertLog {
config,
_config,
alerts: VecDeque::with_capacity(100),
}
}
Expand Down
14 changes: 7 additions & 7 deletions rumqttd/src/router/mod.rs
Expand Up @@ -212,7 +212,7 @@ pub struct Message {
/// Log to sweep
pub topic: String,
/// Qos of the topic
pub qos: u8,
pub _qos: u8,
/// Reply data chain
pub payload: Bytes,
}
Expand Down Expand Up @@ -250,12 +250,12 @@ impl fmt::Debug for Data {
}
}

#[derive(Debug, Clone)]
pub struct Disconnection {
pub id: String,
pub execute_will: bool,
pub pending: Vec<Notification>,
}
// #[derive(Debug, Clone)]
// pub struct Disconnection {
// pub id: String,
// pub execute_will: bool,
// pub pending: Vec<Notification>,
// }

#[derive(Debug, Clone)]
pub struct ShadowRequest {
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/routing.rs
Expand Up @@ -322,7 +322,7 @@ impl Router {
|session_state| {
connection.subscriptions = session_state.subscriptions;
// for using in acklog
pending_acks = session_state.unacked_pubrels.clone();
pending_acks.clone_from(&session_state.unacked_pubrels);
outgoing.unacked_pubrels = session_state.unacked_pubrels;
session_state.tracker
},
Expand Down
1 change: 0 additions & 1 deletion rumqttd/src/segments/mod.rs
@@ -1,5 +1,4 @@
use crate::Offset;
use std::usize;
use std::{collections::VecDeque, io};

mod segment;
Expand Down
6 changes: 3 additions & 3 deletions rumqttd/src/server/mod.rs
@@ -1,10 +1,10 @@
use tokio::io::{AsyncRead, AsyncWrite};
// use tokio::io::{AsyncRead, AsyncWrite};

mod broker;
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
mod tls;

pub use broker::Broker;

pub trait IO: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin> IO for T {}
// pub trait IO: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
// impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin> IO for T {}

0 comments on commit 7f594f8

Please sign in to comment.