bytebeamio / rumqtt

The MQTT ecosystem in Rust
Apache License 2.0
1.59k stars 247 forks

RFC(rumqttc): `publish` / `subscribe` / `unsubscribe` methods return a promise that resolves into pkid when packet is handled by `Eventloop` #805

Open de-sh opened 7 months ago

de-sh commented 7 months ago

This is the RFC for API changes in publish / subscribe / unsubscribe in rumqttc to resolve #349. Feel free to comment and provide feedback, thanks!

Example in the case of Publish

// async
let promise = async_client.publish(..).await?;
let pkid = promise.await;

// blocking
let promise = client.publish(..)?;
let pkid = promise.blocking_recv();

Within rumqttc this will be implemented similarly to how oneshot channels work, i.e. the EventLoop will respond on the oneshot channel with the pkid value. Also, we need not go that far when we know the publish is QoS 0, or when the pkid is non-zero by default, maybe?
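
For illustration, a minimal sketch of that flow (PublishRequest and its fields are hypothetical, not the actual rumqttc request type):

use tokio::sync::oneshot;

// Hypothetical request handed to the EventLoop: the payload plus an optional
// oneshot sender that gets resolved with the assigned pkid.
struct PublishRequest {
    payload: Vec<u8>,
    pkid_tx: Option<oneshot::Sender<u16>>,
}

// EventLoop side: after assigning a pkid, notify the waiting client, if any.
fn notify_pkid(req: PublishRequest, assigned_pkid: u16) {
    if let Some(tx) = req.pkid_tx {
        // The receiver may already be dropped (promise ignored); that's fine.
        let _ = tx.send(assigned_pkid);
    }
}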

Please note that this is not a breaking change as users can just ignore/drop the promise token.

swanandx commented 7 months ago

so something like this: publish will return Pkid, where:

struct Pkid {
  id_rx: Receiver<u32>,
}

impl Future for Pkid {
  type Output = u32;
  fn poll(..) -> .. {
    // if we recv something on id_rx, return the id;
    // state will send on that channel.
  }
}

right?

so I am curious: how are we going to send the Sender to the state so we can use it for sending the pkid?

when the pkid is non-zero by default, maybe?

when is it non-zero by default?

as users can just ignore/drop the promise token.

that would mean the receiver is dropped as well. this would cause the Sender to fail!

de-sh commented 7 months ago
impl Future for Pkid {
 type Output = u32;
 fn poll(..) -> .. {
   // if we recv something on id_rx, return the id;
   // state will send on that channel.
 }
}

We can just use oneshot as is, no need to reimplement it

so I am curious: how are we going to send the Sender to the state so we can use it for sending the pkid?

Include it in the request?

when is it non-zero by default?

When the publish is edited before being sent, or, in the case of uplink, when it is read back from file. The user can choose to do that, and the EventLoop will then skip the pkid-setting stage in outgoing_*
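
A minimal sketch of that skip (assign_pkid is a hypothetical stand-in for the pkid-setting step in outgoing_*):

// If the user already set a non-zero pkid, keep it; otherwise assign the
// next free one.
fn assign_pkid(current: u16, next_free: u16) -> u16 {
    if current == 0 { next_free } else { current }
}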

that would mean the receiver is dropped as well. this would cause the Sender to fail!

Not in the case of oneshot; it might error out, but we need not care about this either way from within the EventLoop

swanandx commented 7 months ago

Include it in the request?

gotcha. so unlike https://github.com/bytebeamio/rumqtt/compare/main...pkid-poc, by using channels we are avoiding direct use of Arc<Mutex<_>>!

Not in the case of oneshot; it might error out,

that is what i meant by fail :sweat_smile:

but we need not care about this either way from within the EventLoop

okie.

dlips commented 7 months ago

Hi, I am one of the possible users of this functionality from #349 :) Thanks for bringing this up again

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario?

For a single message, it makes sense to wait for the pkid to be returned and then continue. But if I have a high channel capacity and a limited inflight queue, the task using the client would block/wait at high load peaks, when the event loop needs some time to process the messages. Sure, I could spawn a task that listens on the one-shot channel and, after receiving the pkid, passes it on to an mpsc that gathers all the pkids.

Also, processing the promise is a bit time-critical, because if I do it too late, the one-shot channel could still hold a pkid value whose PubAck has already been processed by the event loop.

Sorry that I don't have any constructive feedback, just dropping some thoughts from the user perspective

de-sh commented 7 months ago

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario?

  1. We could do a lot of things here when considering async code, but not much when it comes to sync code.
  2. One could use JoinSet::join_next() to stream pkids, as sketched below.
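
For example, a minimal sketch assuming the proposed async API above, where publish returns a promise that resolves to the pkid (topic and client setup are placeholders):

use tokio::task::JoinSet;

async fn publish_all(client: rumqttc::AsyncClient, payloads: Vec<Vec<u8>>) {
    // Spawn one task per publish; pkids stream back in completion order.
    let mut set = JoinSet::new();
    for payload in payloads {
        let promise = client
            .publish("hello/world", rumqttc::QoS::AtLeastOnce, false, payload)
            .await
            .unwrap();
        set.spawn(async move { promise.await });
    }
    while let Some(pkid) = set.join_next().await {
        println!("handled with pkid: {pkid:?}");
    }
}
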
dlips commented 7 months ago

JoinSet::join_next() is exactly what I was looking for. Missed it when looking through the tokio docs. That would solve my problem. Thank you :)

de-sh commented 7 months ago

Should this feature be feature-gated to ensure we don't inadvertently make life worse for other users? I am not entirely sure how memory/CPU usage would increase; maybe it's a good idea to figure that out with some profiling @swanandx

cartertinney commented 6 months ago

@de-sh Do we have a timeline for this inclusion? I need to be able to get the pkid so I can wait for the puback, in order to use this library.

de-sh commented 6 months ago

@de-sh Do we have a timeline for this inclusion? I need to be able to get the pkid so I can wait for the puback, in order to use this library.

We need a code review on the PR mentioned above to continue with this

de-sh commented 6 months ago

Adding more thoughts on this:

Maybe it would be a good idea to have this same mechanism also return the error responses relating to the publish packet, so that the client side can keep track of errors as well?

enum Error {
  Mqtt(mqttbytes::Error),
  PacketIdZero,
  EmptySubscription,
  OutgoingPacketTooLarge { .. },
}

type RequestResult = Result<Pkid, Error>;
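
A hypothetical sketch of consuming such a result, assuming the promise then resolves to the RequestResult above (and that Pkid and Error derive Debug):

// Await the promise and separate the pkid from EventLoop-side rejections.
async fn check(promise: impl std::future::Future<Output = RequestResult>) {
    match promise.await {
        Ok(pkid) => println!("handled with pkid {pkid:?}"),
        Err(e) => eprintln!("publish rejected by the EventLoop: {e:?}"),
    }
}
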
cartertinney commented 6 months ago

@de-sh

Another question regarding the proposal: is there a guarantee that the pkid will be returned to the user before the associated PubAck can arrive? I've used other MQTT libraries before where, by the time the id is returned, the puback may have already arrived, which makes matching the ids difficult, since you have to handle the case where you receive a puback for an id that you are not yet tracking in your application.

Although, admittedly, even if we can guarantee that the pkid is returned before the actual publish packet is sent, there is still a race condition for the asynchronous application (can the application track the id returned by the PkidPromise before the puback gets received?)

One solution I've seen before that sidesteps this is to have publish return a future for the eventual ack, but that probably clashes with the polling receive pattern on the event loop in rumqttc. The intended design appears to be that, since the user controls the event loop, they are responsible for distributing the information received on it, which does make sense, but it unfortunately leaves puback matching a rather unpleasant experience.

de-sh commented 6 months ago

(can the application track the id returned by the PkidPromise before the puback gets received?)

While there might be a theoretical possibility of the puback being received faster than the pkid is resolved on the client side, it is practically impossible (afaik). Thus it should be fine to treat this circumstance as an extreme edge case?

I need to be able to get the pkid so I can wait for the puback, in order to use this library.

Maybe what we should do is set up a future that resolves when the EventLoop receives the associated puback (QoS 1) or pubcomp (QoS 2) for a sent publish? Can we define the use case in more detail?

cartertinney commented 6 months ago

Maybe what we should do is set up a future that resolves when the EventLoop receives the associated puback (QoS 1) or pubcomp (QoS 2) for a sent publish? Can we define the use case in more detail?

The issue I'm trying to solve here is an application with distributed functionality: the main event loop and connection are handled/managed in a different place than where the actual MQTT operations take place. For a simplified but illustrative example, consider that publish logic happens in a different thread/task. The publishing thread/task naturally wants to wait for a PUBACK, but the PUBACK is actually received by the event loop in the main thread, so there needs to be some kind of concurrency solution to communicate this information.

This is further compounded if there are multiple publishers in multiple threads. The central event-loop-managing logic has no idea which publisher to deliver PUBACK notifications to, because the PKID of any given outgoing publish is only known to the publishing threads: the event loop can see it happen as an outgoing event, but it doesn't know which copy of the Client the publish came from. Thus, when the PUBACK comes in with a matching PKID, the main thread does not know which copy of the Client to send the PUBACK notification to. The only way around this that I've been able to think of is for the publishing thread/task to communicate to the central thread which PKID it sent, so the central thread can send back the corresponding PUBACK notification accordingly. My concern is that this process may, in some cases, allow a PUBACK to arrive before all the relevant information needed to handle it has been communicated and tracked.
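
For illustration, a sketch of the coordination this forces onto the application today (all names here are hypothetical application types, not rumqttc API):

use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

// Publishers tell the central task which pkid they just sent and where to
// deliver the PUBACK notification.
struct Track {
    pkid: u16,
    ack_tx: oneshot::Sender<()>,
}

async fn central(mut track_rx: mpsc::Receiver<Track>, mut puback_rx: mpsc::Receiver<u16>) {
    let mut waiting: HashMap<u16, oneshot::Sender<()>> = HashMap::new();
    loop {
        tokio::select! {
            Some(t) = track_rx.recv() => {
                waiting.insert(t.pkid, t.ack_tx);
            }
            Some(pkid) = puback_rx.recv() => {
                // The race described above: if the PUBACK beats the Track
                // message, there is no entry here yet and the ack is lost.
                if let Some(tx) = waiting.remove(&pkid) {
                    let _ = tx.send(());
                }
            }
            else => break,
        }
    }
}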

In other libraries, I've seen this issue resolved by publish returning a Future that can be awaited for the eventual PUBACK, but I can appreciate that that might be somewhat against your design goals here.

dlips commented 6 months ago

Maybe what we should do is set up a future that resolves when the EventLoop receives the associated puback (QoS 1) or pubcomp (QoS 2) for a sent publish? Can we define the use case in more detail?

My motivation for getting the PkIds was also to track which of the messages I sent got a PubAck from the MQTT Broker. So having the Future resolve after the PubAck is received, instead of returning the PkId, would simplify my use case, because I would not have to manually write the code that correlates the returned PkIds with the PubAck events from the EventLoop.

cartertinney commented 6 months ago

For what it's worth, the current implementation on the pkid branch seems to stop the event loop from reporting PubAck right now. I don't receive PubAck at all on this branch, but I do receive it when using the main release. So yeah, I can get the pkid now, but I still can't match it with a PubAck.

After experimenting with the pkid branch in general, honestly, I'm starting to feel that .publish() really does need to return a PubAck future: trying to build the mechanism to get and deliver it on top just doesn't scale well to a complex application.

de-sh commented 5 months ago

Made some changes to the design, but it's basically the same concept. Please review the POC presented in the following commit (with example); comments are welcome!

https://github.com/bytebeamio/rumqtt/commit/b6a45b7713eebf8207abbff5a7588317fd27f7ab

xiaocq2001 commented 5 months ago

Made some changes to the design, but it's basically the same concept. Please review the POC presented in the following commit (with example); comments are welcome!

b6a45b7

I put some comments on the POC, please check. I like the implementation of getting notified on acknowledgements instead of having to match additional pkids.

Should you need any assistance, don't hesitate to reach out. I'm eagerly anticipating the integration of this feature into the main branch.

xiaocq2001 commented 5 months ago

I made some changes based on your POC and did some validation locally; I've shared the changes in a PR here: https://github.com/bytebeamio/rumqtt/pull/851. Feel free to use them to improve the feature or to discuss.

xiaocq2001 commented 5 months ago

Given the successful POC and code reviews, I believe the ACK await feature is great. What are the next steps to ensure a smooth integration?

de-sh commented 4 months ago

We are working on making time for the review (running low on bandwidth); after that we should be good to go. It would have been wonderful to have more input along those lines from the community!

xiaocq2001 commented 4 months ago

Following the recent changes in https://github.com/bytebeamio/rumqtt/pull/869, the notice struct needs updating.

FixedBitSet helps reduce memory usage compared to a Vec, but it is not suitable for ack waiting, since an ack notification needs a per-packet internal struct instead of a single bit. For ack waiting, the current HashMap implementation seems to be the better fit (or a LinkedHashMap, to keep the order of packets).

Is there any suggestion on how to balance these needs?

xiaocq2001 commented 4 months ago

Another thing: while waiting for ACKs, it's possible the connection is re-established with the broker's CONNACK having session present = 0; in that case, the pending waits should be terminated with an error (a state error indicating the session was closed by the broker).
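
A self-contained sketch of that cleanup, assuming a oneshot-backed NoticeTx (the NoticeTx stub and the SessionReset variant here are hypothetical, not from the actual branch):

use std::collections::HashMap;
use tokio::sync::oneshot;

// Stub of the proposed notice sender, just enough for this sketch.
struct NoticeTx(oneshot::Sender<Result<(), NoticeError>>);

#[derive(Debug)]
enum NoticeError {
    // Hypothetical variant: the broker dropped the session before the ack.
    SessionReset,
}

// A CONNACK with session present = 0 means the broker kept no state, so no
// pending publish will ever be acknowledged: fail every pending wait.
fn on_connack(session_present: bool, waiting: &mut HashMap<u16, NoticeTx>) {
    if !session_present {
        for (_pkid, NoticeTx(tx)) in waiting.drain() {
            let _ = tx.send(Err(NoticeError::SessionReset));
        }
    }
}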

joshuachp commented 3 months ago

What do you think about making the NoticeTx public? This would make it possible to construct a NoticeFuture for testing and mocking via mockall. This also includes the success and error methods.

FSMaxB commented 3 months ago

I've rebased the acked branch onto main in #883.

Currently this conflicts with the FixedBitSet changes mentioned by @xiaocq2001, which means I had to revert some of it.

My idea for fixing this, such that only people who use this feature pay a performance penalty, is this:

Instead of always returning a NoticeFuture, allow anyone to create a NoticeTx and NoticeFuture pair, and change the API so that instead of returning a NoticeFuture the methods take a NoticeTx as a parameter. Then, in the Mqtt state, keep the FixedBitSet plus, additionally, a HashMap<u16, NoticeTx>, but only populate the latter if a notification for the given message was actually requested (by passing a NoticeTx). This means that you don't pay any penalty when not using the feature (except for the occasional lookup in an empty HashMap, which should be fast).
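
A minimal sketch of that split, assuming a oneshot-backed NoticeTx (the MqttState shown here is a simplified stand-in for the real state machine):

use std::collections::HashMap;
use fixedbitset::FixedBitSet;
use tokio::sync::oneshot;

struct NoticeTx(oneshot::Sender<u16>);

struct MqttState {
    // Always maintained: one bit per possible pkid, as on main.
    inflight: FixedBitSet,
    // Populated only when the caller passed a NoticeTx for that publish.
    notices: HashMap<u16, NoticeTx>,
}

impl MqttState {
    fn handle_puback(&mut self, pkid: u16) {
        self.inflight.set(pkid as usize, false);
        // A lookup in an empty map is cheap, so non-users pay almost nothing.
        if let Some(NoticeTx(tx)) = self.notices.remove(&pkid) {
            let _ = tx.send(pkid); // receiver may be dropped; ignore
        }
    }
}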

joshuachp commented 2 months ago

I think another good addition would be a non-consuming try_wait(&mut self) method on the NoticeFuture. This would make it possible to check for the acknowledgement of multiple packets without blocking.

For example, suppose you are storing the NoticeFutures in a Vec and you want to find the first acknowledged packet.

This would be similar to the try_recv(&mut self) of the inner channel.

One implementation could be:

diff --git a/rumqttc/src/notice.rs b/rumqttc/src/notice.rs
index 4818335..2142e81 100644
--- a/rumqttc/src/notice.rs
+++ b/rumqttc/src/notice.rs
@@ -53,6 +53,21 @@ pub fn wait(self) -> NoticeResult {
     pub async fn wait_async(self) -> NoticeResult {
         self.0.await?
     }
+
+    /// Attempts to check if the broker acknowledged the packet, without blocking the current thread
+    /// or consuming the notice.
+    ///
+    /// It will return [`None`] if the packet wasn't acknowledged.
+    ///
+    /// Multiple calls to this function can fail with [`NoticeError::Recv`] if the notice was
+    /// already waited on, the packet was already acknowledged, and a [`Some`] value was returned.
+    pub fn try_wait(&mut self) -> Option<NoticeResult> {
+        match self.0.try_recv() {
+            Ok(res) => Some(res),
+            Err(oneshot::error::TryRecvError::Closed) => Some(Err(NoticeError::Recv)),
+            Err(oneshot::error::TryRecvError::Empty) => None,
+        }
+    }
 }

 #[derive(Debug)]

Here, it returns an Option instead of an error for oneshot::error::TryRecvError::Empty, just so as not to add another variant to the error.
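
A hypothetical usage sketch for the Vec case mentioned earlier, assuming the try_wait method and the NoticeFuture/NoticeResult types from the diff above:

// Poll a batch of pending notices without blocking; return the result of
// the first acknowledged packet and drop its future from the batch.
fn first_acked(notices: &mut Vec<NoticeFuture>) -> Option<NoticeResult> {
    for i in 0..notices.len() {
        if let Some(res) = notices[i].try_wait() {
            notices.remove(i);
            return Some(res);
        }
    }
    None
}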

de-sh commented 2 months ago

I think another good addition would be a non-consuming try_wait(&mut self) method on the NoticeFuture. This would make it possible to check for the acknowledgement of multiple packets without blocking.

That's a great suggestion, thanks for the input!

BjoernLange commented 2 weeks ago

Having this feature would be really helpful for my use case. What's the progress on this? Is there already a version that I can try out? Thank you!

KillingJacky commented 1 week ago

I've rebased the acked branch onto main in #883.

Currently this conflicts with the FixedBitSet changes mentioned by @xiaocq2001, which means I had to revert some of it.

My idea for fixing this, such that only people who use this feature pay a performance penalty, is this:

Instead of always returning a NoticeFuture, allow anyone to create a NoticeTx and NoticeFuture pair, and change the API so that instead of returning a NoticeFuture the methods take a NoticeTx as a parameter. Then, in the Mqtt state, keep the FixedBitSet plus, additionally, a HashMap<u16, NoticeTx>, but only populate the latter if a notification for the given message was actually requested (by passing a NoticeTx). This means that you don't pay any penalty when not using the feature (except for the occasional lookup in an empty HashMap, which should be fast).

I'm one of the people who don't mind the slight performance loss :) And I don't think using a HashMap would badly hurt performance or resource consumption, because we generally wouldn't configure the queue depth of the event loop to be very large (currently I'm using 100 for cap in my project), at least until the feature of saving the current state to disk / a cache system is implemented. Instead, if I had the packet-acked mechanism, I would manage the outgoing message queue in a more reliable way outside the MQTT client's scope, so that outgoing messages won't get lost if there's an unexpected shutdown of the rumqtt client.

Just putting in a vote for moving this forward: first make the feature usable, then we'll have a long window to tune performance in parallel with implementing features like the disk-aware request stream (https://github.com/bytebeamio/rumqtt/blob/main/rumqttc/design.md#disk-aware-request-stream).

FSMaxB commented 1 week ago

Note that I won't have any time to continue working on this: at work it was reverted as a failed experiment, because there were other issues with the unreleased version of rumqttc that I didn't have enough time to look into.

I'm on your side regarding the performance loss, but the FixedBitSet changes were merged in the first place, so there obviously are people who do care about it, which is why I was considering a solution that allows keeping it. But just going back to hash maps would certainly make implementing this much easier.