shohelk closed this issue 6 years ago
Dear shohelk,
Thank you for the information. It seems that a missing input format (parser configuration) causes the error. Your example configuration does not include
<source>
@type mqtt
</source>
But the actual configuration includes it, right? If so, could you please try the following instead?
<source>
@type mqtt
<parse>
@type json
</parse>
</source>
The format type (json) should be changed to match your data. For the supported formats, please refer to the fluentd documentation.
If you also want to use the mqtt output, please also add a <format> section to your configuration.
<match debug.**>
@type mqtt
host 127.0.0.1
port 1883
<format>
@type json
</format>
</match>
Please try them for now.
There may be a way to skip the default <parse> or <format> configuration. I'll check it when I can make time.
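As an illustration of changing the parser type mentioned above: the sketch below assumes the MQTT payloads are plain text rather than JSON and swaps in fluentd's built-in none parser. Whether that suits a given broker's messages is an assumption, and the parser name should be checked against the fluentd documentation for the installed version.

```
<source>
  @type mqtt
  <parse>
    # Assumption: payloads are not JSON; "none" keeps the raw text under the "message" key
    @type none
  </parse>
</source>
```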
Thanks Toyokazu for a prompt reply. Will surely test this and let you know.
We were able to get past that problem with your solution. Thanks!
But now we are stuck at this problem when trying to use mqtt_buf:
2017-07-29 07:05:35 +0000 [error]: dry run failed: Unknown output plugin 'mqtt_buf'. Run 'gem search -rd fluent-plugin' to find plugins
This is our td-agent.conf:
<match debug1.**>
@type mqtt_buf
host iot.eclipse.org
port 1883
topic sanku_san
topic_rewrite_pattern '^([\w\/]+)$'
topic_rewrite_replacement 'sanku_san'
buffer_type memory
flush_interval 1s
<format>
@type json
</format>
</match>
Please let us know if we are doing something wrong, we are very close to making it work.
Sorry. mqtt_buf is not supported in fluent-plugin-mqtt-io 0.3.x.
The buffer function is planned to be migrated to the <buffer> configuration in fluentd 0.14.x, but it is not supported yet. Please wait for a while.
If your application needs the buffer function, please use fluentd 0.12.x and fluent-plugin-mqtt-io 0.2.3.
Best
Great that works! Will test with that version and let you know.
Really appreciate your prompt support.
Hello Toyokazu,
We have been trying this plugin with td-agent2 and version 0.2.3. It successfully publishes messages to the MQTT server (iot.eclipse.org). The data is also buffered in the buffer path and flushed at each flush_interval.
But if we disconnect the internet connection while everything is working and then reconnect it, to check how the buffer behaves with fluctuating connectivity, new data is only buffered. It is not flushed after the connection is established again.
We need to restart the td-agent service manually to flush the buffer.
This is the mqtt_buf configuration we are using:
<match debug1.**>
@type mqtt_buf
host iot.eclipse.org
port 1883
topic sanku_san
topic_rewrite_pattern '^([\w\/]+)$'
topic_rewrite_replacement 'sanku_san'
buffer_type file
buffer_path /var/log/td-agent/buffer/mqtt
flush_interval 10s
<format>
@type json
</format>
</match>
This is the td-agent.log file:
2017-08-01 15:23:44 +0530 [info]: #0 starting fluentd worker pid=15817 ppid=15812 worker=0
2017-08-01 15:23:45 +0530 [info]: #0 listening dRuby uri="druby://127.0.0.1:24230" object="Fluent::Engine"
2017-08-01 15:23:45 +0530 [info]: #0 listening port port=24224 bind="0.0.0.0"
2017-08-01 15:23:45 +0530 [info]: #0 fluentd worker is now running worker=0
2017-08-01 15:25:24 +0530 [error]: #0 Protocol error occurs.,MQTT::ProtocolException,No Ping Response received for 23 seconds
2017-08-01 15:25:24 +0530 [error]: #0 Retry in 1 sec
2017-08-01 15:25:25 +0530 [error]: #0 The other error occurs.,SocketError,getaddrinfo: Temporary failure in name resolution
2017-08-01 15:25:25 +0530 [error]: #0 Retry in 2 sec
2017-08-01 15:25:27 +0530 [error]: #0 The other error occurs.,SocketError,getaddrinfo: Temporary failure in name resolution
2017-08-01 15:25:27 +0530 [error]: #0 Retry in 4 sec
2017-08-01 15:25:31 +0530 [error]: #0 The other error occurs.,SocketError,getaddrinfo: Temporary failure in name resolution
2017-08-01 15:25:31 +0530 [error]: #0 Retry in 8 sec
2017-08-01 15:26:04 +0530 [error]: #0 Protocol error occurs.,MQTT::ProtocolException,No Ping Response received for 23 seconds
2017-08-01 15:26:04 +0530 [error]: #0 Retry in 1 sec
2017-08-01 15:26:05 +0530 [error]: #0 The other error occurs.,SocketError,getaddrinfo: Temporary failure in name resolution
2017-08-01 15:26:05 +0530 [error]: #0 Retry in 2 sec
2017-08-01 15:26:07 +0530 [error]: #0 The other error occurs.,SocketError,getaddrinfo: Temporary failure in name resolution
2017-08-01 15:26:07 +0530 [error]: #0 Retry in 4 sec
As the log shows, the retry process stops after reconnecting, but it does not trigger a flush of the previously buffered data. Instead, newly incoming data is also only buffered and is not flushed automatically.
Please help us in solving this problem!!
Dear sankujain,
Sorry for being late. I've finished the migration of fluent-plugin-mqtt-io from 0.12 to 0.14, so you can now try the plugin with fluentd 0.14. It is not a stable version, but it supports thread management at the framework level and compatibility with the older version during the v1 phase. The reconnection code of fluent-plugin-mqtt-io has been rewritten to fit the new version of fluentd. I think the reconnection process in the newer version of this plugin will work in the disconnection situation you described. Would you try it in your environment?
Best Regards
Hello Toyokazu,
Thank you for your valuable response. We will try it now in our environment; we just need to confirm the environment details:
Fluentd 0.14 & fluent-plugin-mqtt-io 0.2.3
or
Fluentd 0.14 & fluent-plugin-mqtt-io 0.3.x
Which one is compatible now?
Please try the latest one, 0.3.5. The version I tested is the following:
fluentd 0.14.20 & fluent-plugin-mqtt-io 0.3.5
Best
Thank you!!
Will surely test and let you know.
Hello Toyokazu,
We have set up fluent-plugin-mqtt-io 0.3.5 with fluentd 0.14, and the basic buffer functionality is working fine. But when we set buffer_type to file and give a buffer_path, we cannot find the buffer file at the specified location, although the right output arrives at mosquitto_sub after flush_interval.
One more thing we noticed: after setting send_time to true, we still do not get a timestamp in the output.
This is the configuration we are using:
<match topic.**>
@type mqtt
host "iot.eclipse.org"
port 1883
topic sanku_san
<format>
@type json
</format>
topic_rewrite_pattern '^([\w\/]+)$'
topic_rewrite_replacement 'sanku_san'
# You can specify Buffer Plugin options
retry_inc_ratio 2
retry_forever true
keep_alive 5s
send_time true
<buffer>
buffer_type file
buffer_path /var/log/td-agent/buffer/mqtt_san
flush_interval 30s
</buffer>
</match>
The output screen is:
sanku@sanku:~$ mosquitto_sub -h iot.eclipse.org -t sanku_san
{"json":"mage"}
{"json":"mage"}
Can you please guide us on how to locate the buffered file, so that we can test the full functionality of the buffer?
Thank you for your quick response!
About the <buffer> section, please refer to the v0.14 documentation. The way it is written changed in fluentd v0.14.
https://docs.fluentd.org/v0.14/articles/buffer-plugin-overview
<buffer>
@type file
path ...
</buffer>
About send_time and recv_time, the implementation was not properly migrated. I have fixed those points. send_time and recv_time should be placed in the <monitor> section. I also added a description of the <monitor> section to README.md. Please refer to it.
Best Regards
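For readers following along, the two changes described in this reply can be combined into one output section. This is only a sketch under stated assumptions: the host, topic, and buffer path are copied from the configuration posted earlier in the thread, the <buffer> keys follow the v0.14 form quoted above, and placing send_time inside <monitor> follows the reply rather than a verified parameter list, so the plugin README remains the authority.

```
<match topic.**>
  @type mqtt
  host iot.eclipse.org
  port 1883
  topic sanku_san
  <format>
    @type json
  </format>
  # v0.14 style: buffer_type/buffer_path become @type/path inside <buffer>
  <buffer>
    @type file
    path /var/log/td-agent/buffer/mqtt_san
    flush_interval 30s
  </buffer>
  # Assumption: send_time belongs in <monitor>, as stated in the reply above
  <monitor>
    send_time true
  </monitor>
</match>
```

With a file buffer configured this way, buffered chunks would be expected under the given path between flushes.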
Thank you!!
It is working as we needed. :+1:
Good! Thank you for your feedback! It really helps me to improve the plugin functions 👍
Best
We are trying to use this plugin to stream data to an MQTT broker, but after installing and configuring it correctly we receive this error when starting Fluentd:
We tried configuring this plugin with td-agent2 (0.12) and td-agent3 (0.14), on both Ubuntu 14 and 16.
Here is our fluentd.conf file:
Please help us understand if we are doing something wrong.