toyokazu / fluent-plugin-mqtt-io

Fluent plugin for MQTT Input/Output
Apache License 2.0

mqtt_proxy: Execute retry_connect in main thread to prevent updating `@_retrying` race condition #22

Closed cosmo0920 closed 3 years ago

cosmo0920 commented 3 years ago

timer_execute executes the registered method on the main thread. The previous implementation can leave a retry unhandled through the following error sequence:

  1. send_packet(MQTT::Packet::Subscribe) is called in mqtt_proxy.rb (running on @connection_thread)
  2. https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L554 is reached, and then the keep_alive! method fails in client.rb (running on @read_thread)
  3. An IOError occurs at https://github.com/njh/ruby-mqtt/blob/v0.5.0/lib/mqtt/client.rb#L556, in mqtt_proxy.rb (running on @connection_thread)
  4. #retry_connect is executed in mqtt_proxy.rb (running on @connection_thread)
  5. Before execution reaches https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L93, @retry_interval second(s) pass in mqtt_proxy.rb (running on @connection_thread)
  6. While step 5 is in progress, https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb#L152 is executed on the main thread
  7. As a result, retrying is disabled.

In short, @connection_thread in mqtt_proxy.rb and the main thread race to update @_retrying.

To prevent this race condition, #retry_connect should be run through timer_execute so that the reconnection is retried on the main thread instead of on @connection_thread. The race cannot happen when every retry operation runs on the main thread.

Signed-off-by: Hiroshi Hatake hatake@clear-code.com
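
The idea behind the fix can be sketched without Fluentd at all. The following is a minimal illustration, assuming a hypothetical SerializedRetry class (not code from this plugin or from the pull request): any thread may ask for a retry, but the block that touches the retry flag is only ever run by one owner thread, which plays the role the main thread has in the proposal.

```ruby
# Illustrative only: SerializedRetry is a hypothetical class, not part of
# fluent-plugin-mqtt-io. It funnels every update of the retry flag through
# one owner thread.
class SerializedRetry
  attr_reader :attempts

  def initialize
    @tasks = Queue.new
    @retrying = false
    @attempts = []
    @owner = Thread.new do
      # Queue#pop blocks until a task arrives; a nil task stops the loop.
      while (task = @tasks.pop)
        task.call
      end
    end
  end

  # Callable from any thread. The block that touches @retrying is executed
  # only by the owner thread, so two updates can never interleave.
  def request_retry(source)
    @tasks << lambda do
      @retrying = true
      @attempts << [source, Thread.current.equal?(@owner)]
      @retrying = false
    end
  end

  def retrying?
    @retrying
  end

  def shutdown
    @tasks << nil
    @owner.join
  end
end

state = SerializedRetry.new
[:connection_thread, :read_thread].map { |name|
  Thread.new { state.request_retry(name) }
}.each(&:join)
state.shutdown
```

Both requests come from different threads, yet each recorded attempt reports that it ran on the owner thread, so the flag is never updated from two threads at once.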

cosmo0920 commented 3 years ago

@toyokazu Any thoughts?

toyokazu commented 3 years ago

Thank you for pointing this out, and sorry for the late reply. As you indicated, the current implementation fits only awkwardly with fluentd's thread management. When fluentd was updated to v1.0, I tried to use the plugin helper APIs, including thread_create and timer_execute, but it failed. The current implementation remains in a "somehow working" state ;(. It should be fixed by redesigning the procedure.

The point is that the plugin must manage the thread @read_thread created by ruby-mqtt. In my understanding, by using timer_execute, a new thread is created at the specified interval from the main thread and it will not fit the purpose of the plugin. In order to keep monitoring the status of the thread, it is better to create a proxy thread at the start phase and keep it until the end of the life of the plugin.

As you indicate, a thread created by the main thread itself cannot rescue MQTT::ProtocolException, but a child thread of that thread can rescue the Exception. Based on that, I modified the plugin and committed the change to the develop/0.5.x branch. What do you think? If you agree, I’d like to merge it into the master branch.

I could not reproduce the race condition you described, so I don’t know whether the modification satisfies your requirements. If you have steps to reproduce the condition, please share them.

Furthermore, the previous implementation has an issue when it is used through a proxy service such as SSH port forwarding, because the proxy can accept a new connection and only afterwards return an error packet. This causes rapid, repeated reconnection while the MQTT server is down, and it also prevents reconnection after the server starts up. I could not find a way to detect this situation from the Exception alone, so I added a new :max_retry_freq option to detect it. Please also comment on that function.

Best Regards
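
The thread does not show how :max_retry_freq is implemented, so the following is only a sketch of one way such a check could work: a sliding window that flags reconnection attempts arriving too often. The class name RetryFrequencyGuard, its parameters, and the exact rule are assumptions made for illustration.

```ruby
# Hypothetical sliding-window check; the names and the rule are assumptions,
# not the plugin's max_retry_freq implementation.
class RetryFrequencyGuard
  def initialize(max_retries:, window:)
    @max_retries = max_retries   # retries tolerated inside one window
    @window = window             # window length in seconds
    @timestamps = []
  end

  # Record a retry at time `now` and report whether retries arrive too often.
  def too_frequent?(now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
    @timestamps << now
    @timestamps.reject! { |t| now - t > @window }  # forget retries outside the window
    @timestamps.size > @max_retries
  end
end

guard = RetryFrequencyGuard.new(max_retries: 3, window: 10.0)
# Four retries within three seconds, then one much later.
results = [0.0, 1.0, 2.0, 3.0, 30.0].map { |t| guard.too_frequent?(t) }
```

Here the fourth retry in quick succession is flagged, and the later, isolated retry is not. A proxy that accepts the connection and then fails immediately would produce exactly the first pattern.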

cosmo0920 commented 3 years ago

> In my understanding, by using timer_execute, a new thread is created at the specified interval from the main thread

timer_execute doesn't create a new thread, just defers a task on the main thread.

timer_execute: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin_helper/timer.rb#L33-L42
event_loop_attach: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin_helper/event_loop.rb#L38-L44

No new thread is created there. It just registers a deferred task on the main thread.

Fluentd's plugin helper creates a thread when a plugin uses the thread plugin helper: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin_helper/thread.rb
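
The distinction between deferring a task and spawning a thread can be shown with a toy timer loop. This is a simplified stand-in written for illustration, not Fluentd's helper or its event loop: ToyLoop and its defer method are hypothetical names. The point it demonstrates is that a deferred callback runs on whichever thread runs the loop, while Thread.new produces a different thread.

```ruby
# Toy single-threaded timer loop. It only mimics the "deferred callback on
# the loop's own thread" behaviour; it is not Fluentd's API.
class ToyLoop
  def initialize
    @timers = []
  end

  # Hypothetical one-shot timer: run the block after `interval` seconds.
  def defer(interval, &block)
    @timers << [monotonic_now + interval, block]
  end

  # Runs on the calling thread until no timers remain.
  def run
    until @timers.empty?
      now = monotonic_now
      due, @timers = @timers.partition { |at, _| at <= now }
      due.each { |_, block| block.call }
      sleep 0.01 if due.empty?
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

loop_thread = Thread.current
timer_thread = nil
toy = ToyLoop.new
toy.defer(0.05) { timer_thread = Thread.current }
toy.run

# For contrast, an explicitly created thread is a different Thread object.
helper_thread = Thread.new { Thread.current }.value
```

After the run, timer_thread is the same object as loop_thread, whereas helper_thread is not.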

> I could not reproduce the race condition you described, so I don’t know whether the modification satisfies your requirements. If you have steps to reproduce the condition, please share them.

I used the following script to generate a flood of messages:

require 'mqtt'

opts = {
  host: '127.0.0.1',
  port: 1883,
  client_id: "test-client",
  clean_session: true,
  keep_alive: 15
}
client = MQTT::Client.new(opts)
client.connect
loop do
  start = Time.now
  # Publish a burst of 35000 messages, each with a 4000-byte payload.
  35000.times do
    client.publish('test', "a" * 4000)
  end
  elapsed = Time.now - start
  # Pace the bursts to about one per second; skip the sleep if a burst took longer.
  sleep(1 - elapsed) if elapsed < 1
end

cosmo0920 commented 3 years ago

> This causes rapid, repeated reconnection while the MQTT server is down, and it also prevents reconnection after the server starts up. I could not find a way to detect this situation from the Exception alone, so I added a new :max_retry_freq option to detect it. Please also comment on that function.

I tried the develop/0.5.x branch with a modified ruby-mqtt that has the following patches applied:

I also applied a patch to this plugin so that a publish response is treated as a keep-alive response (in a high-load environment, the keep-alive response sometimes times out and causes MQTT::ProtocolException):

diff --git a/lib/fluent/plugin/mqtt_proxy.rb b/lib/fluent/plugin/mqtt_proxy.rb
index de6817d..7d162e3 100644
--- a/lib/fluent/plugin/mqtt_proxy.rb
+++ b/lib/fluent/plugin/mqtt_proxy.rb
@@ -74,6 +74,7 @@ module Fluent::Plugin
         opts[:cert_file] = @security.tls.cert_file
         opts[:key_file] = @security.tls.key_file
       end
+      opts[:assume_publish_response_as_pingresp] = true

       init_retry_interval
       @retry_sequence = []

With that, the develop/0.5.x branch works nicely. Thanks! :smile: The non-patched version can also handle reconnection without retrying ending unexpectedly.
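
The ruby-mqtt patch behind assume_publish_response_as_pingresp is not shown in this thread, so the following only models the general idea under stated assumptions: KeepAliveTracker and its parameters are hypothetical, and PUBACK (the broker's acknowledgement of a QoS 1 publish) is assumed to be the "publish response" in question.

```ruby
# Hypothetical model of keep-alive bookkeeping. ruby-mqtt's internals and the
# patch itself are not shown in this thread, so every name here is assumed.
class KeepAliveTracker
  def initialize(timeout:, count_publish_responses:)
    @timeout = timeout
    @count_publish_responses = count_publish_responses
    @last_response_at = 0.0
  end

  # packet_type is a symbol such as :pingresp or :puback; `at` is in seconds.
  def on_packet(packet_type, at)
    if packet_type == :pingresp || (@count_publish_responses && packet_type == :puback)
      @last_response_at = at
    end
  end

  def timed_out?(now)
    now - @last_response_at > @timeout
  end
end

strict  = KeepAliveTracker.new(timeout: 15, count_publish_responses: false)
lenient = KeepAliveTracker.new(timeout: 15, count_publish_responses: true)
[strict, lenient].each do |tracker|
  tracker.on_packet(:pingresp, 0)
  tracker.on_packet(:puback, 14)   # broker is busy but still acknowledging publishes
end
```

At time 20 the strict tracker considers the connection timed out, because the last ping response is 20 seconds old, while the lenient tracker does not, because it counted the acknowledgement at time 14 as proof that the broker is alive.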

toyokazu commented 3 years ago

Thank you for your advice! Sorry, I did not have time to trace the details of the fluentd main thread implementation, so your information is very helpful. I now understand how timer_execute() works.

In the case of fluent-plugin-mqtt-io, it might be better to create a thread that looks after the ruby-mqtt read thread, and it is also preferable for that thread to be managed by the main thread. So, instead of using timer_execute, I’d like to use thread_create() here.

I also misunderstood your pull request. I did not have time to trace the details of the previous implementation, but with fluentd 1.11.5, Exceptions are correctly rescued by the thread generated by thread_create(). So I removed the unnecessary “in_mqtt/out_mqtt_monitor” thread and kept the “in_mqtt/out_mqtt_proxy”, “in_mqtt_get” and “out_mqtt_dummy” threads.
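
The "proxy thread that watches a worker and rescues its exceptions" pattern can be sketched in plain Ruby. This is an illustration under assumptions, not the code on develop/0.5.x: supervise and ProtocolError are hypothetical names, and ProtocolError stands in for MQTT::ProtocolException so the sketch runs without the mqtt gem. It relies on Thread#join re-raising the worker's exception in the joining thread. In a plugin, the outer Thread.new would presumably be replaced by the thread helper's thread_create; that substitution is not shown here.

```ruby
# Stand-in for MQTT::ProtocolException so the sketch runs without the mqtt gem.
class ProtocolError < StandardError; end

# Hypothetical supervisor: a long-lived proxy thread starts a worker, waits for
# it, and rescues the worker's exception, which Thread#join re-raises here.
def supervise(max_attempts:, &work)
  Thread.new do
    attempt = 0
    begin
      attempt += 1
      worker = Thread.new(attempt) do |n|
        Thread.current.report_on_exception = false # keep stderr quiet in the demo
        work.call(n)
      end
      worker.join
      :finished
    rescue ProtocolError
      retry if attempt < max_attempts
      :gave_up
    end
  end
end

seen = []
proxy = supervise(max_attempts: 3) do |n|
  seen << n
  raise ProtocolError, "simulated failure" if n < 3
end
outcome = proxy.value
```

The worker fails twice and succeeds on the third attempt, so the proxy thread ends with :finished after starting three workers. The proxy thread itself lives for the whole sequence, which matches the goal of keeping one monitoring thread for the lifetime of the plugin.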

The current implementation still has several issues; for example, the call graph becomes large (connect is called many times…) in an environment with heavy reconnection. The memory should be cleared by using the “max_retry_interval” parameter and periodic rebooting.

Thank you again for providing a benchmark script. With ruby-mqtt patched (128, 130), the revised version seems to keep its connection under the load from your benchmark script. So I’d like to merge it in a few days. If you find issues, please share them here.

When patch 128 is accepted by ruby-mqtt, please also send a pull request that adds the corresponding parameter to fluent-plugin-mqtt-io.

By the way, applying patch 128 of ruby-mqtt seems to cause the following error. The ruby-mqtt developers might ask for it to be investigated before applying the patch.

#<Thread:0x00007fe1c02ce2b0 /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:276 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
  6: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:278:in `block (2 levels) in connect'
  5: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:468:in `receive_packet'
  4: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:468:in `synchronize'
  3: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:472:in `block in receive_packet'
  2: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:498:in `handle_packet'
  1: from /path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:498:in `synchronize'
/path_to_rbenv/rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:499:in `block in handle_packet': undefined method `<<' for nil:NilClass (NoMethodError)

Just FYI.

After merging to the master branch, I’d like to close this pull request. Thanks.

Best Regards

toyokazu commented 3 years ago

I just merged the updates into the master branch and pushed a new gem to rubygems.org. Thank you for your contributions!