rumqttc: ConnectionError::RequestsDone is unreachable #815

Open
flxo opened this issue Mar 8, 2024 · 3 comments

Comments

@flxo
Contributor

flxo commented Mar 8, 2024

The async v5 client implementation checks for errors when receiving from the request channel. With flume, the only error that can occur here is RecvError (disconnected), documented as: "An error that may be emitted when attempting to wait for a value on a receiver when all senders are dropped and there are no more messages in the channel."

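EventLoop itself holds a sender handle to this channel, so the senders can never all be dropped while the EventLoop is alive. The error condition can therefore never occur, which makes ConnectionError::RequestsDone unreachable and the implementation confusing.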

During the construction of the AsyncClient the tx handle is cloned.
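
To illustrate the point, here is a minimal sketch that uses std::sync::mpsc as a stand-in for flume (an assumption made to keep the example dependency-free; the disconnect semantics described above are the same). The `Loop` type and `observe_after_clients_dropped` are hypothetical names, not rumqttc items. The receiver's owner also keeps a sender, so the channel only ever reports "empty", never "disconnected":

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};

/// Hypothetical stand-in for the event loop: it owns the receiver *and* a sender.
struct Loop {
    requests_rx: Receiver<u32>,
    // While this sender is alive, the channel never counts as disconnected.
    requests_tx: SyncSender<u32>,
}

/// Returns what the loop observes once every client handle has been dropped.
fn observe_after_clients_dropped() -> Result<u32, TryRecvError> {
    let (tx, rx) = sync_channel(4);
    let event_loop = Loop {
        requests_rx: rx,
        requests_tx: tx,
    };

    // The client handle is a clone of the loop's own sender.
    let client = event_loop.requests_tx.clone();
    drop(client);

    // The loop's own sender is still alive, so this reports `Empty`.
    event_loop.requests_rx.try_recv()
}

fn main() {
    assert_eq!(observe_after_clients_dropped(), Err(TryRecvError::Empty));
}
```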

An easy fix could be to return the sending half from EventLoop::new (and update the visibility) and move it into the AsyncClient:

diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs
index 910da50..e2288a3 100644
--- a/rumqttc/src/v5/client.rs
+++ b/rumqttc/src/v5/client.rs
@@ -54,9 +54,7 @@ impl AsyncClient {
     ///
     /// `cap` specifies the capacity of the bounded async channel.
     pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
-        let eventloop = EventLoop::new(options, cap);
-        let request_tx = eventloop.requests_tx.clone();
-
+        let (request_tx, eventloop) = EventLoop::new(options, cap);
         let client = AsyncClient { request_tx };
 
         (client, eventloop)
diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs
index 36c1097..6210b0c 100644
--- a/rumqttc/src/v5/eventloop.rs
+++ b/rumqttc/src/v5/eventloop.rs
@@ -75,8 +75,6 @@ pub struct EventLoop {
     pub state: MqttState,
     /// Request stream
     requests_rx: Receiver<Request>,
-    /// Requests handle to send requests
-    pub(crate) requests_tx: Sender<Request>,
     /// Pending packets from last session
     pub pending: VecDeque<Request>,
     /// Network connection to the broker
@@ -97,21 +95,23 @@ impl EventLoop {
     ///
     /// When connection encounters critical errors (like auth failure), user has a choice to
     /// access and update `options`, `state` and `requests`.
-    pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
+    pub(crate) fn new(options: MqttOptions, cap: usize) -> (Sender<Request>, EventLoop) {
         let (requests_tx, requests_rx) = bounded(cap);
         let pending = VecDeque::new();
         let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
         let manual_acks = options.manual_acks;
 
-        EventLoop {
-            options,
-            state: MqttState::new(inflight_limit, manual_acks),
+        (
             requests_tx,
-            requests_rx,
-            pending,
-            network: None,
-            keepalive_timeout: None,
-        }
+            EventLoop {
+                options,
+                state: MqttState::new(inflight_limit, manual_acks),
+                requests_rx,
+                pending,
+                network: None,
+                keepalive_timeout: None,
+            },
+        )
     }
 
     /// Last session might contain packets which aren't acked. MQTT says these packets should be
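
The effect of the patch can be sketched without rumqttc, again assuming std::sync::mpsc as a stand-in for flume. `Loop`, `Loop::new` and both helper functions are hypothetical names that only mirror the shape of the diff. Once the constructor hands out the only sender, dropping every client handle makes the disconnected error observable, and requests that are already queued are still delivered first:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};

/// Hypothetical stand-in for the event loop: it keeps only the receiving half.
struct Loop {
    requests_rx: Receiver<u32>,
}

impl Loop {
    /// Mirrors the shape of the patched constructor: the sender is returned, not stored.
    fn new(cap: usize) -> (SyncSender<u32>, Loop) {
        let (requests_tx, requests_rx) = sync_channel(cap);
        (requests_tx, Loop { requests_rx })
    }
}

/// With no sender kept inside the loop, dropping the client disconnects the channel.
fn observe_after_clients_dropped() -> Result<u32, TryRecvError> {
    let (client, event_loop) = Loop::new(4);
    drop(client);
    event_loop.requests_rx.try_recv()
}

/// Queued requests are drained before the disconnect is reported.
fn observe_with_pending_request() -> (Result<u32, TryRecvError>, Result<u32, TryRecvError>) {
    let (client, event_loop) = Loop::new(4);
    client.send(7).expect("receiver is alive");
    drop(client);
    let first = event_loop.requests_rx.try_recv();
    let second = event_loop.requests_rx.try_recv();
    (first, second)
}

fn main() {
    assert_eq!(observe_after_clients_dropped(), Err(TryRecvError::Disconnected));
    assert_eq!(
        observe_with_pending_request(),
        (Ok(7), Err(TryRecvError::Disconnected))
    );
}
```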
@de-sh
Member

de-sh commented Mar 13, 2024

Going by how other libraries handle this, I think we should deprecate Async/Client::new(opt, cap) and just have EventLoop::new(opt, cap) -> (AsyncClient, EventLoop) and EventLoop::new_sync(opt, cap) -> (Client, EventLoop).

@flxo
Contributor Author

flxo commented Mar 15, 2024

From an even higher perspective, I don't get why there's a separation between AsyncClient and EventLoop. Maybe it is a relic of the sync versions, with the intention of keeping the two close to each other.

Why not:

AsyncClient::new(opt) -> AsyncClient;
impl Stream<Item = Result<Event, ConnectionError>> for AsyncClient { ... }
impl Sink<Request, Error = ConnectionError> for AsyncClient { ... }

I see the following advantages:

  • It feels more natural, since Stream/Sink are widely understood and there are tons of combinators.
  • Sink::feed/Sink::flush allow burst control.
  • No need to manually poll the event loop.

The Sink impl is trivial since the channel implementation in use implements Sink for Sender. The Stream part is also not that hard if the events (VecDeque) in MqttState are replaced with a channel. A rough draft is here.

If the separation is needed, there's StreamExt::split.

@de-sh
Member

de-sh commented Mar 16, 2024

"I don't get why there's a separation between AsyncClient and EventLoop"

The initial design was simple: an event loop driven by calls to EventLoop.poll(), to which multiple threads could connect and send data over an mpsc channel. We had not thought about designing it as a stream + sink setup; maybe we could plan this for v1.0.0.
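
As a rough illustration of that initial design, the sketch below uses only the standard library. `Request`, `Loop`, `poll` and `run` are hypothetical names, and the real EventLoop.poll() is async and also drives the network. Several producer threads send requests over an mpsc channel while the caller drives the loop by polling it:

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
use std::thread;

/// Hypothetical request type; the real one carries MQTT packets.
#[derive(Debug)]
struct Request(u32);

/// Hypothetical event loop that only owns the receiving half of the channel.
struct Loop {
    requests_rx: Receiver<Request>,
}

impl Loop {
    /// Blocks until the next request arrives, or errors once all senders are gone.
    fn poll(&mut self) -> Result<Request, RecvError> {
        self.requests_rx.recv()
    }
}

/// Spawns `producers` threads that each send one request, then drains the loop.
fn run(producers: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel(8);
    let mut event_loop = Loop { requests_rx: rx };

    let handles: Vec<_> = (0..producers)
        .map(|id| {
            let tx: SyncSender<Request> = tx.clone();
            thread::spawn(move || tx.send(Request(id)).expect("event loop is alive"))
        })
        .collect();
    // Drop the original sender so the loop ends once every producer has finished.
    drop(tx);

    let mut seen = Vec::new();
    while let Ok(Request(id)) = event_loop.poll() {
        seen.push(id);
    }
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    // Arrival order depends on thread scheduling, so sort for a stable result.
    seen.sort_unstable();
    seen
}

fn main() {
    assert_eq!(run(3), vec![0, 1, 2]);
}
```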
