njh / ruby-mqtt

Pure Ruby gem that implements the MQTT protocol, a lightweight protocol for publish/subscribe messaging.
http://www.rubydoc.info/gems/mqtt
MIT License
542 stars 135 forks

Thread termination with exception not handled? #142

Open phlegx opened 2 years ago

phlegx commented 2 years ago

With this simple implementation, I get an exception when the broker goes down for some reason.

client.subscribe('test' => 0)
client.get do |topic, message|
  # Block is executed for every message received
end

Here is the log:

/usr/local/bundle/bundler/gems/ruby-mqtt/lib/mqtt/packet.rb:222:in `getbyte': Connection reset by peer @ io_fillbuf - fd:5 (Errno::ECONNRESET)
from /usr/local/bundle/bundler/gems/ruby-mqtt/lib/mqtt/packet.rb:222:in `read_byte'
from /usr/local/bundle/bundler/gems/ruby-mqtt/lib/mqtt/packet.rb:30:in `read'
from /usr/local/bundle/bundler/gems/ruby-mqtt/lib/mqtt/client.rb:466:in `receive_packet'
from /usr/local/bundle/bundler/gems/ruby-mqtt/lib/mqtt/client.rb:273:in `block in connect'

How can I rescue from this exception?

JakubSzajna commented 1 year ago

Hello! I just ran into that issue too. It's due to the fact that Exception here is actually MQTT::Exception rather than ::Exception, and I wonder whether it was implemented that way intentionally. Because of that, if the connection breaks for some reason, the thread dies silently and the only trace of it is in the logs.

I don't want to set abort_on_exception globally, but if using MQTT::Exception is intentional, maybe it would be enough to allow setting this value for the @read_thread specifically via some config option, and to document the issue somewhere?

My fix for now is something like this:

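# Workaround: wrap receive_packet so that exceptions the gem's own rescue
# does not match (such as Errno::ECONNRESET) still reach the parent thread.
MQTT::Client.prepend(Module.new do
  def receive_packet
    super
  rescue ::Exception => ex
    # Re-raise in the parent thread instead of letting the read thread die silently
    Thread.current[:parent].raise(ex)
  end
end)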

phlegx commented 1 year ago

@JakubSzajna but ruby-mqtt has this definition: mqtt.rb#L23

module MQTT
  ...
  # Super-class for other MQTT related exceptions
  class Exception < ::Exception
  end
  ...
end

Used here: mqtt/client.rb#L476

# Pass exceptions up to parent thread
rescue Exception => exp
  unless @socket.nil?
    @socket.close
    @socket = nil
    handle_close
  end
  Thread.current[:parent].raise(exp)
end

Are @socket.close and handle_close taken into account in your fix? And how can I catch the exception in my case, described in the first comment of this issue?

JakubSzajna commented 1 year ago

@phlegx Yes, it does have this definition, and it inherits from the core Exception class.

However, rescue Exception => exp inside the MQTT module refers to MQTT::Exception instead of ::Exception. So it catches MQTT::Exception and everything that inherits from MQTT::Exception. Errno::ECONNRESET does not have MQTT::Exception in its ancestor chain, so it won't be caught by the rescue clause you mentioned.
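The lookup behaviour described above can be reproduced without the gem. The following is only a minimal sketch: the `Demo` module and the `narrow_rescue`, `broad_rescue`, and `outcome` helpers are hypothetical names made up for illustration and are not part of ruby-mqtt.

```ruby
# Hypothetical module standing in for MQTT; this is not the gem's code.
module Demo
  # Shadows ::Exception inside this namespace, as MQTT::Exception does.
  class Exception < ::Exception
  end

  # A bare `Exception` resolves lexically to Demo::Exception here.
  def self.narrow_rescue
    yield
    :no_error
  rescue Exception
    :rescued
  end

  # `::Exception` names the top-level class explicitly.
  def self.broad_rescue
    yield
    :no_error
  rescue ::Exception
    :rescued
  end
end

# Runs one of the handlers above around a simulated connection reset and
# reports whether the error was rescued inside Demo or escaped from it.
def outcome(handler)
  Demo.public_send(handler) { raise Errno::ECONNRESET }
rescue SystemCallError
  :escaped
end

puts outcome(:narrow_rescue).inspect # prints :escaped
puts outcome(:broad_rescue).inspect  # prints :rescued
puts Errno::ECONNRESET.ancestors.include?(Demo::Exception) # prints false
```

In this sketch, only the handler that spells out `::Exception` sees the connection reset, which matches the behaviour described for the rescue clause in client.rb.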

The monkey-patch from my message wraps the whole receive_packet call in a rescue clause, so every exception raised inside receive_packet is caught there and re-raised in the parent thread. I hope that answers your question :) It is meant to make sure that the exception from your initial message (and any other raised inside receive_packet) can be rescued. So it does not rescue the exception itself, it just makes it rescuable :) To catch the exception, use a rescue clause in the scope where you call MQTT::Client.connect.
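To illustrate what "rescuable" means here, the sketch below simulates the pattern with plain threads instead of the gem. It assumes the reader thread stores its parent in `Thread.current[:parent]`, as the rescue clause quoted from client.rb suggests. The `wait_for_reader_failure` method is a hypothetical stand-in, and `sleep 5` stands in for whatever blocking work the caller does, such as waiting for messages.

```ruby
# Hypothetical stand-in for a client whose reader thread loses its connection.
def wait_for_reader_failure
  reader = Thread.new(Thread.current) do |parent|
    Thread.current[:parent] = parent
    begin
      # Simulate the broker dropping the connection.
      raise Errno::ECONNRESET
    rescue ::Exception => e
      # Forward the error to the parent thread, as the workaround does.
      Thread.current[:parent].raise(e)
    end
  end

  # Placeholder for the caller's blocking work; Thread#raise interrupts it.
  sleep 5
  :no_failure
rescue SystemCallError => e
  # The forwarded exception surfaces here, in the calling thread.
  e.class
ensure
  reader&.join
end

puts wait_for_reader_failure.inspect # prints Errno::ECONNRESET
```

Because the exception is delivered asynchronously, it surfaces wherever the parent thread happens to be executing at that moment. The rescue clause therefore has to enclose the blocking work, not just the call that starts the reader.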

phlegx commented 1 year ago

@JakubSzajna can you write a PR please?

phlegx commented 1 year ago

Found a similar PR, #112, related to StandardError.

JakubSzajna commented 1 year ago

I don't think this fixes the issue we're talking about here. If I find some time, I'll create a PR to include a proposed solution in the gem's code.

Toucouleur66 commented 11 months ago

I don't think this fixes the issue we're talking about here. If I find some time, I'll create a PR to include a proposed solution in the gem's code.

Any hope for this PR?