Skip to content

Commit

Permalink
feat: add subscribe/unsubscribe properties
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed May 19, 2022
1 parent 18c56c1 commit 3c13d8b
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion rumqttc/src/v5/client/asyncclient.rs
Expand Up @@ -7,7 +7,7 @@ use crate::v5::{
client::get_ack_req,
outgoing_buf::OutgoingBuf,
packet::{Publish, Subscribe, SubscribeFilter, Unsubscribe},
ClientError, EventLoop, MqttOptions, QoS, Request,
ClientError, EventLoop, MqttOptions, QoS, Request, SubscribeProperties, UnsubscribeProperties,
};

/// `AsyncClient` to communicate with MQTT `Eventloop`
Expand Down Expand Up @@ -152,7 +152,18 @@ impl AsyncClient {

/// Sends a MQTT Subscribe to the eventloop
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<u16, ClientError> {
self.subscribe_with(topic, qos, None).await
}

/// Sends a MQTT Subscribe to the eventloop with options
pub async fn subscribe_with<S: Into<String>, P: Into<Option<SubscribeProperties>>>(
&self,
topic: S,
qos: QoS,
props: P,
) -> Result<u16, ClientError> {
let mut subscribe = Subscribe::new(topic.into(), qos);
subscribe.properties = props.into();
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand All @@ -169,7 +180,18 @@ impl AsyncClient {

/// Sends a MQTT Subscribe to the eventloop
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<u16, ClientError> {
self.try_subscribe_with(topic, qos, None)
}

/// Sends a MQTT Subscribe to the eventloop
pub fn try_subscribe_with<S: Into<String>, P: Into<Option<SubscribeProperties>>>(
&self,
topic: S,
qos: QoS,
props: P,
) -> Result<u16, ClientError> {
let mut subscribe = Subscribe::new(topic.into(), qos);
subscribe.properties = props.into();
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand All @@ -186,10 +208,23 @@ impl AsyncClient {

/// Sends a MQTT Subscribe for multiple topics to the eventloop
pub async fn subscribe_many<T>(&self, topics: T) -> Result<u16, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
self.subscribe_with_many(None, topics).await
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop, with properties
pub async fn subscribe_with_many<P: Into<Option<SubscribeProperties>>, T>(
&self,
props: P,
topics: T,
) -> Result<u16, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics)?;
subscribe.properties = props.into();
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand All @@ -208,8 +243,18 @@ impl AsyncClient {
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<u16, ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
self.try_subscribe_with_many(None, topics)
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
pub fn try_subscribe_with_many<P, T>(&self, props: P, topics: T) -> Result<u16, ClientError>
where
P: Into<Option<SubscribeProperties>>,
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics)?;
subscribe.properties = props.into();
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand All @@ -226,7 +271,17 @@ impl AsyncClient {

/// Sends a MQTT Unsubscribe to the eventloop
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<u16, ClientError> {
self.unsubscribe_with(topic, None).await
}

/// Sends a MQTT Unsubscribe to the eventloop, with properties
pub async fn unsubscribe_with<S: Into<String>, P: Into<Option<UnsubscribeProperties>>>(
&self,
topic: S,
props: P,
) -> Result<u16, ClientError> {
let mut unsubscribe = Unsubscribe::new(topic.into());
unsubscribe.properties = props.into();
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand All @@ -243,7 +298,17 @@ impl AsyncClient {

/// Sends a MQTT Unsubscribe to the eventloop
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<u16, ClientError> {
self.try_unsubscribe_with(topic, None)
}

/// Sends a MQTT Unsubscribe to the eventloop, with properties
pub fn try_unsubscribe_with<S: Into<String>, P: Into<Option<UnsubscribeProperties>>>(
&self,
topic: S,
props: P,
) -> Result<u16, ClientError> {
let mut unsubscribe = Unsubscribe::new(topic.into());
unsubscribe.properties = props.into();
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
Expand Down

0 comments on commit 3c13d8b

Please sign in to comment.