toyokazu / fluent-plugin-mqtt-io

Fluent plugin for MQTT Input/Output
Apache License 2.0
17 stars 12 forks source link

Fluentd Input Plugin is not working!! #4

Closed sankujain closed 6 years ago

sankujain commented 6 years ago

Hello Toyokazu,

We are trying to access mqtt-input plugin but it is not giving any response. This is our input plugin:

<source>
  @type mqtt
  host 127.0.0.1
  port 1883
  topic sankusan
  <parse>
    @type json
  </parse>
 @label @sanku
</source>

And we want to write data in output file. Our output plugin is :

<label @sanku>
  <match **>
   @type file
   path /var/log/td-agent/mqtt
 </match>
</label>

We are using td-agent 0.14 and fluent-plugin-mqtt-io 0.3.x Please let us know what we are missing.

toyokazu commented 6 years ago

Hi sankujain,

Since I'm not an expert of td-agent, please ask details to the td-agent community ;) By the way, I tried your configuration a bit in my environment.

Ubuntu 16.04 td-agent3 (14.16) (installed by following the instructions here, https://docs.fluentd.org/v0.14/articles/install-by-deb) fluent-plugin-mqtt-io 0.3.7 (installed by using td-agent-gem, sudo td-agent-gem install fluent-plugin-mqtt-io)

sudo vim /etc/td-agent/td-agent.conf
<source>
  @type mqtt
  host 127.0.0.1
  port 1883
  topic sankusan
  <parse>
    @type json
  </parse>
 @label @sanku
</source>

#<label @sanku>
#  <match **>
#   @type stdout
# </match>
#</label>

<label @sanku>
  <match **>
   @type file
   path /var/log/td-agent/mqtt
 </match>
</label>

Using mosquitto as a MQTT broker.

sudo apt install mosquitto mosquitto-clients

With doing a simple test, it seems to work well.

sudo /etc/init.d/td-agent start
mosquitto_pub -t sankusan -m '{"hoge": "foo", "boe":  "boo"}'
mosquitto_pub -t sankusan -m '{"hoge": "foo", "boe":  "boo"}'
cat /var/log/td-agent/mqtt/buffer.b55f011813424c30bac9e252405211a33.log
2017-11-28T10:39:47+09:00   sankusan    {"hoge":"foo","boe":"boo"}
2017-11-28T10:39:48+09:00   sankusan    {"hoge":"foo","boe":"boo"}

I'm not sure what causes the problem in your environment. I wish that your problem will be solved soon.

Best Regards

sankujain commented 6 years ago

Thankyou Toyokazu for your instant reply. Above configuration is also working on our environment. We were trying to test the process if mosquitto server goes down, will it restart automatically or not.

Also can we get raw string as input, if we publish raw data. mosquitto_pub -t sankusan -m 'sankujain' I have successfully got this with regexp.

Regards

toyokazu commented 6 years ago

Hi sankujain,

OK. That's nice.

About mosquitto, does your question mean whether mosquitto has auto restart function with unintentional suspension or not? If so, I think that it does not have. However, for example, Linux provides several method to restart your daemons with their service management frameworks, e.g. upstart, systemd and so on. If you have an interest, please refer the following page as an example.

https://www.digitalocean.com/community/tutorials/how-to-configure-a-linux-service-to-start-automatically-after-a-crash-or-reboot-part-1-practical-examples

If you intend whether fluent-plugin-mqtt-io has auto reconnection function or not, the answer is yes. It has auto reconnection function. After your fluentd connected to mosquitto, please try to stop mosquitto.

sudo /etc/init.d/mosquitto stop

And then check /var/log/td-agent/td-agent.log, you can find the logs as the followings:

2017-11-28 15:26:10 +0900 [error]: #0 Protocol error occurs.,MQTT::ProtocolException,Failed to read byte from socket
2017-11-28 15:26:10 +0900 [error]: #0 Retry in 1 sec
2017-11-28 15:26:11 +0900 [error]: #0 System call error occurs.,Errno::ECONNREFUSED,Connection refused - connect(2) for "127.0.0.1" port 1883
2017-11-28 15:26:11 +0900 [error]: #0 Retry in 2 sec
2017-11-28 15:26:13 +0900 [error]: #0 System call error occurs.,Errno::ECONNREFUSED,Connection refused - connect(2) for "127.0.0.1" port 1883
2017-11-28 15:26:13 +0900 [error]: #0 Retry in 4 sec

During that error occurs, you cannot send a message via fluentd. When you restart mosquitto, you can send your messages again. The retry interval of reconnection can be configurable by the parameters. Please see the code:

https://github.com/toyokazu/fluent-plugin-mqtt-io/blob/master/lib/fluent/plugin/mqtt_proxy.rb

initial_interval and retry_inc_ratio are the parameters.

About the raw data, fluentd provides us a parser plugin 'none'. Please try the followings:

<source>
  @type mqtt
  host 127.0.0.1
  port 1883
  topic sankusan
  <parse>
    @type none
  </parse>
 @label @sanku
</source>

The page below shows the list of the parser plugins you can use.

https://docs.fluentd.org/v0.14/articles/parser-plugin-overview

If you also want to output raw data, you can use single_value formatter.

<label @sanku>
  <match **>
   @type file
   path /var/log/td-agent/mqtt
   <format>
     @type single_value
   </format>
 </match>
</label>

The rest of formatter plugins can be refered in the page below.

https://docs.fluentd.org/v0.14/articles/formatter-plugin-overview

Best

sankujain commented 6 years ago

Thankyou Toyokazu for your help.