diff --git a/CHANGELOG.md b/CHANGELOG.md index dd6eb7386..645217276 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ### Unreleased - Removed unused dependencies and updated version of some of used libraries to fix dependabots warning (#475) +- (rumqttd) Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480) ### R17 --- diff --git a/rumqttd/src/protocol/mod.rs b/rumqttd/src/protocol/mod.rs index 80c497143..8fc85a0bd 100644 --- a/rumqttd/src/protocol/mod.rs +++ b/rumqttd/src/protocol/mod.rs @@ -27,7 +27,7 @@ pub enum Packet { Option, Option, ), - ConnAck(ConnAck), + ConnAck(ConnAck, Option), Publish(Publish, Option), PubAck(PubAck, Option), PingReq(PingReq), @@ -37,9 +37,9 @@ pub enum Packet { PubRec(PubRec, Option), PubRel(PubRel, Option), PubComp(PubComp, Option), - Unsubscribe(Unsubscribe), - UnsubAck(UnsubAck), - Disconnect, + Unsubscribe(Unsubscribe, Option), + UnsubAck(UnsubAck, Option), + Disconnect(Disconnect, Option), } //--------------------------- Connect packet ------------------------------- @@ -381,69 +381,6 @@ pub struct UnsubAckProperties { pub user_properties: Vec<(String, String)>, } -//--------------------------- Disconnect packet ------------------------------- -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum DisconnectReasonCode { - /// Close the connection normally. Do not send the Will Message. - NormalDisconnection, - /// The Client wishes to disconnect but requires that the Server also publishes its Will Message. - DisconnectWithWillMessage, - /// The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason Codes apply. - UnspecifiedError, - /// The received packet does not conform to this specification. - MalformedPacket, - /// An unexpected or out of order packet was received. - ProtocolError, - /// The packet received is valid but cannot be processed by this implementation. - ImplementationSpecificError, - /// The request is not authorized. - NotAuthorized, - /// The Server is busy and cannot continue processing requests from this Client. - ServerBusy, - /// The Server is shutting down. - ServerShuttingDown, - /// The Connection is closed because no packet has been received for 1.5 times the Keepalive time. - KeepAliveTimeout, - /// Another Connection using the same ClientID has connected causing this Connection to be closed. - SessionTakenOver, - /// The Topic Filter is correctly formed, but is not accepted by this Sever. - TopicFilterInvalid, - /// The Topic Name is correctly formed, but is not accepted by this Client or Server. - TopicNameInvalid, - /// The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or PUBCOMP. - ReceiveMaximumExceeded, - /// The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet. - TopicAliasInvalid, - /// The packet size is greater than Maximum Packet Size for this Client or Server. - PacketTooLarge, - /// The received data rate is too high. - MessageRateTooHigh, - /// An implementation or administrative imposed limit has been exceeded. - QuotaExceeded, - /// The Connection is closed due to an administrative action. - AdministrativeAction, - /// The payload format does not match the one specified by the Payload Format Indicator. - PayloadFormatInvalid, - /// The Server has does not support retained messages. - RetainNotSupported, - /// The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK. - QoSNotSupported, - /// The Client should temporarily change its Server. - UseAnotherServer, - /// The Server is moved and the Client should permanently change its server location. - ServerMoved, - /// The Server does not support Shared Subscriptions. - SharedSubscriptionNotSupported, - /// This connection is closed because the connection rate is too high. - ConnectionRateExceeded, - /// The maximum connection time authorized for this connection has been exceeded. - MaximumConnectTime, - /// The Server does not support Subscription Identifiers; the subscription is not accepted. - SubscriptionIdentifiersNotSupported, - /// The Server does not support Wildcard subscription; the subscription is not accepted. - WildcardSubscriptionsNotSupported, -} - //--------------------------- Ping packet ------------------------------- struct Ping; @@ -533,6 +470,92 @@ pub struct PubRelProperties { //------------------------------------------------------------------------ +//--------------------------- Disconnect packet ------------------------------- +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Disconnect { + /// Disconnect Reason Code + pub reason_code: DisconnectReasonCode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum DisconnectReasonCode { + /// Close the connection normally. Do not send the Will Message. + NormalDisconnection, + /// The Client wishes to disconnect but requires that the Server also publishes its Will Message. + DisconnectWithWillMessage, + /// The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason Codes apply. + UnspecifiedError, + /// The received packet does not conform to this specification. + MalformedPacket, + /// An unexpected or out of order packet was received. + ProtocolError, + /// The packet received is valid but cannot be processed by this implementation. + ImplementationSpecificError, + /// The request is not authorized. + NotAuthorized, + /// The Server is busy and cannot continue processing requests from this Client. + ServerBusy, + /// The Server is shutting down. + ServerShuttingDown, + /// The Connection is closed because no packet has been received for 1.5 times the Keepalive time. + KeepAliveTimeout, + /// Another Connection using the same ClientID has connected causing this Connection to be closed. + SessionTakenOver, + /// The Topic Filter is correctly formed, but is not accepted by this Sever. + TopicFilterInvalid, + /// The Topic Name is correctly formed, but is not accepted by this Client or Server. + TopicNameInvalid, + /// The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or PUBCOMP. + ReceiveMaximumExceeded, + /// The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet. + TopicAliasInvalid, + /// The packet size is greater than Maximum Packet Size for this Client or Server. + PacketTooLarge, + /// The received data rate is too high. + MessageRateTooHigh, + /// An implementation or administrative imposed limit has been exceeded. + QuotaExceeded, + /// The Connection is closed due to an administrative action. + AdministrativeAction, + /// The payload format does not match the one specified by the Payload Format Indicator. + PayloadFormatInvalid, + /// The Server has does not support retained messages. + RetainNotSupported, + /// The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK. + QoSNotSupported, + /// The Client should temporarily change its Server. + UseAnotherServer, + /// The Server is moved and the Client should permanently change its server location. + ServerMoved, + /// The Server does not support Shared Subscriptions. + SharedSubscriptionNotSupported, + /// This connection is closed because the connection rate is too high. + ConnectionRateExceeded, + /// The maximum connection time authorized for this connection has been exceeded. + MaximumConnectTime, + /// The Server does not support Subscription Identifiers; the subscription is not accepted. + SubscriptionIdentifiersNotSupported, + /// The Server does not support Wildcard subscription; the subscription is not accepted. + WildcardSubscriptionsNotSupported, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DisconnectProperties { + /// Session Expiry Interval in seconds + pub session_expiry_interval: Option, + + /// Human readable reason for the disconnect + pub reason_string: Option, + + /// List of user properties + pub user_properties: Vec<(String, String)>, + + /// String which can be used by the Client to identify another Server to use. + pub server_reference: Option, +} +//------------------------------------------------------------------------ + /// Quality of service #[repr(u8)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] diff --git a/rumqttd/src/protocol/v4/disconnect.rs b/rumqttd/src/protocol/v4/disconnect.rs index 695013c0e..c9800923e 100644 --- a/rumqttd/src/protocol/v4/disconnect.rs +++ b/rumqttd/src/protocol/v4/disconnect.rs @@ -1,11 +1,8 @@ use super::*; use bytes::{BufMut, BytesMut}; -pub struct Disconnect; - -impl Disconnect { - pub fn write(&self, payload: &mut BytesMut) -> Result { - payload.put_slice(&[0xE0, 0x00]); - Ok(2) - } +// In v4 there are no Reason Code and properties +pub fn write(_disconnect: &Disconnect, payload: &mut BytesMut) -> Result { + payload.put_slice(&[0xE0, 0x00]); + Ok(2) } diff --git a/rumqttd/src/protocol/v4/mod.rs b/rumqttd/src/protocol/v4/mod.rs index 2a8bdf2d7..13eb99821 100644 --- a/rumqttd/src/protocol/v4/mod.rs +++ b/rumqttd/src/protocol/v4/mod.rs @@ -299,7 +299,12 @@ impl Protocol for V4 { return match packet_type { PacketType::PingReq => Ok(Packet::PingReq(PingReq)), PacketType::PingResp => Ok(Packet::PingResp(PingResp)), - PacketType::Disconnect => Ok(Packet::Disconnect), + PacketType::Disconnect => Ok(Packet::Disconnect( + Disconnect { + reason_code: DisconnectReasonCode::NormalDisconnection, + }, + None, + )), _ => Err(Error::PayloadRequired), }; } @@ -310,7 +315,7 @@ impl Protocol for V4 { let (connect, login, lastwill) = connect::read(fixed_header, packet)?; Packet::Connect(connect, None, lastwill, None, login) } - PacketType::ConnAck => Packet::ConnAck(connack::read(fixed_header, packet)?), + PacketType::ConnAck => Packet::ConnAck(connack::read(fixed_header, packet)?, None), PacketType::Publish => Packet::Publish(publish::read(fixed_header, packet)?, None), PacketType::PubAck => Packet::PubAck(puback::read(fixed_header, packet)?, None), PacketType::Subscribe => { @@ -318,15 +323,17 @@ impl Protocol for V4 { } PacketType::SubAck => Packet::SubAck(suback::read(fixed_header, packet)?, None), PacketType::Unsubscribe => { - Packet::Unsubscribe(unsubscribe::read(fixed_header, packet)?) + Packet::Unsubscribe(unsubscribe::read(fixed_header, packet)?, None) } - PacketType::UnsubAck => Packet::UnsubAck(unsuback::read(fixed_header, packet)?), + PacketType::UnsubAck => Packet::UnsubAck(unsuback::read(fixed_header, packet)?, None), PacketType::PingReq => Packet::PingReq(PingReq), PacketType::PingResp => Packet::PingResp(PingResp), PacketType::PubRec => Packet::PubRec(pubrec::read(fixed_header, packet)?, None), PacketType::PubRel => Packet::PubRel(pubrel::read(fixed_header, packet)?, None), PacketType::PubComp => Packet::PubComp(pubcomp::read(fixed_header, packet)?, None), - PacketType::Disconnect => Packet::Disconnect, + // v4 Disconnect packet gets handled in the previous check, this branch gets hit when + // Disconnect packet has properties which is only valid for v5 + PacketType::Disconnect => return Err(Error::InvalidProtocol), _ => unreachable!(), }; diff --git a/rumqttd/src/protocol/v5/disconnect.rs b/rumqttd/src/protocol/v5/disconnect.rs index 002b2104c..656499206 100644 --- a/rumqttd/src/protocol/v5/disconnect.rs +++ b/rumqttd/src/protocol/v5/disconnect.rs @@ -6,167 +6,115 @@ use super::*; use super::{property, PropertyType}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u8)] -pub enum DisconnectReasonCode { - /// Close the connection normally. Do not send the Will Message. - NormalDisconnection = 0x00, - /// The Client wishes to disconnect but requires that the Server also publishes its Will Message. - DisconnectWithWillMessage = 0x04, - /// The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason Codes apply. - UnspecifiedError = 0x80, - /// The received packet does not conform to this specification. - MalformedPacket = 0x81, - /// An unexpected or out of order packet was received. - ProtocolError = 0x82, - /// The packet received is valid but cannot be processed by this implementation. - ImplementationSpecificError = 0x83, - /// The request is not authorized. - NotAuthorized = 0x87, - /// The Server is busy and cannot continue processing requests from this Client. - ServerBusy = 0x89, - /// The Server is shutting down. - ServerShuttingDown = 0x8B, - /// The Connection is closed because no packet has been received for 1.5 times the Keepalive time. - KeepAliveTimeout = 0x8D, - /// Another Connection using the same ClientID has connected causing this Connection to be closed. - SessionTakenOver = 0x8E, - /// The Topic Filter is correctly formed, but is not accepted by this Sever. - TopicFilterInvalid = 0x8F, - /// The Topic Name is correctly formed, but is not accepted by this Client or Server. - TopicNameInvalid = 0x90, - /// The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or PUBCOMP. - ReceiveMaximumExceeded = 0x93, - /// The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet. - TopicAliasInvalid = 0x94, - /// The packet size is greater than Maximum Packet Size for this Client or Server. - PacketTooLarge = 0x95, - /// The received data rate is too high. - MessageRateTooHigh = 0x96, - /// An implementation or administrative imposed limit has been exceeded. - QuotaExceeded = 0x97, - /// The Connection is closed due to an administrative action. - AdministrativeAction = 0x98, - /// The payload format does not match the one specified by the Payload Format Indicator. - PayloadFormatInvalid = 0x99, - /// The Server has does not support retained messages. - RetainNotSupported = 0x9A, - /// The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK. - QoSNotSupported = 0x9B, - /// The Client should temporarily change its Server. - UseAnotherServer = 0x9C, - /// The Server is moved and the Client should permanently change its server location. - ServerMoved = 0x9D, - /// The Server does not support Shared Subscriptions. - SharedSubscriptionNotSupported = 0x9E, - /// This connection is closed because the connection rate is too high. - ConnectionRateExceeded = 0x9F, - /// The maximum connection time authorized for this connection has been exceeded. - MaximumConnectTime = 0xA0, - /// The Server does not support Subscription Identifiers; the subscription is not accepted. - SubscriptionIdentifiersNotSupported = 0xA1, - /// The Server does not support Wildcard subscription; the subscription is not accepted. - WildcardSubscriptionsNotSupported = 0xA2, -} +fn len(disconnect: &Disconnect, properties: &Option) -> usize { + if disconnect.reason_code == DisconnectReasonCode::NormalDisconnection && properties.is_none() { + return 2; // Packet type + 0x00 + } -impl TryFrom for DisconnectReasonCode { - type Error = Error; - - fn try_from(value: u8) -> Result { - let rc = match value { - 0x00 => Self::NormalDisconnection, - 0x04 => Self::DisconnectWithWillMessage, - 0x80 => Self::UnspecifiedError, - 0x81 => Self::MalformedPacket, - 0x82 => Self::ProtocolError, - 0x83 => Self::ImplementationSpecificError, - 0x87 => Self::NotAuthorized, - 0x89 => Self::ServerBusy, - 0x8B => Self::ServerShuttingDown, - 0x8D => Self::KeepAliveTimeout, - 0x8E => Self::SessionTakenOver, - 0x8F => Self::TopicFilterInvalid, - 0x90 => Self::TopicNameInvalid, - 0x93 => Self::ReceiveMaximumExceeded, - 0x94 => Self::TopicAliasInvalid, - 0x95 => Self::PacketTooLarge, - 0x96 => Self::MessageRateTooHigh, - 0x97 => Self::QuotaExceeded, - 0x98 => Self::AdministrativeAction, - 0x99 => Self::PayloadFormatInvalid, - 0x9A => Self::RetainNotSupported, - 0x9B => Self::QoSNotSupported, - 0x9C => Self::UseAnotherServer, - 0x9D => Self::ServerMoved, - 0x9E => Self::SharedSubscriptionNotSupported, - 0x9F => Self::ConnectionRateExceeded, - 0xA0 => Self::MaximumConnectTime, - 0xA1 => Self::SubscriptionIdentifiersNotSupported, - 0xA2 => Self::WildcardSubscriptionsNotSupported, - other => return Err(Error::InvalidConnectReturnCode(other)), - }; + let mut length = 0; - Ok(rc) + if let Some(properties) = &properties { + length += 1; // Disconnect Reason Code + + let properties_len = properties::len(properties); + let properties_len_len = len_len(properties_len); + length += properties_len_len + properties_len; + } else { + length += 1; } + + length } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DisconnectProperties { - /// Session Expiry Interval in seconds - pub session_expiry_interval: Option, +pub fn read( + fixed_header: FixedHeader, + mut bytes: Bytes, +) -> Result<(Disconnect, Option), Error> { + let packet_type = fixed_header.byte1 >> 4; + let flags = fixed_header.byte1 & 0b0000_1111; + + bytes.advance(fixed_header.fixed_header_len); + + if packet_type != PacketType::Disconnect as u8 { + return Err(Error::InvalidPacketType(packet_type)); + }; + + if flags != 0x00 { + return Err(Error::MalformedPacket); + }; + + if fixed_header.remaining_len == 0 { + return Ok(( + Disconnect { + reason_code: DisconnectReasonCode::NormalDisconnection, + }, + None, + )); + } - /// Human readable reason for the disconnect - pub reason_string: Option, + let reason_code = read_u8(&mut bytes)?; - /// List of user properties - pub user_properties: Vec<(String, String)>, + let disconnect = Disconnect { + reason_code: reason(reason_code)?, + }; + let properties = properties::read(&mut bytes)?; - /// String which can be used by the Client to identify another Server to use. - pub server_reference: Option, + Ok((disconnect, properties)) } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Disconnect { - /// Disconnect Reason Code - pub reason_code: DisconnectReasonCode, +pub fn write( + disconnect: &Disconnect, + properties: &Option, + buffer: &mut BytesMut, +) -> Result { + buffer.put_u8(0xE0); - /// Disconnect Properties - pub properties: Option, -} + let length = len(disconnect, properties); -impl DisconnectProperties { - pub fn new() -> Self { - Self { - session_expiry_interval: None, - reason_string: None, - user_properties: Vec::new(), - server_reference: None, - } + if length == 2 { + buffer.put_u8(0x00); + return Ok(length); } - fn len(&self) -> usize { + let len_len = write_remaining_length(buffer, length)?; + + buffer.put_u8(code(disconnect.reason_code)); + + if let Some(properties) = &properties { + properties::write(properties, buffer)?; + } else { + write_remaining_length(buffer, 0)?; + } + + Ok(1 + len_len + length) +} + +mod properties { + use super::*; + + pub fn len(properties: &DisconnectProperties) -> usize { let mut length = 0; - if self.session_expiry_interval.is_some() { + if properties.session_expiry_interval.is_some() { length += 1 + 4; } - if let Some(reason) = &self.reason_string { + if let Some(reason) = &properties.reason_string { length += 1 + 2 + reason.len(); } - for (key, value) in self.user_properties.iter() { + for (key, value) in properties.user_properties.iter() { length += 1 + 2 + key.len() + 2 + value.len(); } - if let Some(server_reference) = &self.server_reference { + if let Some(server_reference) = &properties.server_reference { length += 1 + 2 + server_reference.len(); } length } - pub fn extract(mut bytes: &mut Bytes) -> Result, Error> { + pub fn read(mut bytes: &mut Bytes) -> Result, Error> { let (properties_len_len, properties_len) = length(bytes.iter())?; bytes.advance(properties_len_len); @@ -212,7 +160,7 @@ impl DisconnectProperties { } } - let properties = Self { + let properties = DisconnectProperties { session_expiry_interval, reason_string, user_properties, @@ -222,27 +170,27 @@ impl DisconnectProperties { Ok(Some(properties)) } - fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> { - let length = self.len(); + pub fn write(properties: &DisconnectProperties, buffer: &mut BytesMut) -> Result<(), Error> { + let length = len(properties); write_remaining_length(buffer, length)?; - if let Some(session_expiry_interval) = self.session_expiry_interval { + if let Some(session_expiry_interval) = properties.session_expiry_interval { buffer.put_u8(PropertyType::SessionExpiryInterval as u8); buffer.put_u32(session_expiry_interval); } - if let Some(reason) = &self.reason_string { + if let Some(reason) = &properties.reason_string { buffer.put_u8(PropertyType::ReasonString as u8); write_mqtt_string(buffer, reason); } - for (key, value) in self.user_properties.iter() { + for (key, value) in properties.user_properties.iter() { buffer.put_u8(PropertyType::UserProperty as u8); write_mqtt_string(buffer, key); write_mqtt_string(buffer, value); } - if let Some(reference) = &self.server_reference { + if let Some(reference) = &properties.server_reference { buffer.put_u8(PropertyType::ServerReference as u8); write_mqtt_string(buffer, reference); } @@ -251,96 +199,11 @@ impl DisconnectProperties { } } -impl Disconnect { - pub fn new() -> Self { - Self { - reason_code: DisconnectReasonCode::NormalDisconnection, - properties: None, - } - } - - fn len(&self) -> usize { - if self.reason_code == DisconnectReasonCode::NormalDisconnection - && self.properties.is_none() - { - return 2; // Packet type + 0x00 - } - - let mut length = 0; - - if let Some(properties) = &self.properties { - length += 1; // Disconnect Reason Code - - let properties_len = properties.len(); - let properties_len_len = len_len(properties_len); - length += properties_len_len + properties_len; - } else { - length += 1; - } - - length - } - - pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result { - let packet_type = fixed_header.byte1 >> 4; - let flags = fixed_header.byte1 & 0b0000_1111; - - bytes.advance(fixed_header.fixed_header_len); - - if packet_type != PacketType::Disconnect as u8 { - return Err(Error::InvalidPacketType(packet_type)); - }; - - if flags != 0x00 { - return Err(Error::MalformedPacket); - }; - - if fixed_header.remaining_len == 0 { - return Ok(Self::new()); - } - - let reason_code = read_u8(&mut bytes)?; - - let disconnect = Self { - reason_code: reason_code.try_into()?, - properties: DisconnectProperties::extract(&mut bytes)?, - }; - - Ok(disconnect) - } - - pub fn write(&self, buffer: &mut BytesMut) -> Result { - buffer.put_u8(0xE0); - - let length = self.len(); - - if length == 2 { - buffer.put_u8(0x00); - return Ok(length); - } - - let len_len = write_remaining_length(buffer, length)?; - - buffer.put_u8(self.reason_code as u8); - - if let Some(properties) = &self.properties { - properties.write(buffer)?; - } else { - write_remaining_length(buffer, 0)?; - } - - Ok(1 + len_len + length) - } -} - #[cfg(test)] mod test { + use super::*; use bytes::BytesMut; - use super::parse_fixed_header; - - use super::{Disconnect, DisconnectProperties, DisconnectReasonCode}; - #[test] fn disconnect1_parsing_works() { let mut buffer = bytes::BytesMut::new(); @@ -348,13 +211,15 @@ mod test { 0xE0, // Packet type 0x00, // Remaining length ]; - let expected = Disconnect::new(); + let expected = Disconnect { + reason_code: DisconnectReasonCode::NormalDisconnection, + }; buffer.extend_from_slice(&packet_bytes[..]); let fixed_header = parse_fixed_header(buffer.iter()).unwrap(); let disconnect_bytes = buffer.split_to(fixed_header.frame_length()).freeze(); - let disconnect = Disconnect::read(fixed_header, disconnect_bytes).unwrap(); + let (disconnect, properties) = read(fixed_header, disconnect_bytes).unwrap(); assert_eq!(disconnect, expected); } @@ -362,18 +227,20 @@ mod test { #[test] fn disconnect1_encoding_works() { let mut buffer = BytesMut::new(); - let disconnect = Disconnect::new(); + let disconnect = Disconnect { + reason_code: DisconnectReasonCode::NormalDisconnection, + }; let expected = [ 0xE0, // Packet type 0x00, // Remaining length ]; - disconnect.write(&mut buffer).unwrap(); + write(&disconnect, &None, &mut buffer).unwrap(); assert_eq!(&buffer[..], &expected); } - fn sample2() -> Disconnect { + fn sample2() -> (Disconnect, Option) { let properties = DisconnectProperties { // TODO: change to 2137 xD session_expiry_interval: Some(1234), @@ -382,10 +249,12 @@ mod test { server_reference: Some("test".to_owned()), }; - Disconnect { - reason_code: DisconnectReasonCode::UnspecifiedError, - properties: Some(properties), - } + ( + Disconnect { + reason_code: DisconnectReasonCode::UnspecifiedError, + }, + Some(properties), + ) } fn sample_bytes2() -> Vec { @@ -412,7 +281,7 @@ mod test { let fixed_header = parse_fixed_header(buffer.iter()).unwrap(); let disconnect_bytes = buffer.split_to(fixed_header.frame_length()).freeze(); - let disconnect = Disconnect::read(fixed_header, disconnect_bytes).unwrap(); + let disconnect = read(fixed_header, disconnect_bytes).unwrap(); assert_eq!(disconnect, expected); } @@ -421,11 +290,81 @@ mod test { fn disconnect2_encoding_works() { let mut buffer = BytesMut::new(); - let disconnect = sample2(); + let (disconnect, properties) = sample2(); let expected = sample_bytes2(); - disconnect.write(&mut buffer).unwrap(); + write(&disconnect, &properties, &mut buffer).unwrap(); assert_eq!(&buffer[..], &expected); } } + +fn reason(code: u8) -> Result { + let v = match code { + 0x00 => DisconnectReasonCode::NormalDisconnection, + 0x04 => DisconnectReasonCode::DisconnectWithWillMessage, + 0x80 => DisconnectReasonCode::UnspecifiedError, + 0x81 => DisconnectReasonCode::MalformedPacket, + 0x82 => DisconnectReasonCode::ProtocolError, + 0x83 => DisconnectReasonCode::ImplementationSpecificError, + 0x87 => DisconnectReasonCode::NotAuthorized, + 0x89 => DisconnectReasonCode::ServerBusy, + 0x8B => DisconnectReasonCode::ServerShuttingDown, + 0x8D => DisconnectReasonCode::KeepAliveTimeout, + 0x8E => DisconnectReasonCode::SessionTakenOver, + 0x8F => DisconnectReasonCode::TopicFilterInvalid, + 0x90 => DisconnectReasonCode::TopicNameInvalid, + 0x93 => DisconnectReasonCode::ReceiveMaximumExceeded, + 0x94 => DisconnectReasonCode::TopicAliasInvalid, + 0x95 => DisconnectReasonCode::PacketTooLarge, + 0x96 => DisconnectReasonCode::MessageRateTooHigh, + 0x97 => DisconnectReasonCode::QuotaExceeded, + 0x98 => DisconnectReasonCode::AdministrativeAction, + 0x99 => DisconnectReasonCode::PayloadFormatInvalid, + 0x9A => DisconnectReasonCode::RetainNotSupported, + 0x9B => DisconnectReasonCode::QoSNotSupported, + 0x9C => DisconnectReasonCode::UseAnotherServer, + 0x9D => DisconnectReasonCode::ServerMoved, + 0x9E => DisconnectReasonCode::SharedSubscriptionNotSupported, + 0x9F => DisconnectReasonCode::ConnectionRateExceeded, + 0xA0 => DisconnectReasonCode::MaximumConnectTime, + 0xA1 => DisconnectReasonCode::SubscriptionIdentifiersNotSupported, + 0xA2 => DisconnectReasonCode::WildcardSubscriptionsNotSupported, + other => return Err(Error::InvalidConnectReturnCode(other)), + }; + Ok(v) +} + +fn code(reason: DisconnectReasonCode) -> u8 { + match reason { + DisconnectReasonCode::NormalDisconnection => 0x00, + DisconnectReasonCode::DisconnectWithWillMessage => 0x04, + DisconnectReasonCode::UnspecifiedError => 0x80, + DisconnectReasonCode::MalformedPacket => 0x81, + DisconnectReasonCode::ProtocolError => 0x82, + DisconnectReasonCode::ImplementationSpecificError => 0x83, + DisconnectReasonCode::NotAuthorized => 0x87, + DisconnectReasonCode::ServerBusy => 0x89, + DisconnectReasonCode::ServerShuttingDown => 0x8B, + DisconnectReasonCode::KeepAliveTimeout => 0x8D, + DisconnectReasonCode::SessionTakenOver => 0x8E, + DisconnectReasonCode::TopicFilterInvalid => 0x8F, + DisconnectReasonCode::TopicNameInvalid => 0x90, + DisconnectReasonCode::ReceiveMaximumExceeded => 0x93, + DisconnectReasonCode::TopicAliasInvalid => 0x94, + DisconnectReasonCode::PacketTooLarge => 0x95, + DisconnectReasonCode::MessageRateTooHigh => 0x96, + DisconnectReasonCode::QuotaExceeded => 0x97, + DisconnectReasonCode::AdministrativeAction => 0x98, + DisconnectReasonCode::PayloadFormatInvalid => 0x99, + DisconnectReasonCode::RetainNotSupported => 0x9A, + DisconnectReasonCode::QoSNotSupported => 0x9B, + DisconnectReasonCode::UseAnotherServer => 0x9C, + DisconnectReasonCode::ServerMoved => 0x9D, + DisconnectReasonCode::SharedSubscriptionNotSupported => 0x9E, + DisconnectReasonCode::ConnectionRateExceeded => 0x9F, + DisconnectReasonCode::MaximumConnectTime => 0xA0, + DisconnectReasonCode::SubscriptionIdentifiersNotSupported => 0xA1, + DisconnectReasonCode::WildcardSubscriptionsNotSupported => 0xA2, + } +} diff --git a/rumqttd/src/protocol/v5/mod.rs b/rumqttd/src/protocol/v5/mod.rs index 5b2df49c7..94f4788bd 100644 --- a/rumqttd/src/protocol/v5/mod.rs +++ b/rumqttd/src/protocol/v5/mod.rs @@ -371,7 +371,12 @@ impl Protocol for V5 { return match packet_type { PacketType::PingReq => Ok(Packet::PingReq(PingReq)), PacketType::PingResp => Ok(Packet::PingResp(PingResp)), - PacketType::Disconnect => Ok(Packet::Disconnect), + PacketType::Disconnect => Ok(Packet::Disconnect( + Disconnect { + reason_code: DisconnectReasonCode::NormalDisconnection, + }, + None, + )), _ => Err(Error::PayloadRequired), }; } @@ -401,7 +406,10 @@ impl Protocol for V5 { } PacketType::PingReq => Packet::PingReq(PingReq), PacketType::PingResp => Packet::PingResp(PingResp), - PacketType::Disconnect => Packet::Disconnect, + PacketType::Disconnect => { + let (disconnect, properties) = disconnect::read(fixed_header, packet)?; + Packet::Disconnect(disconnect, properties) + } _ => unreachable!(), }; diff --git a/rumqttd/src/router/routing.rs b/rumqttd/src/router/routing.rs index abe9a53a5..cc3c9b804 100644 --- a/rumqttd/src/router/routing.rs +++ b/rumqttd/src/router/routing.rs @@ -522,7 +522,7 @@ impl Router { ackslog.suback(suback); force_ack = true; } - Packet::Unsubscribe(unsubscribe) => { + Packet::Unsubscribe(unsubscribe, _) => { debug!( "{:11} {:14} Id = {} Filters = {:?}", "data", "unsubscribe", id, unsubscribe.filters @@ -651,7 +651,7 @@ impl Router { force_ack = true; } - Packet::Disconnect => { + Packet::Disconnect(_, _) => { disconnect = true; execute_will = false; break;