Skip to content

Commit

Permalink
Merge pull request #392 from bytebeamio/sub
Browse files Browse the repository at this point in the history
Handle subscription with empty filter
  • Loading branch information
de-sh committed May 13, 2022
2 parents 7a1bf06 + 62ac4d0 commit 399c62b
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 53 deletions.
4 changes: 2 additions & 2 deletions rumqttc/src/client.rs
Expand Up @@ -150,7 +150,7 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let subscribe = Subscribe::new_many(topics);
let subscribe = Subscribe::new_many(topics)?;
let request = Request::Subscribe(subscribe);
self.request_tx.send(request).await?;
Ok(())
Expand All @@ -161,7 +161,7 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let subscribe = Subscribe::new_many(topics);
let subscribe = Subscribe::new_many(topics)?;
let request = Request::Subscribe(subscribe);
self.request_tx.try_send(request)?;
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions rumqttc/src/mqttbytes/mod.rs
Expand Up @@ -53,6 +53,8 @@ pub enum Error {
MalformedPacket,
#[error("Malformed remaining length")]
MalformedRemainingLength,
#[error("A Subscribe packet must contain atleast one filter")]
EmptySubscription,
/// More bytes required to frame packet. Argument
/// implies minimum additional bytes required to
/// proceed further
Expand Down
22 changes: 9 additions & 13 deletions rumqttc/src/mqttbytes/v4/subscribe.rs
Expand Up @@ -21,20 +21,15 @@ impl Subscribe {
}
}

pub fn new_many<T>(topics: T) -> Subscribe
pub fn new_many<T>(topics: T) -> Result<Subscribe, Error>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Subscribe {
pkid: 0,
filters: topics.into_iter().collect(),
}
}
let filters: Vec<SubscribeFilter> = topics.into_iter().collect();

pub fn empty_subscribe() -> Subscribe {
Subscribe {
pkid: 0,
filters: Vec::new(),
match filters.len() {
0 => Err(Error::EmptySubscription),
_ => Ok(Subscribe { pkid: 0, filters }),
}
}

Expand Down Expand Up @@ -70,9 +65,10 @@ impl Subscribe {
});
}

let subscribe = Subscribe { pkid, filters };

Ok(subscribe)
match filters.len() {
0 => Err(Error::EmptySubscription),
_ => Ok(Subscribe { pkid, filters }),
}
}

pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
Expand Down
6 changes: 6 additions & 0 deletions rumqttc/src/state.rs
Expand Up @@ -29,6 +29,8 @@ pub enum StateError {
WrongPacket,
#[error("Timeout while waiting to resolve collision")]
CollisionTimeout,
#[error("A Subscribe packet must contain atleast one filter")]
EmptySubscription,
#[error("Mqtt serialization/deserialization error: {0}")]
Deserialization(#[from] mqttbytes::Error),
}
Expand Down Expand Up @@ -412,6 +414,10 @@ impl MqttState {
}

fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> {
if subscription.filters.is_empty() {
return Err(StateError::EmptySubscription);
}

let pkid = self.next_pkid();
subscription.pkid = pkid;

Expand Down
4 changes: 2 additions & 2 deletions rumqttc/src/v5/client/asyncclient.rs
Expand Up @@ -189,7 +189,7 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics);
let mut subscribe = Subscribe::new_many(topics)?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand All @@ -209,7 +209,7 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics);
let mut subscribe = Subscribe::new_many(topics)?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/client/mod.rs
Expand Up @@ -21,7 +21,7 @@ pub enum ClientError {
#[error("Failed to send mqtt request to evenloop, to requests buffer is full right now")]
RequestsFull,
#[error("Serialization error")]
Mqtt5(Error),
Mqtt5(#[from] Error),
}

fn get_ack_req(qos: QoS, pkid: u16) -> Option<Request> {
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/client/syncclient.rs
Expand Up @@ -119,7 +119,7 @@ impl Client {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics);
let mut subscribe = Subscribe::new_many(topics)?;
let pkid = {
let mut request_buf = self.client.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand Down
40 changes: 26 additions & 14 deletions rumqttc/src/v5/packet/mod.rs
@@ -1,7 +1,4 @@
use std::{
fmt::{self, Display, Formatter},
slice::Iter,
};
use std::slice::Iter;

use bytes::{Buf, BufMut, Bytes, BytesMut};

Expand Down Expand Up @@ -105,33 +102,54 @@ enum PropertyType {
}

/// Error during serialization and deserialization
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum Error {
#[error("Expected Connect, received: {0:?}")]
NotConnect(PacketType),
#[error("Unexpected Connect")]
UnexpectedConnect,
#[error("Invalid Connect return code: {0}")]
InvalidConnectReturnCode(u8),
InvalidReason(u8),
#[error("Invalid protocol")]
InvalidProtocol,
#[error("Invalid protocol level: {0}")]
InvalidProtocolLevel(u8),
#[error("Incorrect packet format")]
IncorrectPacketFormat,
#[error("Invalid packet type: {0}")]
InvalidPacketType(u8),
#[error("Invalid property type: {0}")]
InvalidPropertyType(u8),
InvalidRetainForwardRule(u8),
#[error("Invalid QoS level: {0}")]
InvalidQoS(u8),
#[error("Invalid retain forward rule: {0}")]
InvalidRetainForwardRule(u8),
#[error("Invalid subscribe reason code: {0}")]
InvalidSubscribeReasonCode(u8),
#[error("Packet id Zero")]
PacketIdZero,
SubscriptionIdZero,
#[error("Payload size is incorrect")]
PayloadSizeIncorrect,
#[error("payload is too long")]
PayloadTooLong,
#[error("payload size limit exceeded: {0}")]
PayloadSizeLimitExceeded(usize),
#[error("Payload required")]
PayloadRequired,
#[error("Topic is not UTF-8")]
TopicNotUtf8,
#[error("Promised boundary crossed: {0}")]
BoundaryCrossed(usize),
#[error("Malformed packet")]
MalformedPacket,
#[error("Malformed remaining length")]
MalformedRemainingLength,
#[error("A Subscribe packet must contain atleast one filter")]
EmptySubscription,
/// More bytes required to frame packet. Argument
/// implies minimum additional bytes required to
/// proceed further
#[error("At least {0} more bytes required to frame packet")]
InsufficientBytes(usize),
}

Expand Down Expand Up @@ -481,9 +499,3 @@ fn read_u32(stream: &mut Bytes) -> Result<u32, Error> {

Ok(stream.get_u32())
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "Error = {:?}", self)
}
}
38 changes: 18 additions & 20 deletions rumqttc/src/v5/packet/subscribe.rs
Expand Up @@ -27,22 +27,19 @@ impl Subscribe {
}
}

pub fn new_many<T>(topics: T) -> Subscribe
pub fn new_many<T>(topics: T) -> Result<Subscribe, Error>
where
T: IntoIterator<Item = SubscribeFilter>,
{
Subscribe {
pkid: 0,
filters: topics.into_iter().collect(),
properties: None,
}
}

pub fn empty_subscribe() -> Subscribe {
Subscribe {
pkid: 0,
filters: Vec::new(),
properties: None,
let filters: Vec<SubscribeFilter> = topics.into_iter().collect();

match filters.len() {
0 => Err(Error::EmptySubscription),
_ => Ok(Subscribe {
pkid: 0,
filters,
properties: None,
}),
}
}

Expand Down Expand Up @@ -112,13 +109,14 @@ impl Subscribe {
});
}

let subscribe = Subscribe {
pkid,
filters,
properties,
};

Ok(subscribe)
match filters.len() {
0 => Err(Error::EmptySubscription),
_ => Ok(Subscribe {
pkid,
filters,
properties,
}),
}
}

pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
Expand Down

0 comments on commit 399c62b

Please sign in to comment.