Closed ricsanfre closed 1 year ago
The fluentd-elasticsearch plugin supports creating index templates, and the ILM policies associated with them, as part of creating a new index in ES.
See FAQ - Enable ILM
In order to enable ILM in fluentd-elasticsearch-plugin, the elasticsearch-xpack gem needs to be installed. The fluentd-aggregator docker image needs to be updated:
ARG BASE_IMAGE=fluent/fluentd:v1.15.3-debian-1.2
FROM $BASE_IMAGE
# UPDATE BASE IMAGE WITH PLUGINS
# Use root account to use apt-get
USER root
RUN buildDeps="sudo make gcc g++ libc-dev" \
&& apt-get update \
&& apt-get install -y --no-install-recommends $buildDeps \
&& sudo gem install fluent-plugin-elasticsearch \
&& sudo gem install elasticsearch-xpack \
&& sudo gem install fluent-plugin-prometheus \
&& sudo gem install fluent-plugin-record-modifier \
&& sudo gem install fluent-plugin-grafana-loki \
&& sudo gem sources --clear-all \
&& SUDO_FORCE_REMOVE=yes \
apt-get purge -y --auto-remove \
-o APT::AutoRemove::RecommendsImportant=false \
$buildDeps \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.ge
# COPY AGGREGATOR CONF FILES
COPY ./conf/fluent.conf /fluentd/etc/
COPY ./conf/forwarder.conf /fluentd/etc/
COPY ./conf/prometheus.conf /fluentd/etc/
# COPY entry
COPY entrypoint.sh /fluentd/entrypoint.sh
# Environment variables
ENV FLUENTD_OPT=""
# Run as fluent user. Do not need to have privileges to access /var/log directory
USER fluent
ENTRYPOINT ["tini", "--", "/fluentd/entrypoint.sh"]
CMD ["fluentd"]
When building this new docker image, the following warning appears:
WARNING: This library is deprecated The API endpoints currently living in elasticsearch-xpack will be moved into elasticsearch-api in version 8.0.0 and forward. You should be able to keep using elasticsearch-xpack and the xpack namespace in 7.x. We're running the same tests in elasticsearch-xpack, but if you encounter any problems, please let us know in this issue: https://github.com/elastic/elasticsearch-ruby/issues/1274
Currently the plugin does not support elasticsearch-api, so elasticsearch-xpack needs to be used. See https://github.com/uken/fluent-plugin-elasticsearch/issues/937
The following example configures the plugin with ILM using a fixed index name (no logstash format and no new index per day) together with a dynamic index template:
<match **>
@type elasticsearch
@id out_es
@log_level info
include_tag_key true
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme http
user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
# Reload and reconnect options
reload_connections false
reconnect_on_error true
reload_on_failure true
request_timeout 15s
log_es_400_reason true
# avoid 7.x errors
suppress_type_name true
# setting sniffer class
sniffer_class_name Fluent::Plugin::ElasticsearchSimpleSniffer
# Do not use logstash format
logstash_format false
# setting index_name
index_name fluentd
# specifying time key
time_key time
# including @timestamp field
include_timestamp true
# ILM Settings - WITH ROLLOVER support
# https://github.com/uken/fluent-plugin-elasticsearch/blob/master/README.Troubleshooting.md#enable-index-lifecycle-management
# rollover_index true
application_name "fluentd"
index_date_pattern ""
enable_ilm true
ilm_policy_id fluentd
ilm_policy {"policy":{"phases":{"hot":{"min_age":"0ms","actions":{"rollover":{"max_age":"3d","max_size":"20gb"},"set_priority":{"priority":100}}},"warm":{"actions":{"allocate":{"include":{},"exclude":{},"require":{"data":"warm"}},"set_priority":{"priority":50}}},"delete":{"min_age":"90d","actions":{"delete":{}}}}}}
ilm_policy_overwrite true
# index template
use_legacy_template false
template_overwrite true
template_name fluentd
template_file "/etc/fluent/template/fluentd-es-template.json"
customize_template {"<<shard>>": "1","<<replica>>": "0"}
<buffer>
flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
retry_forever true
</buffer>
</match>
Where the fluentd-es-template.json file contains the index template definition:
{
"index_patterns": ["mock"],
"template": {
"settings": {
"index": {
"lifecycle": {
"name": "mock",
"rollover_alias": "mock"
},
"number_of_shards": "<<shard>>",
"number_of_replicas": "<<replica>>"
}
}
}
}
fluentd-elasticsearch-plugin replaces index_patterns and index.template.settings.index.lifecycle with the attributes specified in the configuration (ilm_policy, index_name, etc.). That is the reason for the word "mock" in the template: it is replaced at run time.
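To make the replacement step easier to follow, here is a minimal Python sketch of the idea. It assumes the customize_template placeholders are substituted as plain text before the JSON is parsed, and that the "mock" entries are then overwritten. The function names are hypothetical, and the pattern and alias values passed at the end are illustrative assumptions, not the plugin's exact output.

```python
import copy
import json

# Same template as above, with "mock" entries and <<...>> placeholders.
TEMPLATE_TEXT = """
{
  "index_patterns": ["mock"],
  "template": {
    "settings": {
      "index": {
        "lifecycle": {"name": "mock", "rollover_alias": "mock"},
        "number_of_shards": "<<shard>>",
        "number_of_replicas": "<<replica>>"
      }
    }
  }
}
"""


def apply_placeholders(template_text, replacements):
    """Replace each placeholder string with its value, then parse the JSON."""
    for placeholder, value in replacements.items():
        template_text = template_text.replace(placeholder, value)
    return json.loads(template_text)


def override_ilm_fields(template, index_pattern, policy_id, rollover_alias):
    """Return a copy of the template with the "mock" entries overwritten."""
    result = copy.deepcopy(template)
    result["index_patterns"] = [index_pattern]
    lifecycle = result["template"]["settings"]["index"]["lifecycle"]
    lifecycle["name"] = policy_id
    lifecycle["rollover_alias"] = rollover_alias
    return result


template = apply_placeholders(TEMPLATE_TEXT, {"<<shard>>": "1", "<<replica>>": "0"})
# Illustrative values only (assumption): the real ones come from the plugin configuration.
final = override_ilm_fields(template, "fluentd-*", "fluentd", "fluentd")
print(json.dumps(final, indent=2))
```

After both steps no "mock" value is left in the template, which is why the placeholder word itself does not matter.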
In a Kubernetes environment, the file containing the index template definition can be stored in a ConfigMap mounted as a volume in the fluentd pod.
Create a ConfigMap containing the index template definition:
# ES index template for fluentd logs
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-template
namespace: logging
data:
fluentd-es-template.json: |-
{
"index_patterns": ["mock"],
"template": {
"settings": {
"index": {
"lifecycle": {
"name": "mock",
"rollover_alias": "mock"
},
"number_of_shards": "<<shard>>",
"number_of_replicas": "<<replica>>"
}
}
}
}
volumes:
...
- name: etcfluentd-template
configMap:
name: fluentd-template
defaultMode: 0777
volumeMounts:
...
- name: etcfluentd-template
mountPath: /etc/fluent/template
When deploying fluentd with the new configuration and the modified fluentd docker image, the index template and the index are created, but the associated ILM policy is not, even though the policy syntax is correct.
The fluentd error log is the following:
2023-03-11 12:59:01 +0000 [info]: #0 [out_es] Installing ILM policy: {"policy"=>{"phases"=>{"hot"=>{"min_age"=>"0ms", "actions"=>{"rollover"=>{"max_age"=>"3d", "max_size"=>"20gb"}, "set_priority"=>{"priority"=>100}}}, "warm"=>{"actions"=>{"allocate"=>{"include"=>{}, "exclude"=>{}, "require"=>{"data"=>"warm"}}, "set_priority"=>{"priority"=>50}}}, "delete"=>{"min_age"=>"90d", "actions"=>{"delete"=>{}}}}}}
2023-03-11 12:59:03 +0000 [warn]: #0 [out_es] Could not communicate to Elasticsearch, resetting connection and trying again. [400] {"error":{"root_cause":[{"type":"x_content_parse_exception","reason":"[1:12] [policy] unknown field [phases]"}],"type":"x_content_parse_exception","reason":"[1:12] [policy] unknown field [phases]"},"status":400}
Using Kibana development UI, the policy can be created without issues.
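The same check can be scripted outside Kibana. The sketch below only builds a PUT request against the Elasticsearch _ilm/policy endpoint with the policy used in this issue; it does not send anything. The base URL is an assumption for illustration, the function name is hypothetical, and a cluster with security enabled would also need credentials.

```python
import json
import urllib.request

# ILM policy copied from the fluentd configuration in this issue.
POLICY = {
    "policy": {
        "phases": {
            "hot": {
                "min_age": "0ms",
                "actions": {
                    "rollover": {"max_age": "3d", "max_size": "20gb"},
                    "set_priority": {"priority": 100},
                },
            },
            "warm": {
                "actions": {
                    "allocate": {"include": {}, "exclude": {}, "require": {"data": "warm"}},
                    "set_priority": {"priority": 50},
                }
            },
            "delete": {"min_age": "90d", "actions": {"delete": {}}},
        }
    }
}


def build_ilm_policy_request(base_url, policy_id, policy):
    """Build (but do not send) a PUT request for the _ilm/policy endpoint."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/_ilm/policy/{policy_id}",
        data=json.dumps(policy).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Assumed URL: replace with the address of the Elasticsearch service.
request = build_ilm_policy_request("http://localhost:9200", "fluentd", POLICY)
print(request.get_method(), request.full_url)

# To send it against a reachable cluster:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode())
```

If this request is accepted while the plugin's call is rejected, the policy itself is fine and the problem is in how the client library submits it.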
It seems that the current version of the plugin does not properly support ILM in ES 8.x, since it relies on a deprecated gem: elasticsearch-xpack.
The fluentd-kubernetes-daemonset (https://github.com/fluent/fluentd-kubernetes-daemonset) docker image, which is the one installed by default by the fluentd helm chart, does not yet have a version for ES 8.x. The available docker images are only tagged as 7.0, and it seems that these images, initially built for ES 7.x, work with ES 8.x. See https://github.com/fluent/fluentd-kubernetes-daemonset/issues/1373
The latest docker image available containing the elasticsearch plugins (v1.15/debian-elasticsearch7) uses a previous version of fluentd-elasticsearch-plugin and its dependencies. See the Gemfile used in the Dockerfile to install all plugins and their dependencies:
gem "fluentd", "1.15.3"
...
gem "elasticsearch", "~> 7.0"
gem "fluent-plugin-elasticsearch", "~> 5.1.1"
gem "elasticsearch-xpack", "~> 7.0"
The docker image is installing the following gems:
Modifying the fluentd-aggregator docker image to use release 5.1.1 of the plugin and the elasticsearch 7 dependencies solves the issue.
New Dockerfile:
ARG BASE_IMAGE=fluent/fluentd:v1.15.3-debian-1.2
FROM $BASE_IMAGE
# UPDATE BASE IMAGE WITH PLUGINS
# Use root account to use apt-get
USER root
RUN buildDeps="sudo make gcc g++ libc-dev" \
&& apt-get update \
&& apt-get install -y --no-install-recommends $buildDeps \
&& sudo gem install elasticsearch -v '~> 7.0' \
&& sudo gem install fluent-plugin-elasticsearch -v '~> 5.1.1' \
&& sudo gem install elasticsearch-xpack -v '~> 7.0' \
&& sudo gem install fluent-plugin-prometheus \
&& sudo gem install fluent-plugin-record-modifier \
&& sudo gem install fluent-plugin-grafana-loki \
&& sudo gem sources --clear-all \
&& SUDO_FORCE_REMOVE=yes \
apt-get purge -y --auto-remove \
-o APT::AutoRemove::RecommendsImportant=false \
$buildDeps \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.ge
# COPY AGGREGATOR CONF FILES
COPY ./conf/fluent.conf /fluentd/etc/
COPY ./conf/forwarder.conf /fluentd/etc/
COPY ./conf/prometheus.conf /fluentd/etc/
# COPY entry
COPY entrypoint.sh /fluentd/entrypoint.sh
# Environment variables
ENV FLUENTD_OPT=""
# Run as fluent user. Do not need to have privileges to access /var/log directory
USER fluent
ENTRYPOINT ["tini", "--", "/fluentd/entrypoint.sh"]
CMD ["fluentd"]
As an additional configuration, I am trying to create a separate index for each container/app. Each index will have its own ES mapping and its own index template. This would be an alternative solution to issue #58, avoiding the data type conflicts that occur when ingesting data into ES with fluentbit's kubernetes filter Merge_Log option enabled.
Dynamic indices and dynamic templates can be configured in fluentd-elasticsearch-plugin by making use of the customize_template option.
With the following configuration, a separate index is generated for each (namespace, container) tuple, using a common ILM policy and automatic rollover.
ConfigMap containing dynamic index template
# ES index template for fluentd logs
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-template
namespace: logging
data:
fluentd-es-template.json: |-
{
"index_patterns": ["fluentd-<<TAG>>-*"],
"template": {
"settings": {
"index": {
"lifecycle": {
"name": "fluentd-policy",
"rollover_alias": "fluentd-<<TAG>>"
},
"number_of_shards": "<<shard>>",
"number_of_replicas": "<<replica>>"
}
},
"mappings" : {
"dynamic_templates" : [
{
"message_field" : {
"path_match" : "message",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text",
"norms" : false
}
}
},
{
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : {
"type" : "text", "norms" : false,
"fields" : {
"keyword" : { "type": "keyword", "ignore_above": 256 }
}
}
}
} ],
"properties" : {
"@timestamp": { "type": "date" }
}
}
}
}
Modify Fluentd config
# Route label ES Output
<label @OUTPUT_ES>
# Set the index name based on namespace and container
<filter kube.**>
@type record_transformer
enable_ruby
<record>
index_app_name ${record['namespace'] + '.' + record['container']}
</record>
</filter>
<filter host.**>
@type record_transformer
enable_ruby
<record>
index_app_name "host"
</record>
</filter>
# Send received logs to elasticsearch
<match **>
@type elasticsearch
@id out_es
@log_level info
include_tag_key true
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme http
user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
# Reload and reconnect options
reload_connections false
reconnect_on_error true
reload_on_failure true
request_timeout 15s
log_es_400_reason true
# avoid 7.x errors
suppress_type_name true
# setting sniffer class
sniffer_class_name Fluent::Plugin::ElasticsearchSimpleSniffer
# Do not use logstash format
logstash_format false
# Setting index_name
index_name fluentd-${index_app_name}
# specifying time key
time_key time
# including @timestamp field
include_timestamp true
# Customizing index template
use_legacy_template false
template_overwrite true
template_name fluentd-${index_app_name}
template_file "/etc/fluent/template/fluentd-es-template.json"
customize_template {"<<shard>>": "1","<<replica>>": "0", "<<TAG>>":"${index_app_name}"}
# ILM Settings - WITH ROLLOVER support
# https://github.com/uken/fluent-plugin-elasticsearch/blob/master/README.Troubleshooting.md#enable-index-lifecycle-management
index_date_pattern ""
enable_ilm true
ilm_policy_id fluentd-policy
ilm_policy {"policy":{"phases":{"hot":{"min_age":"0ms","actions":{"rollover":{"max_size":"10gb","max_age":"7d"}}},"warm":{"min_age":"2d","actions":{"shrink":{"number_of_shards":1},"forcemerge":{"max_num_segments":1}}},"delete":{"min_age":"7d","actions":{"delete":{"delete_searchable_snapshot":true}}}}}}
ilm_policy_overwrite true
<buffer tag, index_app_name>
flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
retry_forever true
</buffer>
</match>
</label>
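To trace the naming scheme used in this configuration, the following Python sketch derives the names for one record. It only mirrors the string patterns written in the config and template above (index_app_name, index_name, template_name, index_patterns, rollover_alias). The sample record and the function names are hypothetical, and any extra processing the plugin may apply to the values is not modelled.

```python
def index_app_name(record):
    """Mirror of the record_transformer expression: namespace + '.' + container."""
    return record["namespace"] + "." + record["container"]


def derived_names(app_name):
    """Names produced by substituting index_app_name / <<TAG>> into the config and template."""
    return {
        "index_name": f"fluentd-{app_name}",
        "template_name": f"fluentd-{app_name}",
        "index_pattern": f"fluentd-{app_name}-*",
        "rollover_alias": f"fluentd-{app_name}",
    }


# Hypothetical sample record, for illustration only.
record = {"namespace": "logging", "container": "fluentd", "message": "hello"}
names = derived_names(index_app_name(record))
for key, value in names.items():
    print(f"{key}: {value}")
```

Because index_app_name is also a buffer chunk key, each chunk carries a single value for it, which is what allows the placeholder to be resolved per (namespace, container) pair.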
Enhancement Request
1) Apply log retention policies by configuring Elasticsearch Index Lifecycle Management policies.
2) Use index template for fluentd logs, so mappings can be configured (data types, indexing parameters, etc.)
Use Index Lifecycle Management (ILM) to automate the management of indices and to set retention policies.
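As a rough illustration of what such a retention policy means in practice, the Python sketch below estimates the maximum age of an index when it is deleted. It assumes rollover is triggered by max_age (a size-based rollover would happen earlier) and relies on ILM measuring the delete phase's min_age from the rollover time. The 3d/90d values are example settings, and the helper names are hypothetical. The ILM polling interval is ignored.

```python
import re
from datetime import timedelta

# Subset of Elasticsearch time units, mapped to timedelta keyword arguments.
_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds", "ms": "milliseconds"}


def parse_duration(value):
    """Parse an Elasticsearch-style duration such as '3d' or '0ms'."""
    match = re.fullmatch(r"(\d+)(ms|d|h|m|s)", value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def max_age_at_deletion(policy):
    """Upper bound on index age at deletion: rollover max_age + delete min_age."""
    phases = policy["policy"]["phases"]
    rollover_age = parse_duration(phases["hot"]["actions"]["rollover"]["max_age"])
    delete_age = parse_duration(phases["delete"]["min_age"])
    return rollover_age + delete_age


# Example values: rollover after 3 days, delete 90 days after rollover.
POLICY = {
    "policy": {
        "phases": {
            "hot": {"min_age": "0ms", "actions": {"rollover": {"max_age": "3d", "max_size": "20gb"}}},
            "delete": {"min_age": "90d", "actions": {"delete": {}}},
        }
    }
}

print(max_age_at_deletion(POLICY).days)  # 93
```

So a "90d" delete phase combined with a 3-day rollover keeps the oldest documents for up to about 93 days, not 90.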
References
Implementation Details
The fluentd-elasticsearch-plugin already supports ILM and index template configuration. See the plugin FAQ.