toyokazu / fluent-plugin-mqtt-io

Fluent plugin for MQTT Input/Output
Apache License 2.0

support retain and qos options #8

Closed toshitanian closed 6 years ago

toshitanian commented 6 years ago

Added qos and retain options. qos in particular is an important parameter for users. The default value of qos is 1 (at-least-once message delivery).

toyokazu commented 6 years ago

Thank you for your comment! I agree that the qos and retain options should be added. However, as you pointed out in the previous pull request (already closed), leaving reconnection to fluentd may be one way to handle MQTT disconnection. I wanted to prevent fluentd from restarting on MQTT disconnection, but the current code cannot reliably kill all zombie threads after reconnection. I also want to control the reconnection frequency (because of the uplink limitation), but there may be a way to control fluentd restart timing instead (monitoring logs from outside...). So I made a new version without the reconnection code.

https://github.com/toyokazu/fluent-plugin-mqtt-io/tree/remove_reconnection_and_add_qos_retain

You can try it as follows:

git checkout remove_reconnection_and_add_qos_retain
rake build
gem install pkg/fluent-plugin-mqtt-io-0.4.1.gem

What do you think? If you agree, I'd like to release it as 0.4.1.

Best Regards

toyokazu commented 6 years ago

By the way, I also released the simply merged version as 0.3.10. 0.4.x also changes the fluentd dependency from ~> 0.14 to >= 0.14.

toshitanian commented 6 years ago

Cool! I will check the branch :)

toshitanian commented 6 years ago

I tried the new version, but I ran into a problem. My use case for this plugin is sending messages from an environment whose connectivity can be offline (and sometimes online). I tested the buffering by toggling my wifi ON and OFF to check whether messages received while offline are resent once the connection comes back online.

This is what happened when I turned wifi off:
1) The fluentd process crashed and restarted after failing to receive an expected byte somewhere.
2) A new fluentd process tries to launch.
3) It crashes again because it cannot resolve the broker's name (like xxx.iot.us-west-2.amazonaws.com).
4) Go back to 2).

The major problem is that the fluent-forward-in server (listening on port 24224) also crashes along with the mqtt-out plugin. That is not the behavior I want. I want it to keep receiving messages from upstream message producers and save them to the mqtt-out plugin's buffer, which resends them to the MQTT broker when the connection comes back up.

@toyokazu Do you have any idea how to prevent the fluent-forward-in plugin from stopping?
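The behavior asked for above (keep accepting records while offline, resend once the link returns) can be sketched in plain Ruby. This is only an illustration under stated assumptions: RetryingBuffer and its sender block are hypothetical names, it is an in-memory stand-in rather than fluentd's buffer, and the rescued error classes are a guess at what a network client might raise.

```ruby
require "socket" # defines SocketError

# Hypothetical in-memory stand-in for an output buffer; not the plugin's code.
class RetryingBuffer
  def initialize(&sender)
    @queue = []      # records waiting to be delivered, oldest first
    @sender = sender # block that delivers one record; raises while offline
  end

  # Always accept the record, even while the link is down.
  def emit(record)
    @queue << record
  end

  # Try to deliver queued records in order. On failure, keep the failed
  # record and everything after it for the next attempt.
  def flush
    until @queue.empty?
      begin
        @sender.call(@queue.first)
      rescue IOError, SystemCallError, SocketError
        return false
      end
      @queue.shift # drop the record only after a successful send
    end
    true
  end

  def pending
    @queue.size
  end
end

online = false
delivered = []
buffer = RetryingBuffer.new do |record|
  raise IOError, "link down" unless online
  delivered << record
end

buffer.emit("line 1")
buffer.emit("line 2")
buffer.flush  # link is down: nothing is delivered, both records are kept
online = true
buffer.flush  # link is back: both records are delivered in order
```

Because a record is removed only after its send returns, a send that succeeds on the broker side but raises locally would be retried, so this gives at-least-once delivery with possible duplicates.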

2018-01-06 07:42:08 +0000 [debug]: #0 MqttOutput::block in write: device/8888, 1515224527, {"log"=>"'{\"message\": \"this device line 1515224490-26\"}'", "container_id"=>"c1d3471e1c4884744a2fa3e4dcbb98e41baf751618552d40ef6d35ae9d0cb2a9", "container_name"=>"/pp11", "source"=>"stdout"}
2018-01-06 07:42:08 +0000 [error]: #0 unexpected error error_class=MQTT::ProtocolException error="Failed to read byte from socket"
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/packet.rb:283:in `read_byte'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/packet.rb:31:in `read'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:481:in `receive_packet'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:302:in `block in connect'
2018-01-06 07:42:08 +0000 [warn]: #0 thread doesn't exit correctly (killed or other reason) plugin=Fluent::Plugin::MqttOutput title=:event_loop thread=#<Thread:0x007f1ee72687e0@/var/lib/gems/2.3.0/gems/fluentd-1.0.2/lib/fluent/plugin_helper/thread.rb:70 aborting> error=nil
2018-01-06 07:42:08 +0000 [warn]: #0 thread doesn't exit correctly (killed or other reason) plugin=Fluent::Plugin::MqttOutput title=:flush_thread_0 thread=#<Thread:0x007f1ee7268c18@/var/lib/gems/2.3.0/gems/fluentd-1.0.2/lib/fluent/plugin_helper/thread.rb:70 aborting> error=nil
2018-01-06 07:42:08 +0000 [warn]: #0 thread doesn't exit correctly (killed or other reason) plugin=Fluent::Plugin::MqttOutput title=:enqueue_thread thread=#<Thread:0x007f1ee7268920@/var/lib/gems/2.3.0/gems/fluentd-1.0.2/lib/fluent/plugin_helper/thread.rb:70 aborting> error=nil
2018-01-06 07:42:08 +0000 [warn]: #0 [input1] thread doesn't exit correctly (killed or other reason) plugin=Fluent::Plugin::ForwardInput title=:event_loop thread=#<Thread:0x007f1ee8072a20@/var/lib/gems/2.3.0/gems/fluentd-1.0.2/lib/fluent/plugin_helper/thread.rb:70 aborting> error=nil
/var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/packet.rb:283:in `read_byte': Failed to read byte from socket (MQTT::ProtocolException)
        from /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/packet.rb:31:in `read'
        from /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:481:in `receive_packet'
        from /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:302:in `block in connect'
2018-01-06 07:42:08 +0000 [info]: Worker 0 finished unexpectedly with status 1
2018-01-06 07:42:08 +0000 [info]: gem 'fluent-plugin-mqtt-io' version '0.4.1'
2018-01-06 07:42:08 +0000 [info]: gem 'fluentd' version '1.0.2'
2018-01-06 07:42:08 +0000 [info]: adding match in @mainstream pattern="**" type="mqtt"
2018-01-06 07:42:08 +0000 [info]: adding source type="forward"
  @type mqtt
  @log_level "trace"
  host "xxx.iot.us-west-2.amazonaws.com"
  port 8883
  rescue_disconnection true
  <format>
    @type "json"
    add_newline false
  </format>
  <security>
    use_tls true
    <tls>
      ca_file "/cert/cacert.pem"
      key_file "/cert/private.key"
      cert_file "/cert/cert.pem"
    </tls>
  </security>
  <buffer>
    @type "file"
    path "/tmp/aaa.buffer"
    chunk_limit_size 256m
    total_limit_size 1024m
    flush_mode interval
    flush_interval 1s
    overflow_action drop_oldest_chunk
    retry_wait 120s
  </buffer>
</match> is not used.
2018-01-06 07:42:08 +0000 [info]: #0 starting fluentd worker pid=56 ppid=6 worker=0
2018-01-06 07:42:08 +0000 [debug]: #0 buffer started instance=69928438309440 stage_size=0 queue_size=186
2018-01-06 07:42:08 +0000 [debug]: #0 start mqtt proxy for out_mqtt
2018-01-06 07:42:08 +0000 [debug]: #0 start to connect mqtt broker xxxx.iot.us-west-2.amazonaws.com:8883
2018-01-06 07:42:08 +0000 [error]: #0 unexpected error error_class=SocketError error="getaddrinfo: Name or service not known"
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:257:in `initialize'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:257:in `new'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:257:in `connect'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/fluent-plugin-mqtt-io-0.4.1/lib/fluent/plugin/mqtt_proxy.rb:63:in `start_proxy'
  2018-01-06 07:42:08 +0000 [error]: #0 /var/lib/gems/2.3.0/gems/fluent-plugin-mqtt-io-0.4.1/lib/fluent/plugin/out_mqtt.rb:76:in `start'
toyokazu commented 6 years ago

Thank you for your comment! I agree with you. Since I have similar requirements, I've implemented a reconnection function.

I tried to refactor my code. From fluentd 0.14 on, basically all threads must be managed by fluentd. However, with ruby-mqtt, the @client.get loop requires a monitoring thread to catch exceptions, and it is difficult to manage that thread without the plugin's own thread management (thread.kill). So I changed all threads used in this plugin to be managed inside the plugin, and I tried to make sure that all of them are properly killed inside the plugin during the shutdown phase. I guess this should not affect other plugins (not sure yet...). Furthermore, to resend after reconnection, try_write must be implemented, so I added try_write to out_mqtt.rb. However, it cannot guarantee that every message is sent exactly once...
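The plugin-local thread management described above might look roughly like the following. This is a minimal sketch, not the code in the branch: SupervisedLoop and its interval parameter are hypothetical names, the block stands in for connecting and running the @client.get loop, and it assumes the errors of interest are StandardError subclasses (the rescue list would need adjusting if the client library raises something else).

```ruby
# Hypothetical helper, not the plugin's code: runs a blocking piece of work in
# a thread owned by the caller, rescues errors inside that thread, and retries
# after a fixed interval so reconnection attempts are throttled.
class SupervisedLoop
  attr_reader :errors

  def initialize(interval:, &work)
    @interval = interval
    @work = work # expected to block while the connection is healthy
    @errors = []
    @stopped = false
    @thread = nil
  end

  def start
    @thread = Thread.new do
      until @stopped
        begin
          @work.call
        rescue StandardError => e
          @errors << e    # handled here, so it never reaches other threads
          sleep @interval # wait before the next reconnection attempt
        end
      end
    end
    self
  end

  # Intended for the shutdown phase: stop retrying and reap the thread.
  def shutdown
    @stopped = true
    @thread&.kill
    @thread&.join
  end

  def alive?
    !@thread.nil? && @thread.alive?
  end
end

attempts = 0
worker = SupervisedLoop.new(interval: 0.01) do
  attempts += 1
  raise IOError, "Failed to read byte from socket" if attempts < 3
  sleep # "connected": block until the thread is killed
end.start

sleep 0.2       # two failed attempts, then the third one stays connected
worker.shutdown # the thread is killed and joined, leaving no zombie behind
```

The interval is what allows the reconnection frequency to be controlled independently of how fluentd restarts workers.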

Please try the following new version.

https://github.com/toyokazu/fluent-plugin-mqtt-io/tree/fix_reconnection_and_add_qos_retain_async_buffer

I would be grateful for your feedback :)

Best

toshitanian commented 6 years ago

Thank you so much! I will try my use cases ;)

toyokazu commented 6 years ago

It seems that you could not fix your problem, right? Anyway, I'd like to merge the branch and then try to fix the remaining issues.

toyokazu commented 6 years ago

Sorry for the lack of explanation. I've just merged your pull request. The root cause of the issue described in this thread is that the MQTT::ProtocolException raised by @read_thread in ruby-mqtt is not properly caught by the fluent plugin. I think the current version, which uses plugin-local threads, can catch it, but if you still have issues to be solved (e.g. if you prefer previous versions like 0.4.0, which use fluentd's thread APIs), could you submit them as a new issue?
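The general Ruby behavior behind this kind of bug can be shown in a few lines. The sketch below is plain Ruby, not ruby-mqtt's or the plugin's actual code, and where_is_it_caught is a hypothetical name: an error raised inside a background thread is not seen by a rescue wrapped around Thread.new, only by a rescue inside the thread or around join.

```ruby
# Reports where an error raised in a background thread is observed.
def where_is_it_caught(rescue_inside:)
  seen = []
  begin
    reader = Thread.new do
      Thread.current.report_on_exception = false # keep stderr quiet (Ruby 2.4+)
      begin
        raise IOError, "Failed to read byte from socket"
      rescue IOError
        raise unless rescue_inside # re-raise to let the thread die
        seen << :inside_thread
      end
    end
    sleep 0.05 while reader.alive? # the caller keeps running in the meantime
  rescue IOError
    seen << :around_thread_new # never reached: the error is in another thread
  end
  begin
    reader.join # join re-raises the error that terminated the thread
  rescue IOError
    seen << :at_join
  end
  seen
end

p where_is_it_caught(rescue_inside: false) # => [:at_join]
p where_is_it_caught(rescue_inside: true)  # => [:inside_thread]
```

A plugin that owns its threads can put the rescue where the error actually surfaces, which is harder when thread creation is delegated to a helper.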