bytebeamio / rumqtt

The MQTT ecosystem in Rust
Apache License 2.0
1.64k stars 252 forks

rumqttc: `ConnectionError::RequestsDone` is unreachable #815

Open flxo opened 8 months ago

flxo commented 8 months ago

The async v5 client implementation checks for errors when receiving from the request channel. The only error that can occur here (with flume) is `RecvError` (disconnected): "An error that may be emitted when attempting to wait for a value on a receiver when all senders are dropped and there are no more messages in the channel."

`EventLoop` holds a sender handle to this channel, so the senders can never all be dropped and this error condition can never occur. The implementation is confusing.

During the construction of the `AsyncClient`, the `tx` handle is cloned.
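
To make this concrete, here is a minimal standalone sketch (my own, not rumqttc code) of flume's behavior: `recv_async` can only fail with `Disconnected` after every `Sender` is dropped, so a struct that holds its own sender, as `EventLoop` does, can never observe that error:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::bounded::<u32>(8);

    // While `tx` is alive (as `EventLoop::requests_tx` is), recv_async can
    // only pend or yield values; it never returns RecvError::Disconnected.
    tokio::select! {
        res = rx.recv_async() => println!("received: {res:?}"),
        _ = tokio::time::sleep(Duration::from_millis(50)) => {
            println!("no Disconnected while a sender is still held");
        }
    }

    // Only once the last sender is gone does the receiver error out.
    drop(tx);
    assert!(rx.recv_async().await.is_err());
}
```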

An easy fix could be to return the sending half from `EventLoop::new` (and update its visibility) and move it into the `AsyncClient`:

diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs
index 910da50..e2288a3 100644
--- a/rumqttc/src/v5/client.rs
+++ b/rumqttc/src/v5/client.rs
@@ -54,9 +54,7 @@ impl AsyncClient {
     ///
     /// `cap` specifies the capacity of the bounded async channel.
     pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
-        let eventloop = EventLoop::new(options, cap);
-        let request_tx = eventloop.requests_tx.clone();
-
+        let (request_tx, eventloop) = EventLoop::new(options, cap);
         let client = AsyncClient { request_tx };

         (client, eventloop)
diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs
index 36c1097..6210b0c 100644
--- a/rumqttc/src/v5/eventloop.rs
+++ b/rumqttc/src/v5/eventloop.rs
@@ -75,8 +75,6 @@ pub struct EventLoop {
     pub state: MqttState,
     /// Request stream
     requests_rx: Receiver<Request>,
-    /// Requests handle to send requests
-    pub(crate) requests_tx: Sender<Request>,
     /// Pending packets from last session
     pub pending: VecDeque<Request>,
     /// Network connection to the broker
@@ -97,21 +95,23 @@ impl EventLoop {
     ///
     /// When connection encounters critical errors (like auth failure), user has a choice to
     /// access and update `options`, `state` and `requests`.
-    pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
+    pub(crate) fn new(options: MqttOptions, cap: usize) -> (Sender<Request>, EventLoop) {
         let (requests_tx, requests_rx) = bounded(cap);
         let pending = VecDeque::new();
         let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
         let manual_acks = options.manual_acks;

-        EventLoop {
-            options,
-            state: MqttState::new(inflight_limit, manual_acks),
+        (
             requests_tx,
-            requests_rx,
-            pending,
-            network: None,
-            keepalive_timeout: None,
-        }
+            EventLoop {
+                options,
+                state: MqttState::new(inflight_limit, manual_acks),
+                requests_rx,
+                pending,
+                network: None,
+                keepalive_timeout: None,
+            },
+        )
     }

     /// Last session might contain packets which aren't acked. MQTT says these packets should be
de-sh commented 8 months ago

Going by how other libraries handle this, I think we should deprecate `AsyncClient::new(opt, cap)` / `Client::new(opt, cap)` and just have `EventLoop::new(opt, cap) -> (AsyncClient, EventLoop)` and `EventLoop::new_sync(opt, cap) -> (Client, EventLoop)`.
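
A rough sketch of that shape, using stand-in types for `MqttOptions` and `Request` (the real definitions are elided):

```rust
use flume::{bounded, Receiver, Sender};

struct MqttOptions; // stand-in for the real options type
struct Request;     // stand-in for the real request type

pub struct AsyncClient {
    request_tx: Sender<Request>,
}

pub struct EventLoop {
    options: MqttOptions,
    requests_rx: Receiver<Request>,
}

impl EventLoop {
    /// Proposed: build the client and the event loop together, so the loop
    /// never holds its own Sender and a recv error really means
    /// "all clients dropped".
    pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
        let (request_tx, requests_rx) = bounded(cap);
        (
            AsyncClient { request_tx },
            EventLoop { options, requests_rx },
        )
    }
}
```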

flxo commented 8 months ago

From an even higher perspective, I don't get why there's a separation between `AsyncClient` and `EventLoop`. Maybe it's a relic from the sync versions, with the intention of keeping the two APIs close to each other.

Why not:

AsyncClient::new(opt) -> AsyncClient;
impl Stream<Item = Result<Event, ConnectionError>> for AsyncClient { ... }
impl Sink<Request, Error = ConnectionError> for AsyncClient { ... }

I see the following advantages:

- The `Sink` impl is trivial, since flume already provides a `Sink` adapter for its `Sender` (see the sketch after this list).
- The `Stream` part is also not that hard if the `events` `VecDeque` in `MqttState` is replaced with a channel. A rough draft is here.
- If the separation is still needed, there's `StreamExt::split`.
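
For illustration, a rough sketch of the `Sink` half (my own, not taken from the draft above), assuming flume's `async` feature, where `Sender::into_sink()` returns a `SendSink` that already implements `futures::Sink`:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use flume::r#async::SendSink;
use futures::Sink;

struct Request;         // stand-in for rumqttc's Request
struct ConnectionError; // stand-in for rumqttc's ConnectionError

pub struct AsyncClient {
    // `Sender::into_sink()` yields a sink over the request channel,
    // so the impl below only forwards the calls and maps the error.
    request_sink: SendSink<'static, Request>,
}

impl Sink<Request> for AsyncClient {
    type Error = ConnectionError;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.request_sink)
            .poll_ready(cx)
            .map_err(|_| ConnectionError)
    }

    fn start_send(mut self: Pin<&mut Self>, item: Request) -> Result<(), Self::Error> {
        Pin::new(&mut self.request_sink)
            .start_send(item)
            .map_err(|_| ConnectionError)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.request_sink)
            .poll_flush(cx)
            .map_err(|_| ConnectionError)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.request_sink)
            .poll_close(cx)
            .map_err(|_| ConnectionError)
    }
}
```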

de-sh commented 8 months ago

> I don't get why there's a separation between AsyncClient and EventLoop

The initial design was simple: an event loop driven by calls to `EventLoop::poll()`, to which multiple threads could connect and send data over an mpsc channel. We had not thought about designing it as a stream + sink setup; maybe we could plan this for v1.0.0.
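
For context, that design looks roughly like this today (adapted from rumqttc's examples; exact imports may vary by version):

```rust
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::{AsyncClient, MqttOptions};

#[tokio::main]
async fn main() {
    let options = MqttOptions::new("demo-client", "broker.example.com", 1883);
    let (client, mut eventloop) = AsyncClient::new(options, 10);

    // Any number of tasks can hold a cloned client and push requests
    // over the internal mpsc channel...
    let publisher = client.clone();
    tokio::spawn(async move {
        publisher
            .publish("hello/world", QoS::AtLeastOnce, false, "hi")
            .await
            .unwrap();
    });

    // ...but exactly one task drives the connection by polling the event loop.
    loop {
        match eventloop.poll().await {
            Ok(event) => println!("event: {event:?}"),
            Err(e) => {
                eprintln!("connection error: {e}");
                break;
            }
        }
    }
}
```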