Skip to content

Commit

Permalink
rumqttc: v5: move shared buffer to separate struct
Browse files Browse the repository at this point in the history
Signed-off-by: Abhik Jain <abhikjain360@gmail.com>
  • Loading branch information
abhikjain360 committed Apr 1, 2022
1 parent 8fffc5d commit 7792a5e
Show file tree
Hide file tree
Showing 6 changed files with 549 additions and 402 deletions.
208 changes: 129 additions & 79 deletions rumqttc/src/v5/client/asyncclient.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};

use crate::v5::{
client::get_ack_req,
outgoing_buf::OutgoingBuf,
packet::{Publish, Subscribe, SubscribeFilter, Unsubscribe},
ClientError, EventLoop, MqttOptions, QoS, Request,
};
Expand All @@ -16,26 +14,19 @@ use crate::v5::{
/// This is cloneable and can be used to asynchronously Publish, Subscribe.
#[derive(Debug)]
pub struct AsyncClient {
pub(crate) outgoing_buf: Arc<Mutex<VecDeque<Request>>>,
pub(crate) outgoing_buf_capacity: usize,
pub(crate) pkid_counter: u16,
pub(crate) max_inflight: u16,
pub(crate) outgoing_buf: Arc<Mutex<OutgoingBuf>>,
pub(crate) request_tx: Sender<()>,
}

impl AsyncClient {
/// Create a new `AsyncClient`
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let outgoing_buf = eventloop.request_buf().clone();
let outgoing_buf = eventloop.state.outgoing_buf.clone();
let request_tx = eventloop.handle();
let max_inflight = eventloop.state.max_inflight;

let client = AsyncClient {
outgoing_buf,
outgoing_buf_capacity: cap,
pkid_counter: 0,
max_inflight,
request_tx,
};

Expand All @@ -57,13 +48,18 @@ impl AsyncClient {
let mut publish = Publish::new(topic, qos, payload);
publish.retain = retain;
let pkid = if qos != QoS::AtMostOnce {
self.increment_pkid()
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
publish.pkid = pkid;
request_buf.buf.push_back(Request::Publish(publish));
pkid
} else {
0
};
publish.pkid = pkid;
self.push_and_async_notify(Request::Publish(publish))
.await?;
self.notify_async().await?;
Ok(pkid)
}

Expand All @@ -82,27 +78,43 @@ impl AsyncClient {
let mut publish = Publish::new(topic, qos, payload);
publish.retain = retain;
let pkid = if qos != QoS::AtMostOnce {
self.increment_pkid()
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
publish.pkid = pkid;
request_buf.buf.push_back(Request::Publish(publish));
pkid
} else {
0
};
publish.pkid = pkid;
self.push_and_try_notify(Request::Publish(publish))?;
self.try_notify()?;
Ok(pkid)
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
pub async fn ack(&mut self, publish: &Publish) -> Result<(), ClientError> {
if let Some(ack) = get_ack_req(publish.qos, publish.pkid) {
self.push_and_async_notify(ack).await?;
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
request_buf.buf.push_back(ack);
self.notify_async().await?;
}
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&mut self, publish: &Publish) -> Result<(), ClientError> {
if let Some(ack) = get_ack_req(publish.qos, publish.pkid) {
self.push_and_try_notify(ack)?;
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
request_buf.buf.push_back(ack);
self.try_notify()?;
}
Ok(())
}
Expand All @@ -121,13 +133,18 @@ impl AsyncClient {
let mut publish = Publish::from_bytes(topic, qos, payload);
publish.retain = retain;
let pkid = if qos != QoS::AtMostOnce {
self.increment_pkid()
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
publish.pkid = pkid;
request_buf.buf.push_back(Request::Publish(publish));
pkid
} else {
0
};
publish.pkid = pkid;
self.push_and_async_notify(Request::Publish(publish))
.await?;
self.notify_async().await?;
Ok(pkid)
}

Expand All @@ -138,10 +155,17 @@ impl AsyncClient {
qos: QoS,
) -> Result<u16, ClientError> {
let mut subscribe = Subscribe::new(topic.into(), qos);
let pkid = self.increment_pkid();
subscribe.pkid = pkid;
self.push_and_async_notify(Request::Subscribe(subscribe))
.await?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
subscribe.pkid = pkid;
request_buf.buf.push_back(Request::Subscribe(subscribe));
pkid
};
self.notify_async().await?;
Ok(pkid)
}

Expand All @@ -152,9 +176,17 @@ impl AsyncClient {
qos: QoS,
) -> Result<u16, ClientError> {
let mut subscribe = Subscribe::new(topic.into(), qos);
let pkid = self.increment_pkid();
subscribe.pkid = pkid;
self.push_and_try_notify(Request::Subscribe(subscribe))?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
subscribe.pkid = pkid;
request_buf.buf.push_back(Request::Subscribe(subscribe));
pkid
};
self.try_notify()?;
Ok(pkid)
}

Expand All @@ -164,10 +196,17 @@ impl AsyncClient {
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics);
let pkid = self.increment_pkid();
subscribe.pkid = pkid;
self.push_and_async_notify(Request::Subscribe(subscribe))
.await?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
subscribe.pkid = pkid;
request_buf.buf.push_back(Request::Subscribe(subscribe));
pkid
};
self.notify_async().await?;
Ok(pkid)
}

Expand All @@ -177,86 +216,97 @@ impl AsyncClient {
T: IntoIterator<Item = SubscribeFilter>,
{
let mut subscribe = Subscribe::new_many(topics);
let pkid = self.increment_pkid();
subscribe.pkid = pkid;
self.push_and_try_notify(Request::Subscribe(subscribe))?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
subscribe.pkid = pkid;
request_buf.buf.push_back(Request::Subscribe(subscribe));
pkid
};
self.try_notify()?;
Ok(pkid)
}

/// Sends a MQTT Unsubscribe to the eventloop
pub async fn unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<u16, ClientError> {
let mut unsubscribe = Unsubscribe::new(topic.into());
let pkid = self.increment_pkid();
unsubscribe.pkid = pkid;
self.push_and_async_notify(Request::Unsubscribe(unsubscribe))
.await?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
unsubscribe.pkid = pkid;
request_buf.buf.push_back(Request::Unsubscribe(unsubscribe));
pkid
};
self.notify_async().await?;
Ok(pkid)
}

/// Sends a MQTT Unsubscribe to the eventloop
pub fn try_unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<u16, ClientError> {
let mut unsubscribe = Unsubscribe::new(topic.into());
let pkid = self.increment_pkid();
unsubscribe.pkid = pkid;
self.push_and_try_notify(Request::Unsubscribe(unsubscribe))?;
let pkid = {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
let pkid = request_buf.increment_pkid();
unsubscribe.pkid = pkid;
request_buf.buf.push_back(Request::Unsubscribe(unsubscribe));
pkid
};
self.try_notify()?;
Ok(pkid)
}

/// Sends a MQTT disconnect to the eventloop
#[inline]
pub async fn disconnect(&mut self) -> Result<(), ClientError> {
self.push_and_async_notify(Request::Disconnect).await
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
request_buf.buf.push_back(Request::Disconnect);
self.notify_async().await
}

/// Sends a MQTT disconnect to the eventloop
#[inline]
pub fn try_disconnect(&mut self) -> Result<(), ClientError> {
self.push_and_try_notify(Request::Disconnect)
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.buf.len() == request_buf.capacity {
return Err(ClientError::RequestsFull);
}
request_buf.buf.push_back(Request::Disconnect);
self.try_notify()
}

async fn push_and_async_notify(&self, request: Request) -> Result<(), ClientError> {
{
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.len() == self.outgoing_buf_capacity {
return Err(ClientError::RequestsFull);
}
request_buf.push_back(request);
}
#[inline]
async fn notify_async(&self) -> Result<(), ClientError> {
if let Err(SendError(_)) = self.request_tx.send_async(()).await {
return Err(ClientError::EventloopClosed);
};
Ok(())
}

pub(crate) fn push_and_notify(&self, request: Request) -> Result<(), ClientError> {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.len() == self.outgoing_buf_capacity {
return Err(ClientError::RequestsFull);
}
request_buf.push_back(request);
#[inline]
pub(crate) fn notify(&self) -> Result<(), ClientError> {
if let Err(SendError(_)) = self.request_tx.send(()) {
return Err(ClientError::EventloopClosed);
};
Ok(())
}

fn push_and_try_notify(&self, request: Request) -> Result<(), ClientError> {
let mut request_buf = self.outgoing_buf.lock().unwrap();
if request_buf.len() == self.outgoing_buf_capacity {
return Err(ClientError::RequestsFull);
}
request_buf.push_back(request);
#[inline]
fn try_notify(&self) -> Result<(), ClientError> {
if let Err(TrySendError::Disconnected(_)) = self.request_tx.try_send(()) {
return Err(ClientError::EventloopClosed);
}
Ok(())
}

#[inline]
pub(crate) fn increment_pkid(&mut self) -> u16 {
self.pkid_counter = if self.pkid_counter == self.max_inflight {
1
} else {
self.pkid_counter + 1
};
self.pkid_counter
}
}

0 comments on commit 7792a5e

Please sign in to comment.