Skip to content

Commit

Permalink
fix(protocol): Extract out Disconnect packet logic similar to other p…
Browse files Browse the repository at this point in the history
…ackets (bytebeamio#480)
  • Loading branch information
henil committed Oct 8, 2022
1 parent 32cddf4 commit de035e2
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 318 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
### Unreleased

- Removed unused dependencies and updated version of some of used libraries to fix dependabots warning (#475)
- (rumqttd) Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480)

### R17
---
Expand Down
157 changes: 90 additions & 67 deletions rumqttd/src/protocol/mod.rs
Expand Up @@ -27,7 +27,7 @@ pub enum Packet {
Option<LastWillProperties>,
Option<Login>,
),
ConnAck(ConnAck),
ConnAck(ConnAck, Option<ConnAckProperties>),
Publish(Publish, Option<PublishProperties>),
PubAck(PubAck, Option<PubAckProperties>),
PingReq(PingReq),
Expand All @@ -37,9 +37,9 @@ pub enum Packet {
PubRec(PubRec, Option<PubRecProperties>),
PubRel(PubRel, Option<PubRelProperties>),
PubComp(PubComp, Option<PubCompProperties>),
Unsubscribe(Unsubscribe),
UnsubAck(UnsubAck),
Disconnect,
Unsubscribe(Unsubscribe, Option<UnsubscribeProperties>),
UnsubAck(UnsubAck, Option<UnsubAckProperties>),
Disconnect(Disconnect, Option<DisconnectProperties>),
}

//--------------------------- Connect packet -------------------------------
Expand Down Expand Up @@ -381,69 +381,6 @@ pub struct UnsubAckProperties {
pub user_properties: Vec<(String, String)>,
}

//--------------------------- Disconnect packet -------------------------------
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DisconnectReasonCode {
/// Close the connection normally. Do not send the Will Message.
NormalDisconnection,
/// The Client wishes to disconnect but requires that the Server also publishes its Will Message.
DisconnectWithWillMessage,
/// The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason Codes apply.
UnspecifiedError,
/// The received packet does not conform to this specification.
MalformedPacket,
/// An unexpected or out of order packet was received.
ProtocolError,
/// The packet received is valid but cannot be processed by this implementation.
ImplementationSpecificError,
/// The request is not authorized.
NotAuthorized,
/// The Server is busy and cannot continue processing requests from this Client.
ServerBusy,
/// The Server is shutting down.
ServerShuttingDown,
/// The Connection is closed because no packet has been received for 1.5 times the Keepalive time.
KeepAliveTimeout,
/// Another Connection using the same ClientID has connected causing this Connection to be closed.
SessionTakenOver,
/// The Topic Filter is correctly formed, but is not accepted by this Sever.
TopicFilterInvalid,
/// The Topic Name is correctly formed, but is not accepted by this Client or Server.
TopicNameInvalid,
/// The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or PUBCOMP.
ReceiveMaximumExceeded,
/// The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet.
TopicAliasInvalid,
/// The packet size is greater than Maximum Packet Size for this Client or Server.
PacketTooLarge,
/// The received data rate is too high.
MessageRateTooHigh,
/// An implementation or administrative imposed limit has been exceeded.
QuotaExceeded,
/// The Connection is closed due to an administrative action.
AdministrativeAction,
/// The payload format does not match the one specified by the Payload Format Indicator.
PayloadFormatInvalid,
/// The Server has does not support retained messages.
RetainNotSupported,
/// The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK.
QoSNotSupported,
/// The Client should temporarily change its Server.
UseAnotherServer,
/// The Server is moved and the Client should permanently change its server location.
ServerMoved,
/// The Server does not support Shared Subscriptions.
SharedSubscriptionNotSupported,
/// This connection is closed because the connection rate is too high.
ConnectionRateExceeded,
/// The maximum connection time authorized for this connection has been exceeded.
MaximumConnectTime,
/// The Server does not support Subscription Identifiers; the subscription is not accepted.
SubscriptionIdentifiersNotSupported,
/// The Server does not support Wildcard subscription; the subscription is not accepted.
WildcardSubscriptionsNotSupported,
}

//--------------------------- Ping packet -------------------------------

struct Ping;
Expand Down Expand Up @@ -533,6 +470,92 @@ pub struct PubRelProperties {

//------------------------------------------------------------------------

//--------------------------- Disconnect packet -------------------------------
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Disconnect {
/// Disconnect Reason Code
pub reason_code: DisconnectReasonCode,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DisconnectReasonCode {
/// Close the connection normally. Do not send the Will Message.
NormalDisconnection,
/// The Client wishes to disconnect but requires that the Server also publishes its Will Message.
DisconnectWithWillMessage,
/// The Connection is closed but the sender either does not wish to reveal the reason, or none of the other Reason Codes apply.
UnspecifiedError,
/// The received packet does not conform to this specification.
MalformedPacket,
/// An unexpected or out of order packet was received.
ProtocolError,
/// The packet received is valid but cannot be processed by this implementation.
ImplementationSpecificError,
/// The request is not authorized.
NotAuthorized,
/// The Server is busy and cannot continue processing requests from this Client.
ServerBusy,
/// The Server is shutting down.
ServerShuttingDown,
/// The Connection is closed because no packet has been received for 1.5 times the Keepalive time.
KeepAliveTimeout,
/// Another Connection using the same ClientID has connected causing this Connection to be closed.
SessionTakenOver,
/// The Topic Filter is correctly formed, but is not accepted by this Sever.
TopicFilterInvalid,
/// The Topic Name is correctly formed, but is not accepted by this Client or Server.
TopicNameInvalid,
/// The Client or Server has received more than Receive Maximum publication for which it has not sent PUBACK or PUBCOMP.
ReceiveMaximumExceeded,
/// The Client or Server has received a PUBLISH packet containing a Topic Alias which is greater than the Maximum Topic Alias it sent in the CONNECT or CONNACK packet.
TopicAliasInvalid,
/// The packet size is greater than Maximum Packet Size for this Client or Server.
PacketTooLarge,
/// The received data rate is too high.
MessageRateTooHigh,
/// An implementation or administrative imposed limit has been exceeded.
QuotaExceeded,
/// The Connection is closed due to an administrative action.
AdministrativeAction,
/// The payload format does not match the one specified by the Payload Format Indicator.
PayloadFormatInvalid,
/// The Server has does not support retained messages.
RetainNotSupported,
/// The Client specified a QoS greater than the QoS specified in a Maximum QoS in the CONNACK.
QoSNotSupported,
/// The Client should temporarily change its Server.
UseAnotherServer,
/// The Server is moved and the Client should permanently change its server location.
ServerMoved,
/// The Server does not support Shared Subscriptions.
SharedSubscriptionNotSupported,
/// This connection is closed because the connection rate is too high.
ConnectionRateExceeded,
/// The maximum connection time authorized for this connection has been exceeded.
MaximumConnectTime,
/// The Server does not support Subscription Identifiers; the subscription is not accepted.
SubscriptionIdentifiersNotSupported,
/// The Server does not support Wildcard subscription; the subscription is not accepted.
WildcardSubscriptionsNotSupported,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DisconnectProperties {
/// Session Expiry Interval in seconds
pub session_expiry_interval: Option<u32>,

/// Human readable reason for the disconnect
pub reason_string: Option<String>,

/// List of user properties
pub user_properties: Vec<(String, String)>,

/// String which can be used by the Client to identify another Server to use.
pub server_reference: Option<String>,
}
//------------------------------------------------------------------------

/// Quality of service
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
Expand Down
11 changes: 4 additions & 7 deletions rumqttd/src/protocol/v4/disconnect.rs
@@ -1,11 +1,8 @@
use super::*;
use bytes::{BufMut, BytesMut};

pub struct Disconnect;

impl Disconnect {
pub fn write(&self, payload: &mut BytesMut) -> Result<usize, Error> {
payload.put_slice(&[0xE0, 0x00]);
Ok(2)
}
// In v4 there are no Reason Code and properties
pub fn write(_disconnect: &Disconnect, payload: &mut BytesMut) -> Result<usize, Error> {
payload.put_slice(&[0xE0, 0x00]);
Ok(2)
}
17 changes: 12 additions & 5 deletions rumqttd/src/protocol/v4/mod.rs
Expand Up @@ -299,7 +299,12 @@ impl Protocol for V4 {
return match packet_type {
PacketType::PingReq => Ok(Packet::PingReq(PingReq)),
PacketType::PingResp => Ok(Packet::PingResp(PingResp)),
PacketType::Disconnect => Ok(Packet::Disconnect),
PacketType::Disconnect => Ok(Packet::Disconnect(
Disconnect {
reason_code: DisconnectReasonCode::NormalDisconnection,
},
None,
)),
_ => Err(Error::PayloadRequired),
};
}
Expand All @@ -310,23 +315,25 @@ impl Protocol for V4 {
let (connect, login, lastwill) = connect::read(fixed_header, packet)?;
Packet::Connect(connect, None, lastwill, None, login)
}
PacketType::ConnAck => Packet::ConnAck(connack::read(fixed_header, packet)?),
PacketType::ConnAck => Packet::ConnAck(connack::read(fixed_header, packet)?, None),
PacketType::Publish => Packet::Publish(publish::read(fixed_header, packet)?, None),
PacketType::PubAck => Packet::PubAck(puback::read(fixed_header, packet)?, None),
PacketType::Subscribe => {
Packet::Subscribe(subscribe::read(fixed_header, packet)?, None)
}
PacketType::SubAck => Packet::SubAck(suback::read(fixed_header, packet)?, None),
PacketType::Unsubscribe => {
Packet::Unsubscribe(unsubscribe::read(fixed_header, packet)?)
Packet::Unsubscribe(unsubscribe::read(fixed_header, packet)?, None)
}
PacketType::UnsubAck => Packet::UnsubAck(unsuback::read(fixed_header, packet)?),
PacketType::UnsubAck => Packet::UnsubAck(unsuback::read(fixed_header, packet)?, None),
PacketType::PingReq => Packet::PingReq(PingReq),
PacketType::PingResp => Packet::PingResp(PingResp),
PacketType::PubRec => Packet::PubRec(pubrec::read(fixed_header, packet)?, None),
PacketType::PubRel => Packet::PubRel(pubrel::read(fixed_header, packet)?, None),
PacketType::PubComp => Packet::PubComp(pubcomp::read(fixed_header, packet)?, None),
PacketType::Disconnect => Packet::Disconnect,
// v4 Disconnect packet gets handled in the previous check, this branch gets hit when
// Disconnect packet has properties which is only valid for v5
PacketType::Disconnect => return Err(Error::InvalidProtocol),
_ => unreachable!(),
};

Expand Down

0 comments on commit de035e2

Please sign in to comment.