njh / ruby-mqtt

Pure Ruby gem that implements the MQTT protocol, a lightweight protocol for publish/subscribe messaging.
http://www.rubydoc.info/gems/mqtt
MIT License
542 stars 135 forks source link

Use a queue to wait for Puback packets rather than polling #120

Closed tenderlove closed 4 years ago

tenderlove commented 4 years ago

This commit changes the publish method to use a queue to wait for Puback packets rather than polling a hash. Every time the read loop gets data or a timeout from IO.select, it will send a message to everyone waiting for a Puback packet. If the we're within the deadline, then the loop executes again, if we got a packet, we'll return the packet, and if we're outside the deadline, a -1 is returned.

This upside is that this patch speeds up the publish method by over 100x. Here is the benchmark:

require "securerandom"
require "mqtt"
require "benchmark/ips"
require "stackprof"

client = MQTT::Client.new(username: 'testuser', password: 'testpasswd', client_id: "client_#{SecureRandom.hex(10)}", host: '127.0.0.1')
client.connect

Benchmark.ips do |x|
  x.report("send message") {
    i = rand(1..10)
    topic = "to/timebox#{i}/cameras"
    msg = "message #{Time.now} for timebox#{i}"
    client.publish(topic, msg, true, 1)
  }
end

Before this patch:

$ ruby -I lib thing.rb
Warming up --------------------------------------
        send message     8.000  i/100ms
Calculating -------------------------------------
        send message     85.042  (± 4.7%) i/s -    432.000  in   5.089261s

After this patch:

$ ruby -I lib thing.rb
Warming up --------------------------------------
        send message   915.000  i/100ms
Calculating -------------------------------------
        send message      9.453k (± 4.5%) i/s -     47.580k in   5.043716s

The downside is that the timeout isn't exact. Since IO.select times out every 0.5 seconds (according to the SELECT_TIMEOUT constant), the deadline in the publish method could be missed by that amount of time.

Refs #115

njh commented 4 years ago

Awesome, that is great, thanks!

I hated that there was a sleep in the code but had forgotten about it when those performance issues were brought up.

I will get those old/broken versions of ruby removed from Travis, then get this PR merged.

tenderlove commented 4 years ago

@njh 🙇🏻‍♀️ thank you for making this library!

tenderlove commented 4 years ago

While I was thinking about this, I realized I've kind of broken the timeout code. We need to add this line:

diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb
index 707f932..e6e9884 100644
--- a/lib/mqtt/client.rb
+++ b/lib/mqtt/client.rb
@@ -459,6 +459,7 @@ module MQTT
     def receive_packet
       # Poll socket - is there data waiting?
       result = IO.select([@socket], [], [], SELECT_TIMEOUT)
+      handle_timeouts
       unless result.nil?
         # Yes - read in the packet
         packet = MQTT::Packet.read(@socket)

I'm trying to figure out how to write a test for it though. It doesn't seem like there's a way to get the read thread to run in the test framework.

tenderlove commented 4 years ago

@njh I added a test for the timeout, but the test is extremely hacky :( It does ensure we hit our timeouts though.

njh commented 4 years ago

Sorry, I completely forgot to merge this. Done now! Thank you very much for this contribution.