diff --git a/rumqttc/src/v5/client/asyncclient.rs b/rumqttc/src/v5/client/asyncclient.rs index bd3d51ac8..565a5702c 100644 --- a/rumqttc/src/v5/client/asyncclient.rs +++ b/rumqttc/src/v5/client/asyncclient.rs @@ -7,7 +7,7 @@ use crate::v5::{ client::get_ack_req, outgoing_buf::OutgoingBuf, packet::{Publish, Subscribe, SubscribeFilter, Unsubscribe}, - ClientError, EventLoop, MqttOptions, QoS, Request, + ClientError, EventLoop, MqttOptions, QoS, Request, SubscribeProperties, UnsubscribeProperties, }; /// `AsyncClient` to communicate with MQTT `Eventloop` @@ -152,7 +152,18 @@ impl AsyncClient { /// Sends a MQTT Subscribe to the eventloop pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result { + self.subscribe_with(topic, qos, None).await + } + + /// Sends a MQTT Subscribe to the eventloop with options + pub async fn subscribe_with, P: Into>>( + &self, + topic: S, + qos: QoS, + props: P, + ) -> Result { let mut subscribe = Subscribe::new(topic.into(), qos); + subscribe.properties = props.into(); let pkid = { let mut request_buf = self.outgoing_buf.lock().unwrap(); if request_buf.buf.len() == request_buf.capacity { @@ -169,7 +180,18 @@ impl AsyncClient { /// Sends a MQTT Subscribe to the eventloop pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result { + self.try_subscribe_with(topic, qos, None) + } + + /// Sends a MQTT Subscribe to the eventloop + pub fn try_subscribe_with, P: Into>>( + &self, + topic: S, + qos: QoS, + props: P, + ) -> Result { let mut subscribe = Subscribe::new(topic.into(), qos); + subscribe.properties = props.into(); let pkid = { let mut request_buf = self.outgoing_buf.lock().unwrap(); if request_buf.buf.len() == request_buf.capacity { @@ -186,10 +208,23 @@ impl AsyncClient { /// Sends a MQTT Subscribe for multiple topics to the eventloop pub async fn subscribe_many(&self, topics: T) -> Result + where + T: IntoIterator, + { + self.subscribe_with_many(None, topics).await + } + + /// Sends a MQTT Subscribe for multiple topics to the eventloop, with properties + pub async fn subscribe_with_many>, T>( + &self, + props: P, + topics: T, + ) -> Result where T: IntoIterator, { let mut subscribe = Subscribe::new_many(topics)?; + subscribe.properties = props.into(); let pkid = { let mut request_buf = self.outgoing_buf.lock().unwrap(); if request_buf.buf.len() == request_buf.capacity { @@ -208,8 +243,18 @@ impl AsyncClient { pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, + { + self.try_subscribe_with_many(None, topics) + } + + /// Sends a MQTT Subscribe for multiple topics to the eventloop + pub fn try_subscribe_with_many(&self, props: P, topics: T) -> Result + where + P: Into>, + T: IntoIterator, { let mut subscribe = Subscribe::new_many(topics)?; + subscribe.properties = props.into(); let pkid = { let mut request_buf = self.outgoing_buf.lock().unwrap(); if request_buf.buf.len() == request_buf.capacity { @@ -226,7 +271,17 @@ impl AsyncClient { /// Sends a MQTT Unsubscribe to the eventloop pub async fn unsubscribe>(&self, topic: S) -> Result { + self.unsubscribe_with(topic, None).await + } + + /// Sends a MQTT Unsubscribe to the eventloop, with properties + pub async fn unsubscribe_with, P: Into>>( + &self, + topic: S, + props: P, + ) -> Result { let mut unsubscribe = Unsubscribe::new(topic.into()); + unsubscribe.properties = props.into(); let pkid = { let mut request_buf = self.outgoing_buf.lock().unwrap(); if request_buf.buf.len() == request_buf.capacity { @@ -243,7 +298,17 @@ impl AsyncClient { /// Sends a MQTT Unsubscribe to the eventloop pub fn try_unsubscribe>(&self, topic: S) -> Result { + self.try_unsubscribe_with(topic, None) + } + + /// Sends a MQTT Unsubscribe to the eventloop, with properties + pub fn try_unsubscribe_with, P: Into>>( + &self, + topic: S, + props: P, + ) -> Result { let mut unsubscribe = Unsubscribe::new(topic.into()); + unsubscribe.properties = props.into(); let pkid = { let mut request_buf = self.outgoing_buf.lock().unwrap(); if request_buf.buf.len() == request_buf.capacity {