uken / fluent-plugin-elasticsearch

Question: id_key in @type elasticsearch_data_stream doesn't function as expected ... #1045

Open mafazely opened 8 months ago

mafazely commented 8 months ago

Problem

I would like to use the elasticsearch_genid filter to avoid duplicate documents in Elasticsearch. I run Fluent Bit on my servers to read and parse logs, then forward them to Fluentd, which acts as an aggregator and pushes them to the Elasticsearch cluster. However, id_key does not work as expected with the elasticsearch_data_stream output: a _hash field is added to the documents in Elasticsearch, but its value is not used as the document _id.

Steps to replicate

<system>
  log_level info
  workers 8
</system>

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<filter data.**>
  @type record_transformer
  enable_ruby true
  <record>
    ...
  </record>
</filter>

<filter data.**>
  @type elasticsearch_genid
  use_record_as_seed true
  record_keys []
  use_entire_record true
  hash_type sha1
  hash_id_key _hash
  separator _
  inc_time_as_key false
  inc_tag_as_key false
</filter>

<filter stream.**>
  @type elasticsearch_genid
  use_record_as_seed true
  record_keys []
  use_entire_record true
  hash_type sha1
  hash_id_key _hash
  separator _
  inc_time_as_key false
  inc_tag_as_key false
</filter>

<match data.**>
  @type copy
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of processed records for data streams
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
  <store>
    @type elasticsearch_data_stream
    host sample_host:9200
    scheme https
    ssl_verify false
    id_key _hash
    write_operation create
    remove_keys _hash
    user sample_user
    password sample_pass
    include_tag_key true
    include_timestamp true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    data_stream_name data
    buffer_type memory
    retry_forever true
    overflow_action block
    <buffer>
      @type memory
      flush_thread_count 1
      flush_interval 10s
      chunk_limit_size 4M
      total_limit_size 512M
      retry_forever true
    </buffer>
  </store>
</match>

<match stream.**>
  @type copy
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of processed records for data streams
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
  <store>
    @type elasticsearch_data_stream
    hosts sample_host:9200
    scheme https
    ssl_verify false
    id_key _hash
    write_operation create
    remove_keys _hash
    user sample_user
    password sample_pass
    include_tag_key true
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    data_stream_name stream_log
    buffer_type memory
    retry_forever true
    overflow_action block
    <buffer>
      @type memory
      flush_thread_count 1
      flush_interval 10s
      chunk_limit_size 4M
      total_limit_size 512M
      retry_forever true
    </buffer>
  </store>
</match>

Expected Behavior or What you need to ask

I expect the _hash value to be used as the document _id so that Elasticsearch can check documents for uniqueness, but this does not happen.
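
For comparison, here is a minimal sketch of the same intent using the plugin's regular elasticsearch output, where id_key, write_operation and remove_keys are documented options. Pointing index_name at the data stream name is an assumption and has not been verified against a data stream here. Even if it works, duplicates may only be detected within the current backing index.

```
<filter data.**>
  @type elasticsearch_genid
  use_entire_record true
  hash_type sha1
  # Store the generated hash in a temporary field
  hash_id_key _hash
</filter>

<match data.**>
  # Assumption: the regular output is used instead of elasticsearch_data_stream
  @type elasticsearch
  host sample_host
  port 9200
  scheme https
  # Assumption: index_name targets the data stream named "data"
  index_name data
  # Use the generated hash as the document _id
  id_key _hash
  # Data streams only accept the create operation
  write_operation create
  # Drop the temporary field before the document is indexed
  remove_keys _hash
</match>
```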

Using Fluentd and ES plugin versions

mafazely commented 7 months ago

Any recommendations?

castorsky commented 6 months ago

Same here. id_key and remove_keys are ignored by elasticsearch_data_stream.

ElasticSearch 8.5
Fluentd 1.16.4 (container)

sturmf commented 5 months ago

I would also need this functionality. Is it true that this feature is not supported for data streams or are we missing some configuration?

Is there a technical reason for this, or has no one had the time to implement it yet?