AtherEnergy / rumqtt

Pure Rust MQTT client
The Unlicense

Handle messages asynchronously #71

Closed ivanovaleksey closed 6 years ago

ivanovaleksey commented 6 years ago

Hello, thank you for the great library!

I am looking for a Rust MQTT library for my application. I want it to handle incoming messages asynchronously because there will be DB interaction and other blocking operations.

I added thread::sleep inside the on_message callback. Message handling was serialized: while one message was being handled (i.e. waiting on sleep), each new incoming message was queued and processed only after the sleep finished. Maybe that is entirely my fault and I put the main (and only) thread to sleep.

What is the correct way to handle messages asynchronously? Can I create some kind of thread pool within on_message and just push incoming messages to that pool? Or will that affect QoS handling? As far as I can see from the code, we first send PUBACK and then call the user's callback. Does that mean it doesn't matter whether I handle an incoming message in the same thread or in a separate one, because PUBACK has already been sent?

Also, I have found the tokio2 branch. Maybe it is better suited to async handling?

Thank you.

tekjar commented 6 years ago

@ivanovaleksey the tokio2 branch is still in the works (maybe it'll be ready in the next 10 days), but it's very ergonomic for handling incoming messages. You get all the incoming messages in a channel and can handle them in whatever way fits your needs. But it's your responsibility to pull messages out of the channel at least as fast as they arrive, or else packets will be dropped.
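To make the dropping behaviour concrete, here is a minimal sketch of a bounded channel whose producer never blocks. It uses the standard library's `sync_channel` as a stand-in; the function name `fill_bounded_channel` and the drop-on-full policy are assumptions for illustration, not rumqtt's actual code.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes `total` messages into a bounded channel of size `capacity`
/// without ever reading from it, and returns (queued, dropped).
fn fill_bounded_channel(capacity: usize, total: usize) -> (usize, usize) {
    // `sync_channel` is the standard library's bounded channel.
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let mut queued = 0;
    let mut dropped = 0;

    for i in 0..total {
        let payload = format!("message {}", i).into_bytes();
        // `try_send` never blocks: it fails with `Full` once the buffer is at capacity.
        match tx.try_send(payload) {
            Ok(()) => queued += 1,
            // Assumed policy: a message that does not fit is simply discarded.
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    // The receiver is only released here, so the channel stayed connected throughout.
    drop(rx);
    (queued, dropped)
}
```

Calling `fill_bounded_channel(4, 10)` with a consumer that never reads queues the first four messages and discards the rest. A consumer that keeps up would free slots and avoid the drops.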

ivanovaleksey commented 6 years ago

Thank you for the reply, @tekjar. How do I know what rate is needed to avoid dropping messages? As I understand it, that depends on how many messages my clients send and how often, but I can't predict this at any given moment. Is there any way to ensure that every message will be handled and nothing will be dropped?

Does message dropping occur on the master branch? As I understand it, tokio2 will be merged into master, so will dropping happen on master anyway?

tekjar commented 6 years ago

@ivanovaleksey the master branch uses a threadpool to execute callbacks. I've not tested master enough with incoming messages to understand how the threadpool behaves under high load. If the threadpool uses an unbounded channel, memory will just keep going up if the callbacks are heavy/blocking. One way to ensure that you don't drop messages in tokio2 is to avoid heavy processing in the thread that receives data and offload the work to a threadpool.
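The offloading idea can be sketched with the standard library alone. The function name `dispatch_to_workers`, the round-robin policy, and the 5 ms sleep standing in for blocking work are all assumptions for illustration; rumqtt's own threadpool may work differently.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Drains `incoming` and hands each payload to one of `workers` threads in
/// round-robin order. Returns the number of messages each worker handled.
fn dispatch_to_workers(incoming: Receiver<Vec<u8>>, workers: usize) -> Vec<usize> {
    assert!(workers > 0, "need at least one worker");
    let mut senders: Vec<Sender<Vec<u8>>> = Vec::new();
    let mut handles = Vec::new();

    for _ in 0..workers {
        let (tx, rx) = channel::<Vec<u8>>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            let mut handled = 0usize;
            // This loop ends once the matching sender has been dropped.
            for payload in rx {
                // Stand-in for blocking work such as a database call.
                thread::sleep(Duration::from_millis(5));
                let _ = payload.len();
                handled += 1;
            }
            handled
        }));
    }

    // The receiving side only forwards; it never waits for the heavy work.
    for (i, payload) in incoming.iter().enumerate() {
        senders[i % workers].send(payload).expect("worker exited early");
    }

    // Dropping the senders lets the workers drain their queues and exit.
    drop(senders);
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}
```

The per-worker queues here are unbounded, so they show the memory trade-off mentioned above: if the workers are slower than the incoming rate, the queues grow instead of messages being dropped.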

ivanovaleksey commented 6 years ago

Yes, there is a threadpool on the master branch, but it has a capacity of 1, so it looks like messages are handled sequentially.

> not do heavy processing in the thread that receives data and offloading work to a threadpool

Yes, that was my initial idea too. It doesn't have any impact on PUBACK, since PUBACK is sent before calling the user's callback, does it? Is the same true for tokio2?

tekjar commented 6 years ago

Yeah, PUBACK should be sent before handling the message, or else (most) brokers will keep resending the message if your callback is heavy.

ivanovaleksey commented 6 years ago

Thanks once again @tekjar. I am closing the issue and will try to use the library.

tekjar commented 6 years ago

The reason for choosing a channel instead of a callback is to give users flexibility. These Receivers are crossbeam-channel receivers, which are very flexible. You can use them in multiple threads to load balance, or feed a threadpool for other use cases.
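As a rough illustration of load balancing across several consumer threads, the sketch below shares one receiver between them. It is written against the standard library, where a `Receiver` cannot be cloned, so it wraps the receiver in `Arc<Mutex<..>>`; with crossbeam-channel each thread could hold its own clone of the receiver instead. The function name `consume_shared` is hypothetical.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `consumers` threads that all pull from the same receiver and
/// returns the total number of messages they processed.
fn consume_shared(rx: Receiver<String>, consumers: usize) -> usize {
    let shared = Arc::new(Mutex::new(rx));
    let mut handles = Vec::new();

    for _ in 0..consumers {
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            let mut processed = 0usize;
            loop {
                // The lock is held only while waiting for and taking one message;
                // it is released before the message is handled.
                let next = shared.lock().expect("lock poisoned").recv();
                match next {
                    Ok(_message) => processed += 1, // handle the message here
                    Err(_) => break,                // every sender has been dropped
                }
            }
            processed
        }));
    }

    handles
        .into_iter()
        .map(|h| h.join().expect("consumer panicked"))
        .sum()
}
```

Each message is delivered to exactly one consumer, so the per-thread counts always add up to the number of messages sent, whichever thread happens to win the lock.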

Thinking about this, maybe I should provide a way to configure the size of the channel, or use unbounded channels.

ivanovaleksey commented 6 years ago

Hello @tekjar, I have another question about async handling. I forgot to mention (and now I am afraid it is a big problem) that I need not only to receive messages but also to respond to them.

Suppose I have a thread pool with a size of 3. On receiving a new message, I just push it to the pool to achieve async handling. But then each handler, i.e. each of the 3 threads, needs to respond back. MqttClient::publish requires a &mut MqttClient, and it looks like I can't obtain a mutable reference in each thread. It seems that responding to a message would be a bottleneck.

Is there any workaround here? Or maybe I am missing something and there is no such problem?

Thank you.

tekjar commented 6 years ago

@ivanovaleksey If you are using the tokio2 branch, you'll be able to clone MqttClient (not yet implemented). Can you open a separate issue for this?

ivanovaleksey commented 6 years ago

Thanks @tekjar, I currently use version 0.10.1. I am very new to Tokio, so I decided to start with the "plain" version and then switch to the Tokio-based one. So publishing from multiple threads without locking is not possible in 0.10.1, and will be implemented on tokio2 after a while, right?
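For reference, the locking approach can be sketched as follows. `FakeClient` is a hypothetical stand-in for any client whose publish method takes `&mut self`; it is not rumqtt's MqttClient, and its `publish` signature is an assumption made only for this example.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a client whose `publish` needs `&mut self`.
struct FakeClient {
    sent: Vec<(String, Vec<u8>)>,
}

impl FakeClient {
    fn publish(&mut self, topic: &str, payload: Vec<u8>) {
        self.sent.push((topic.to_string(), payload));
    }
}

/// Runs `workers` threads that each publish one response through a shared,
/// mutex-guarded client, and returns how many publishes were recorded.
fn publish_from_workers(workers: usize) -> usize {
    let client = Arc::new(Mutex::new(FakeClient { sent: Vec::new() }));
    let mut handles = Vec::new();

    for id in 0..workers {
        let client = Arc::clone(&client);
        handles.push(thread::spawn(move || {
            // Do the slow work first, outside the lock.
            let response = format!("response from worker {}", id).into_bytes();
            // Lock only for the duration of the publish call.
            client
                .lock()
                .expect("lock poisoned")
                .publish("responses", response);
        }));
    }

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    let count = client.lock().expect("lock poisoned").sent.len();
    count
}
```

Because the lock is taken only around the publish call, the workers contend only briefly. An alternative that avoids the mutex is to give one thread sole ownership of the client and have the workers send their responses to it over a channel.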