njh / ruby-mqtt

Pure Ruby gem that implements the MQTT protocol, a lightweight protocol for publish/subscribe messaging.
http://www.rubydoc.info/gems/mqtt
MIT License
541 stars, 135 forks

Add support for asynchronous QOS 1 publish (later also 2) #106

Open Xasin opened 6 years ago

Xasin commented 6 years ago

With the current implementation, the client blocks when sending a QOS 1 packet while it waits for the response. This is fine for most applications, but for one of my projects it would be beneficial to be able to send out larger "batches" of QOS 1 messages to various separate electronics. If every single publish blocks and waits for a server response, the wait time can pile up to a fairly large amount.

It would be nice to have the option of offloading this puback wait to a separate thread, which would run in the background and would handle the puback processing (presumably by polling). This thread could also re-send packets if they time out, instead of throwing a Timeout::Error if no puback is received in time.

(Note: Maybe the puback packet processing could be done directly in the handle_packet function, while a separate thread could poll and detect puback packet timeouts)

That way, a lot of QOS > 0 messages could be sent at once without any of them blocking the main thread.

pgouv commented 6 years ago

I think packets should be sent in order, so with QOS 1 you have to wait for the ACK before sending the next one. Also, there is no queue for outgoing packets, so implementing this would require many changes. So actually QOS 1 isn't really supported.

Xasin commented 6 years ago

Well, for my problem you don't really need packets to arrive in order, but I see how this could be an important feature for other applications.

But if packet order is not important, I am not sure why you would need a queue for outgoing packets. Sending a packet takes only a fraction of the total time that a QOS 1 exchange requires, so the send itself could still be done synchronously. Instead of the main thread waiting on the puback, though, you could just have a hash storing the not-yet-confirmed packets (maybe with their packet IDs as keys). The handle_packet function would automatically clear the hash entries that match incoming puback packets. In parallel, you could have a single thread polling maybe once a second. It would run through the hash of unconfirmed packets, look for packets that have timed out, and re-send those.
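To make the idea above concrete, here is a minimal sketch of such a hash of unconfirmed packets. It is not part of ruby-mqtt: the class name `PendingPublishes`, its methods, and the 5 second default timeout are all assumptions made for illustration. The receive path would call `ack` when a puback arrives, and a polling thread would call `each_timed_out` to find packets to re-send.

```ruby
# Hypothetical helper, NOT part of ruby-mqtt: tracks QOS 1 publishes that
# have not been acknowledged yet, keyed by packet ID.
class PendingPublishes
  Entry = Struct.new(:packet, :sent_at)

  def initialize(timeout: 5.0)
    @timeout = timeout
    @entries = {}
    @lock = Mutex.new # the hash is shared between sender, receiver and poller
  end

  # Call right after the publish packet has been written to the socket.
  def track(packet_id, packet, now: Time.now)
    @lock.synchronize { @entries[packet_id] = Entry.new(packet, now) }
  end

  # Call from the receive path when a puback with this ID arrives.
  # Returns true if the ID was still pending, false otherwise.
  def ack(packet_id)
    @lock.synchronize { !@entries.delete(packet_id).nil? }
  end

  # Yields [packet_id, packet] for every entry older than the timeout and
  # restarts its timer, so the caller can re-send it.
  def each_timed_out(now: Time.now)
    expired = @lock.synchronize do
      @entries.select { |_, entry| now - entry.sent_at >= @timeout }
              .each_value { |entry| entry.sent_at = now }
              .map { |id, entry| [id, entry.packet] }
    end
    # Yield outside the lock so a slow re-send does not block ack/track.
    expired.each { |id, packet| yield id, packet }
  end

  # Starts a background thread that checks for timeouts every `interval`
  # seconds and hands expired packets to the given block.
  def start_poller(interval: 1.0, &resend)
    Thread.new do
      loop do
        sleep interval
        each_timed_out(&resend)
      end
    end
  end

  def size
    @lock.synchronize { @entries.size }
  end
end
```

With something like this, the main thread only pays for the socket write. A block passed to `start_poller` would do the actual re-send; per the MQTT specification, a re-sent publish should have its DUP flag set.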

You are correct though, @parhs. With the requirement of packet order, things do get a lot more complicated. That is something I would leave for QOS 2 applications, as QOS 1 only specifies at-least-once delivery (one or more transmissions of a packet), which does not really guarantee packet order.

Maybe I should make a PR for this feature myself, but I am not sure how to set up documentation, commenting, etc.

We'll see.

njh commented 4 years ago

I have found writing asynchronous code in Ruby challenging, and it is hard to know when to block and how much to use threads.

I wrote ruby-em-mqtt in order to deal with asynchronous pub/sub in Ruby, although it could probably do with some more work. EventMachine works in a similar way to node.js and provides mechanisms for handling asynchronous code.

I think the alternative would be to use an event-loop-based MQTT client. Maybe the Paho MQTT client would suit you better? https://github.com/eclipse/paho.mqtt.ruby

Xasin commented 4 years ago

Hm... For now I'll keep using your code. It works plenty fine and I've had only minor problems, and as long as the internet connection isn't laggy, QOS 1/2 don't give that much benefit.

I personally would just use a Ruby Queue to store the data to be sent and have one blocking thread that handles sending and another for receiving... But since I have never written communication code like this, I might be underestimating the challenges here.
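A rough sketch of the sending half of that idea, using only Ruby's built-in `Queue` and `Thread`. The class `QueuedSender` and its methods are hypothetical names, not anything ruby-mqtt provides, and the block passed to `new` stands in for whatever actually performs the blocking publish.

```ruby
# Hypothetical sketch, NOT part of ruby-mqtt: a background sender fed by a
# thread-safe Queue, so callers never wait on the broker themselves.
class QueuedSender
  def initialize(&deliver)
    @queue = Queue.new
    @worker = Thread.new do
      # Queue#pop blocks until an item is available; nil is the stop signal.
      while (message = @queue.pop)
        deliver.call(message)
      end
    end
  end

  # Returns immediately; the worker thread does the (possibly blocking) send.
  def publish(topic, payload)
    @queue << [topic, payload]
    self
  end

  # Lets the worker finish everything already queued, then stops it.
  def close
    @queue << nil
    @worker.join
  end
end
```

With ruby-mqtt, the block could presumably be something like `{ |topic, payload| client.publish(topic, payload, false, 1) }`. Note the trade-off: messages still go out one at a time and in order, so total throughput is still bounded by the puback round trip. Only the calling thread is freed from waiting.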

Thank you a lot for pointing out the Paho MQTT client, it looks like a good alternative that I must have missed somehow!

I suppose that settles this issue for me, but maybe you'd like to keep it open in case you do want to tackle it someday?