dmnfortytwo opened this issue 5 years ago.
You shouldn't use a blackhole match (<match **>) for the ES plugin.
Instead, use a label in the source and match it like this:
<source>
  @type forward
  port 24224
  bind 127.0.0.1
  deny_keepalive false
  @label RAW
</source>
<label @RAW>
  <match **>
    @type elasticsearch
    host localhost
    port 9200
    target_type_key @target_key
    type_name fluentd
    target_index_key target_index
    logstash_format true
    logstash_prefix invalid-
    time_key @timestamp
    include_timestamp true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    request_timeout 120s
    <buffer>
      @type file
      flush_interval 10s
      retry_type periodic
      retry_forever true
      retry_wait 10s
      chunk_limit_size 16Mb
      queue_limit_length 4096
      path /var/lib/td-agent/buffers/output_elasticsearch
      total_limit_size 15Gb
    </buffer>
  </match>
</label>
# For debugging
<label @ERROR>
  <match **>
    @type stdout
  </match>
</label>
What is label @ERROR? Shall I define it somehow, or is it a magic word?
I believe this will stop the snowballing error growth, but won't Fluentd try to resend the original broken message?
# cat /etc/td-agent/conf.d/*
<source>
  @type forward
  port 24224
  bind 127.0.0.1
  deny_keepalive false
  @label RAW
</source>
<label @RAW>
  <match **>
    @type elasticsearch
    host localhost
    port 9200
    target_type_key @target_key
    type_name fluentd
    target_index_key target_index
    logstash_format true
    logstash_prefix invalid-
    time_key @timestamp
    include_timestamp true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    request_timeout 120s
    <buffer>
      @type file
      flush_interval 10s
      retry_type periodic
      retry_forever true
      retry_wait 10s
      chunk_limit_size 16Mb
      queue_limit_length 4096
      path /var/lib/td-agent/buffers/output_elasticsearch
      total_limit_size 15Gb
    </buffer>
  </match>
</label>
<label @ERROR>
  <match **>
    @type file
    path /var/log/td-agent/es_errors
  </match>
</label>
Nov 14 10:06:31 fqdn systemd[1]: Starting td-agent log sender...
Nov 14 10:06:41 fqdn systemd[1]: Started td-agent log sender.
Nov 14 10:06:45 fqdn ruby[31507]: /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/root_agent.rb:299:in `find_label': RAW label not found (ArgumentE
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/plugin_helper/event_emitter.rb:50:in `event_emitter_
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/plugin_helper/event_emitter.rb:74:in `configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/plugin_helper/server.rb:312:in `configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/plugin/in_forward.rb:95:in `configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/plugin.rb:164:in `configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/root_agent.rb:282:in `add_source'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/root_agent.rb:122:in `block in configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/root_agent.rb:118:in `each'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/root_agent.rb:118:in `configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/engine.rb:131:in `configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/engine.rb:96:in `run_configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/supervisor.rb:795:in `run_configure'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/supervisor.rb:579:in `dry_run'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/supervisor.rb:597:in `supervise'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/supervisor.rb:502:in `run_supervisor'
Nov 14 10:06:45 fqdn ruby[31507]: from /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.6/lib/fluent/command/fluentd.rb:310:in `<main>'
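What is label @ERROR? Shall I define it somehow, or is it a magic word?
Yep, it is: @ERROR is a built-in label that Fluentd routes error events to.
Also, @label RAW should be @label @RAW, matching the name in <label @RAW>.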
Well... I've adapted my configs to use it with tags, and now all messages are rejected:
<source>
  @type syslog
  port 5140
  bind 127.0.0.1
  tag syslog
  priority_key severity
  facility_key syslog_facility
  @label @RAW
</source>
<source>
  @type forward
  port 24224
  bind 127.0.0.1
  deny_keepalive false
  @label @RAW
</source>
<label @RAW>
  <filter app.**>
    @type record_transformer
    enable_ruby true
    <record>
      fqdn harvester-staging-gce-be-1
      hostname "#{Socket.gethostname}"
      project foo
      cluster default
      team ops
      original_tag ${tag}
    </record>
  </filter>
  <filter syslog.**>
    @type record_transformer
    enable_ruby true
    <record>
      fqdn harvester-staging-gce-be-1.fqdn
      hostname "#{Socket.gethostname}"
      project foo
      cluster default
      team ops
      original_tag ${tag}
    </record>
  </filter>
  <match **>
    @type elasticsearch
    @log_level debug
    host localhost
    port 9200
    target_type_key @target_key
    type_name fluentd
    target_index_key target_index
    logstash_format true
    logstash_prefix invalid-
    time_key @timestamp
    include_timestamp true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    request_timeout 120s
    <buffer>
      @type file
      flush_interval 10s
      retry_type periodic
      retry_forever true
      retry_wait 10s
      chunk_limit_size 16Mb
      queue_limit_length 4096
      path /var/lib/td-agent/buffers/output_elasticsearch
      total_limit_size 15Gb
    </buffer>
  </match>
</label>
<label @ERROR>
  <match **>
    @type file
    path /var/log/td-agent/es_errors
  </match>
</label>
2018-11-15 12:35:13 +0000 [warn]: #0 send an error event to @ERROR: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'object mapping for [original_tag] tried to parse field [original_tag] as object, but found a concrete value'" location=nil tag="syslog.cron.info" time=2018-11-15 12:35:01.000000000 +0000
Sample message from es_errors:
2018-11-15T12:36:22+00:00 syslog.authpriv.info {"host":"harvester-staging-gce-be-1","ident":"CRON","pid":"3991","message":"pam_unix(cron:session): session closed for user root","severity":"info","syslog_facility":"authpriv","fqdn":"harvester-staging-gce-be-1.fqdn","hostname":"harvester-staging-gce-be-1.fqdn","project":"foo","cluster":"default","team":"ops","original_tag":"syslog.authpriv.info"}
Other hosts that write to ES with the same original_tag filters work without any problems.
Your ES index's object mapping seems to be wrong. Could you confirm it?
How do I do that?
The same hosts, without a label, push the same messages without problems (to the same ES cluster and the same indices, of course).
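One way to check is to read the index mapping through Elasticsearch's get-mapping API (GET <index>/_mapping). The following is a minimal Ruby sketch, assuming an unauthenticated node on localhost:9200 (as in the config above) and the index pattern invalid-* derived from logstash_prefix; the helpers fetch_mapping, field_definitions and object_field? are hypothetical. It reports, per index, whether original_tag is mapped as an object, which would explain the mapper_parsing_exception above, or as a concrete type.

```ruby
require "json"
require "net/http"
require "uri"

# Hypothetical helper: fetch mappings for every index matching the pattern.
# Assumes an unauthenticated ES node, as in the config above.
def fetch_mapping(host: "localhost", port: 9200, index: "invalid-*")
  uri = URI("http://#{host}:#{port}/#{index}/_mapping")
  JSON.parse(Net::HTTP.get(uri))
end

# Returns { index_name => field_definition } for every index that maps `field`.
# ES 7+ puts "properties" directly under "mappings"; ES 6 nests it under a
# type name (e.g. "fluentd"), so both layouts are checked.
def field_definitions(mapping, field)
  mapping.each_with_object({}) do |(index, body), found|
    mappings = body.fetch("mappings", {})
    layouts = mappings.key?("properties") ? [mappings] : mappings.values.grep(Hash)
    layouts.each do |layout|
      definition = layout.dig("properties", field)
      found[index] = definition if definition
    end
  end
end

# An object field has sub-"properties" (or an explicit "object" type)
# instead of a concrete type such as "keyword" or "text".
def object_field?(definition)
  definition.key?("properties") || definition["type"] == "object"
end

# Usage (requires a reachable ES node):
#   field_definitions(fetch_mapping, "original_tag").each do |index, definition|
#     puts "#{index}: #{object_field?(definition) ? 'object' : definition['type']}"
#   end
```

If an older index maps original_tag as an object, documents carrying it as a plain string will be rejected there while other indices accept them.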
I cannot reproduce this issue with label routing.
Used conf:
Fluentd Log:
Pushing Log Request:
% printf '\xFF\xAD' | bundle exec fluent-cat -f none test.fluentcat
% printf 'message: none' | bundle exec fluent-cat -f none test.fluentcat
% printf 'message: none' | bundle exec fluent-cat -f none syslog.fluentcat
% printf 'message: none' | bundle exec fluent-cat -f none app.fluentcat
Search Request:
GET _search
{
  "query": {
    "match_all": {}
  }
}
Response:
I see logs starting from 00:00. It looks like the old index was really broken. It's a last-chance fallback for weird errors and invalid configurations, so production data was not affected.
I'll play with the new config a bit more.
Well, there is another question. I still get invalid messages, mostly from web server logs shipped via syslog. Request URIs can contain any weird symbols a user sends. Elasticsearch drops the whole event when it contains erroneous content; is there an option to make these messages valid and push them into ES?
is there an option to make these messages valid and push them into ES?
The ES plugin does not have this option.
You can add a plugin that performs the String#scrub operation, such as fluent-plugin-string-scrub, to your Fluentd pipeline.
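String#scrub is Ruby's built-in way to replace invalid byte sequences. The following minimal sketch applies it to a whole Fluentd-style record; it is not the implementation of fluent-plugin-string-scrub, and the scrub_record helper and the "?" replacement are hypothetical choices. It assumes binary (ASCII-8BIT) strings should be reinterpreted as UTF-8, because String#scrub treats every binary string as already valid.

```ruby
# Recursively replaces invalid byte sequences in every string of a record.
# Assumption: binary (ASCII-8BIT) strings are reinterpreted as UTF-8 first,
# since scrub would otherwise leave them untouched.
def scrub_record(value, replacement = "?")
  case value
  when Hash
    value.each_with_object({}) do |(key, val), out|
      out[scrub_record(key, replacement)] = scrub_record(val, replacement)
    end
  when Array
    value.map { |item| scrub_record(item, replacement) }
  when String
    str = value.encoding == Encoding::BINARY ? value.dup.force_encoding(Encoding::UTF_8) : value
    str.valid_encoding? ? str : str.scrub(replacement)
  else
    value # numbers, nil, booleans pass through unchanged
  end
end

record = { "message" => "\xFF\xAD" } # the broken payload from this issue
clean = scrub_record(record)
puts clean["message"].valid_encoding? # => true
```

Scrubbing keeps the rest of the event (for example the request URI's valid parts) instead of letting Elasticsearch reject the whole record.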
I added a FAQ entry about the pitfall of using a blackhole glob (**) tag in a pipeline that pushes to Elasticsearch:
https://github.com/uken/fluent-plugin-elasticsearch/pull/502
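The pitfall comes from Fluentd's tag matching: ** matches zero or more tag parts, so a catch-all <match **> also captures Fluentd's own fluent.warn events, including the warnings the ES output emits about rejected records (see the tag="fluent.warn" log lines in this thread). The following simplified Ruby matcher is a hypothetical sketch that handles only * and **; it is not Fluentd's own implementation, which also supports {X,Y} alternatives.

```ruby
# Simplified Fluentd-style tag matcher supporting only "*" and "**".
#   "*"  matches exactly one tag part (e.g. "app.*" matches "app.web")
#   "**" matches zero or more tag parts (e.g. "app.**" matches "app" and "app.web.1")
def tag_match?(pattern, tag)
  match_parts(pattern.split("."), tag.split("."))
end

def match_parts(patterns, parts)
  return parts.empty? if patterns.empty?

  head, *rest = patterns
  if head == "**"
    # Try letting "**" consume 0, 1, ... parts of the tag.
    (0..parts.size).any? { |n| match_parts(rest, parts.drop(n)) }
  else
    !parts.empty? && File.fnmatch(head, parts.first) && match_parts(rest, parts.drop(1))
  end
end

%w[app.fluentcat syslog.cron.info fluent.warn].each do |tag|
  puts "#{tag}: <match **> #{tag_match?('**', tag)}, <match app.**> #{tag_match?('app.**', tag)}"
end
```

Only the catch-all pattern matches fluent.warn, which is why routing inputs through a label (or using narrower patterns) keeps the ES output's own warnings from being sent back to Elasticsearch.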
I'm waiting for https://github.com/kataring/fluent-plugin-string-scrub/issues/6 to be resolved before finishing my tests.
Sadly, I've gotten no response to my issue. Could you kindly take a look at it and maybe give me a clue?
I've added a comment on https://github.com/kataring/fluent-plugin-string-scrub/issues/6. Unfortunately, I couldn't reproduce the problem of fluent-plugin-string-scrub not working.
Summary: invalid data from a client causes endless growth of logs, buffers, and overall load, resulting in a denial of service.
Config:
Versions:
How to reproduce:
Just send broken data to input_forward's socket:
printf '\xFF\xAD' | fluent-cat -f none app.fluentcat
What happens:
Fluentd tries to push the message into Elasticsearch and gets an error:
2018-11-13 11:16:27 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="app.fluentcat" time=2018-11-13 11:16:17.492985640 +0000 record={"message"=>"\xFF\xAD"}
Fluentd takes this error message and tries to put it into Elasticsearch... and gets an error again:
2018-11-13 11:16:38 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="fluent.warn" time=2018-11-13 11:16:27.978851140 +0000 record={"error"=>"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>", "location"=>nil, "tag"=>"app.fluentcat", "time"=>2018-11-13 11:16:17.492985640 +0000, "record"=>{"message"=>"\xFF\xAD"}, "message"=>"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch\" location=nil tag=\"app.fluentcat\" time=2018-11-13 11:16:17.492985640 +0000 record={\"message\"=>\"\\xFF\\xAD\"}"}
...and again:
2018-11-13 11:16:50 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="fluent.warn" time=2018-11-13 11:16:38.991800825 +0000 record={"error"=>"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>", "location"=>nil, "tag"=>"fluent.warn", "time"=>2018-11-13 11:16:27.978851140 +0000, "record"=>{"error"=>"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>", "location"=>nil, "tag"=>"app.fluentcat", "time"=>2018-11-13 11:16:17.492985640 +0000, "record"=>{"message"=>"\xFF\xAD"}, "message"=>"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch\" location=nil tag=\"app.fluentcat\" time=2018-11-13 11:16:17.492985640 +0000 record={\"message\"=>\"\\xFF\\xAD\"}"}, "message"=>"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch\" location=nil tag=\"fluent.warn\" time=2018-11-13 11:16:27.978851140 +0000 record={\"error\"=>\"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>\", \"location\"=>nil, \"tag\"=>\"app.fluentcat\", \"time\"=>2018-11-13 11:16:17.492985640 +0000, \"record\"=>{\"message\"=>\"\\xFF\\xAD\"}, \"message\"=>\"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\\\"400 - Rejected by Elasticsearch\\\" location=nil tag=\\\"app.fluentcat\\\" time=2018-11-13 11:16:17.492985640 +0000 record={\\\"message\\\"=>\\\"\\\\xFF\\\\xAD\\\"}\"}"}
...and again...
2018-11-13 11:17:01 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="fluent.warn" time=2018-11-13 11:16:50.005246142 +0000 record={"error"=>"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>", "location"=>nil, "tag"=>"fluent.warn", "time"=>2018-11-13 11:16:38.991800825 +0000, "record"=>{"error"=>"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>", "location"=>nil, "tag"=>"fluent.warn", "time"=>2018-11-13 11:16:27.978851140 +0000, "record"=>{"error"=>"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>", "location"=>nil, "tag"=>"app.fluentcat", "time"=>2018-11-13 11:16:17.492985640 +0000, "record"=>{"message"=>"\xFF\xAD"}, "message"=>"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch\" location=nil tag=\"app.fluentcat\" time=2018-11-13 11:16:17.492985640 +0000 record={\"message\"=>\"\\xFF\\xAD\"}"}, "message"=>"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch\" location=nil tag=\"fluent.warn\" time=2018-11-13 11:16:27.978851140 +0000 record={\"error\"=>\"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>\", \"location\"=>nil, \"tag\"=>\"app.fluentcat\", \"time\"=>2018-11-13 11:16:17.492985640 +0000, \"record\"=>{\"message\"=>\"\\xFF\\xAD\"}, \"message\"=>\"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\\\"400 - Rejected by Elasticsearch\\\" location=nil tag=\\\"app.fluentcat\\\" time=2018-11-13 11:16:17.492985640 +0000 record={\\\"message\\\"=>\\\"\\\\xFF\\\\xAD\\\"}\"}"}, "message"=>"dump an error event: 
error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\"400 - Rejected by Elasticsearch\" location=nil tag=\"fluent.warn\" time=2018-11-13 11:16:38.991800825 +0000 record={\"error\"=>\"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>\", \"location\"=>nil, \"tag\"=>\"fluent.warn\", \"time\"=>2018-11-13 11:16:27.978851140 +0000, \"record\"=>{\"error\"=>\"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>\", \"location\"=>nil, \"tag\"=>\"app.fluentcat\", \"time\"=>2018-11-13 11:16:17.492985640 +0000, \"record\"=>{\"message\"=>\"\\xFF\\xAD\"}, \"message\"=>\"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\\\"400 - Rejected by Elasticsearch\\\" location=nil tag=\\\"app.fluentcat\\\" time=2018-11-13 11:16:17.492985640 +0000 record={\\\"message\\\"=>\\\"\\\\xFF\\\\xAD\\\"}\"}, \"message\"=>\"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\\\"400 - Rejected by Elasticsearch\\\" location=nil tag=\\\"fluent.warn\\\" time=2018-11-13 11:16:27.978851140 +0000 record={\\\"error\\\"=>\\\"#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>\\\", \\\"location\\\"=>nil, \\\"tag\\\"=>\\\"app.fluentcat\\\", \\\"time\\\"=>2018-11-13 11:16:17.492985640 +0000, \\\"record\\\"=>{\\\"message\\\"=>\\\"\\\\xFF\\\\xAD\\\"}, \\\"message\\\"=>\\\"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error=\\\\\\\"400 - Rejected by Elasticsearch\\\\\\\" location=nil tag=\\\\\\\"app.fluentcat\\\\\\\" time=2018-11-13 11:16:17.492985640 +0000 record={\\\\\\\"message\\\\\\\"=>\\\\\\\"\\\\\\\\xFF\\\\\\\\xAD\\\\\\\"}\\\"}\"}"}
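The records above grow because each rejected event is re-emitted under the fluent.warn tag, and the new record embeds the previous one twice: once as "record" and once, escaped, inside the "message" string. The following Ruby simulation is not Fluentd code; the wrap_as_error_event helper is hypothetical and keeps only a subset of the logged fields (it omits "time"). It reproduces just that wrapping step to show that each round more than doubles the record size.

```ruby
# Hypothetical model of one feedback round: the rejected record is wrapped
# into a new error event, mirroring a subset of the fields seen in the logs.
def wrap_as_error_event(record, tag)
  summary = "dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError " \
            "error=\"400 - Rejected by Elasticsearch\" location=nil tag=#{tag.inspect} record=#{record.inspect}"
  {
    "error" => "#<Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError: 400 - Rejected by Elasticsearch>",
    "location" => nil,
    "tag" => tag,
    "record" => record,  # previous record, embedded as-is
    "message" => summary # previous record again, embedded as escaped text
  }
end

record = { "message" => "\xFF\xAD" }
tag = "app.fluentcat"
sizes = []
4.times do
  record = wrap_as_error_event(record, tag)
  tag = "fluent.warn" # every later round is tagged fluent.warn, as in the logs
  sizes << record.inspect.bytesize
end
p sizes # each entry is more than twice the previous one
```

Because the growth is exponential and retry_forever is set, a single invalid event is enough to fill logs and buffers, matching the summary above.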