uken / fluent-plugin-elasticsearch

Apache License 2.0
Pipeline not executed with datastreams #1046

Open Floppe opened 8 months ago

Floppe commented 8 months ago

Problem

When I switched to ES and Filebeat 8, which use data streams, and changed the Fluentd output @type to elasticsearch_data_stream, all pipelines stopped working.

  ### General match
  <match **>
    @type elasticsearch_data_stream
    data_stream_name filebeat-8.12.2
    scheme https
    host es01
    port 9200
    user elastic
    password xxxxxxx

    include_timestamp true

    verify_es_version_at_startup false
    default_elasticsearch_version 8
    suppress_type_name true
    ssl_verify false

    #pipeline ${record['@metadata']['pipeline']}
    pipeline filebeat-8.12.2-apache-access-pipeline
  </match>
</label>

Example ES document from Filebeat, which read an Apache access log through the apache module. The pipeline name is visible under @metadata.pipeline.

{
  "_index": ".ds-filebeat-8.12.2-2024.03.19-000002",
  "_id": "KiLHWo4BW_5uurVKkxuu",
  "_version": 1,
  "_score": 0,
  "_source": {
    "input": {
      "type": "log"
    },
    "agent": {
      "name": "www5",
      "id": "c7621651-c8f7-4afb-8fb4-28bb04cdc3d9",
      "ephemeral_id": "7af1ee40-948a-4266-b5b6-9bc2f6e246dc",
      "type": "filebeat",
      "version": "8.12.2"
    },
    "@timestamp": "2024-03-20T07:32:36.477Z",
    "ecs": {
      "version": "1.12.0"
    },
    "log": {
      "file": {
        "path": "/var/www/domain.com/logs/ssl_access.log"
      },
      "offset": 177811762
    },
    "@metadata": {
      "pipeline": "filebeat-8.12.2-apache-access-pipeline",
      "beat": "filebeat",
      "type": "_doc",
      "version": "8.12.2"
    },
    "service": {
      "type": "apache"
    },
    "event": {
      "ingested": "2024-03-20T07:33:32.717454276Z",
      "module": "apache",
      "dataset": "apache.access"
    },
    .....
  }
}

...

Expected Behavior or What you need to ask

ES should run the supplied pipeline, both when it is set in the config file and when it comes from the @metadata.pipeline field that Filebeat ships. ...

Using Fluentd and ES plugin versions

kacian commented 3 months ago

As a workaround, you can write to the data stream with @type set to elasticsearch; the pipeline then runs correctly:

<match **>
  @type elasticsearch
  scheme https
  host es01
  port 9200
  user elastic
  password xxxxxxx
  include_timestamp true
  verify_es_version_at_startup false
  default_elasticsearch_version 8
  suppress_type_name true
  ssl_verify false

  # Use index_name instead of data_stream_name
  index_name filebeat-8.12.2

  # Datastreams can only handle 'create' operations, so we set this to write to the datastream
  write_operation create

  # The pipeline is now functioning correctly
  pipeline filebeat-8.12.2-apache-access-pipeline
</match>

Floppe commented 3 months ago

I could not get it to work. The debug log says: Dropping record because its missing an '_id' field and write_operation is create
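
One possible way past this error, not confirmed in this thread: with write_operation create, the plugin drops any record that has no _id, so an ID could be generated before the record reaches the output. The sketch below assumes the plugin's bundled elasticsearch_genid filter together with the id_key and remove_keys output options. The key name _hash is only the filter's default, and whether the data stream then accepts the documents and runs the pipeline is an assumption.

```
# Assumption: generate a unique ID for every record before the output stage
<filter **>
  @type elasticsearch_genid
  hash_id_key _hash
</filter>

<match **>
  @type elasticsearch
  # ... connection settings as in the workaround above ...
  index_name filebeat-8.12.2
  write_operation create

  # Use the generated value as the document _id, then remove it from the record body
  id_key _hash
  remove_keys _hash

  pipeline filebeat-8.12.2-apache-access-pipeline
</match>
```

If the records already carry a unique field, pointing id_key at that field instead of a generated hash should have the same effect.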