AtherEnergy / rumqtt

Pure Rust MQTT client
The Unlicense

callbacks for state change #82

Closed bestouff closed 5 years ago

bestouff commented 6 years ago

Hi,

Would it be possible to have callbacks for state changes (connection lost, connection OK) ?

bestouff commented 6 years ago

Here's the problem I have to solve: I have a "last will" which sets a topic to a particular value on disconnection. That topic is reset at client start. However, with automatic reconnection, the topic is set to the "last will" value and the client never has a chance to reset it, because it never knows that a disconnection/reconnection happened.

tekjar commented 6 years ago

@bestouff In the tokio2 branch, I'm using a channel to pass incoming messages and acks to the user. But I think it won't solve your problem, as connection options can't change. I can add callbacks on disconnect/reconnect, but the design won't be uniform anymore (callbacks for disconnects and channels for other events). I'll see if I can find some solution for this.

bestouff commented 6 years ago

I'm not looking for callbacks per se; a regular message through channels would be good too. I just need a means for my "business logic" to know it's been reconnected and has to cancel its last will.

tekjar commented 6 years ago

@bestouff Ok. I'll add disconnect events to the queue. But tokio is moving a lot. I think 0.2 is planned soon.
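A minimal sketch of how such queued events could be consumed to solve the last-will problem described above. Everything here is an assumption for illustration: the `Event` enum, `STATUS_TOPIC`, and the `online` payload are hypothetical and are not rumqtt's actual types. The function only computes what the application should publish; sending it through the client is left out.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical event type; rumqtt's real notification type may differ.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Reconnected,
    Disconnected,
    Message { topic: String, payload: Vec<u8> },
}

/// Illustrative topic that the broker overwrites with the last will.
pub const STATUS_TOPIC: &str = "device/status";

/// Drains the event queue until every sender is dropped and returns the
/// (topic, payload) pairs the application should publish in response.
pub fn status_resets(events: Receiver<Event>) -> Vec<(String, Vec<u8>)> {
    let mut to_publish = Vec::new();
    for event in events {
        match event {
            // The broker published the last will while we were away,
            // so overwrite it as soon as the connection is back.
            Event::Reconnected => {
                to_publish.push((STATUS_TOPIC.to_string(), b"online".to_vec()));
            }
            // Nothing to reset for these events in this sketch.
            Event::Disconnected | Event::Message { .. } => {}
        }
    }
    to_publish
}
```

Keeping disconnects in the same queue as messages and acks preserves the uniform channel-based design mentioned earlier in the thread.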

bestouff commented 6 years ago

Just for clearing my doubts: which is the branch to use ?

tekjar commented 6 years ago

All new development is happening on tokio2 branch

bestouff commented 6 years ago

Ah thanks. Will you soon merge and publish on crates.io ?

tekjar commented 6 years ago

Soon might be difficult as there is futures 0.3 coming soon. I wonder when things would settle down w.r.t tokio and futures

vincentdephily commented 6 years ago

So... Should we be tracking tokio2 or v3 branch at this stage ?

I also would like to know about network status. My usecase is that when network is down, I'd rather wait and optimize/trim my payload internally rather than give it immediately to rumqtt and waste resources if the network is down for a while.

In the v3 branch, looks like a nice place to expose this would be in the Notification enum, but having a MqttClient::connected() -> bool method would be useful as well.
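As a rough illustration of the `connected() -> bool` idea, here is a sketch of a shared flag that mirrors the most recent connection event. The `ConnectionEvent` enum and `ConnectionStatus` type are hypothetical names invented for this example; they are not part of rumqtt, and the real `Notification` enum may look different.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical subset of connection notifications.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConnectionEvent {
    Connected,
    Disconnected,
}

/// Cheaply clonable flag that mirrors the last connection event seen.
/// Starts out as "not connected".
#[derive(Clone, Default)]
pub struct ConnectionStatus(Arc<AtomicBool>);

impl ConnectionStatus {
    pub fn new() -> Self {
        Self::default()
    }

    /// Called by whichever task receives the notifications.
    pub fn update(&self, event: ConnectionEvent) {
        self.0
            .store(event == ConnectionEvent::Connected, Ordering::SeqCst);
    }

    /// Called by the business logic before deciding to publish.
    pub fn connected(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}
```

The flag is only a snapshot: the connection can drop right after `connected()` returns `true`, so a publish may still be queued while the network is down.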

tekjar commented 6 years ago

> So... Should we be tracking tokio2 or v3 branch at this stage ?

Please track v3 now. Tokio 2 was an experiment which didn't work out.

> I also would like to know about network status. My usecase is that when network is down, I'd rather wait and optimize/trim my payload internally rather than give it immediately to rumqtt and waste resources if the network is down for a while.

Can you explain a bit more about this? Do you want to check if the network is down during every publish? Right now the strategy to handle lost/slow network is through back pressure in request channel. I'll provide options to adjust the channel size.
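To make the back-pressure idea concrete, here is a sketch using the standard library's bounded `sync_channel`. It is not rumqtt's request channel: `PublishRequest`, `request_channel`, and `try_enqueue` are hypothetical names, and the capacity is assumed to be at least 1 (a capacity of 0 makes `sync_channel` a rendezvous channel).

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical publish request; not rumqtt's actual request type.
#[derive(Debug, PartialEq)]
pub struct PublishRequest {
    pub topic: String,
    pub payload: Vec<u8>,
}

/// Creates a bounded request queue with room for `capacity` requests.
pub fn request_channel(
    capacity: usize,
) -> (SyncSender<PublishRequest>, Receiver<PublishRequest>) {
    sync_channel(capacity)
}

/// Tries to enqueue without blocking. When the queue is full (the network
/// side is not draining it) or closed, the request is handed back so the
/// caller can keep, merge, or drop it.
pub fn try_enqueue(
    tx: &SyncSender<PublishRequest>,
    request: PublishRequest,
) -> Result<(), PublishRequest> {
    match tx.try_send(request) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(r)) | Err(TrySendError::Disconnected(r)) => Err(r),
    }
}
```

A blocking `send` on the same channel would instead stall the producer until the consumer catches up, which is the usual form of back pressure.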

tekjar commented 6 years ago

But disconnection event will be known through Notification

vincentdephily commented 6 years ago

> I also would like to know about network status. My usecase is that when network is down, I'd rather wait and optimize/trim my payload internally rather than give it immediately to rumqtt and waste resources if the network is down for a while.

> Can you explain a bit more about this? Do you want to check if the network is down during every publish?

No, I want to hold off calling publish() until the connection is up, because what I would send now is different from what I would send in 10 min: Some data will have become obsolete, some data will have become urgent (rescheduled to go first), and some data will be linearized differently (because there's more of it), etc.

These payload changes are beyond the scope of MQTT, but I need to know the connection status to apply my changes efficiently.

> Right now the strategy to handle lost/slow network is through back pressure in request channel. I'll provide options to adjust the channel size.

That's useful too; both are needed.

bestouff commented 6 years ago

Just issuing QoS0 isn't sufficient ?

vincentdephily commented 6 years ago

QoS is orthogonal to my need. Each MQTT packet contains multiple datapoints. The QoS of one MQTT packet is the max QoS of the datapoints in it (often QoS2). My datapoints also have individual deadlines (beyond which they are no longer useful), which MQTT's QoS level cannot describe. If the network is down, it's an opportunity to do more optimisations and trims on my data before I send it.
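A small sketch of the kind of trimming described here, under stated assumptions: the `Datapoint` and `Qos` types and both functions are hypothetical, invented for illustration. While the network is down, expired datapoints are dropped, the rest are ordered most urgent first, and the packet QoS is taken as the maximum QoS of the datapoints it carries.

```rust
use std::time::Instant;

/// MQTT QoS levels; the derived ordering follows declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Qos {
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2,
}

/// Hypothetical application-level datapoint with its own deadline.
#[derive(Debug, Clone, PartialEq)]
pub struct Datapoint {
    pub value: f64,
    pub qos: Qos,
    pub deadline: Instant,
}

/// Drops datapoints whose deadline is not strictly after `now` and
/// sorts the remainder so the earliest deadline comes first.
pub fn trim(mut points: Vec<Datapoint>, now: Instant) -> Vec<Datapoint> {
    points.retain(|p| p.deadline > now);
    points.sort_by_key(|p| p.deadline);
    points
}

/// QoS for a packet carrying `points`: the maximum QoS among them,
/// or `None` when there is nothing to send.
pub fn packet_qos(points: &[Datapoint]) -> Option<Qos> {
    points.iter().map(|p| p.qos).max()
}
```

Running `trim` again just before the connection comes back means obsolete data never reaches the client at all.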

tekjar commented 5 years ago

> No, I want to hold off calling publish() until the connection is up, because what I would send now is different from what I would send in 10 min: Some data will have become obsolete, some data will have become urgent (rescheduled to go first), and some data will be linearized differently (because there's more of it), etc.

@bestouff @vincentdephily Is this still relevant to you? I need someone to test the new notifications along with state changes.

tekjar commented 5 years ago

develop branch now sends connection state change events after the first connection

vincentdephily commented 5 years ago

Sorry, I moved on to using mqtt-rs which is a bare codec crate, and handle the state and io myself. I think full crates like rumqtt make a lot of sense, but I needed something unopinionated for the time being. Looking back, I'm not quite sure why I went to mqtt-rs instead of mqtt311, but that's the direction I took for now.

bestouff commented 5 years ago

FWIW: been there done that, and now I'm back to rumqtt but with auto-reconnect off and I'm handling reconnections manually.

tekjar commented 5 years ago

@vincentdephily @bestouff If the use case is simple (like sending a publish and synchronously waiting for the ack), it's simpler to do it with a codec rather than depending on unknown source code like rumqtt.

Unfortunately this isn't the case for us. I've tried using simple MQTT packet parsers, but there are too many cases to handle to create a robust and fast (asynchronous) MQTT layer on top of them. The idea here is to consolidate all of those so that everyone who needs a client doesn't have to reinvent the wheel (or worse yet, not understand what might go wrong).

Maybe I'll be able to help better if I know exactly what you are trying to do, but the problem is time. This is a combination of me being very occupied with other work and the complexity of writing code with tokio + futures in its current state. Coding speed is horrible in my opinion. I'm hoping async/await will solve this.

I've a few more days of time to polish this crate. Please let me know if I can help :)

tekjar commented 5 years ago

I'm going to assume this works now. Feel free to reopen if it doesn't