fluent / fluent-plugin-opensearch

OpenSearch Plugin for Fluentd
Apache License 2.0

OpenSearch Output Data Stream - id_key ignored #124

Open toby181 opened 1 year ago

toby181 commented 1 year ago

Hi all, we're using the opensearch_data_stream output. To avoid duplicate logs in OpenSearch, we want to use the opensearch_genid filter in combination with the "id_key" parameter. Unfortunately, "id_key" seems to be ignored when writing to data streams.

Steps to replicate

Our dummy config:

  <filter **>
    @type opensearch_genid
    hash_id_key "myhash"
  </filter>
  <match **>
    @type opensearch_data_stream
    @id fd_out_os_ds
    @log_level "info"
    id_key "myhash"
    with_transporter_log false
    data_stream_name "log-dummy"
    data_stream_template_name ""
    include_tag_key true
    host "opensearch-staging"
    port 9200
    path ""
    scheme https
    ssl_verify false
    ssl_version TLSv1_2
    user "fluentd"
    password xxxxxx
    reload_connections false
    reconnect_on_error true
    reload_on_failure true
    log_os_400_reason true
    logstash_format true
    include_timestamp true
    sniffer_class_name "Fluent::Plugin::OpenSearchSimpleSniffer"
    request_timeout 5s
    suppress_type_name false
    <buffer tag>
      flush_thread_count 8
      flush_interval 5s
      chunk_limit_size 4M
      total_limit_size 512MB
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>

When I check my logs in OpenSearch, the field 'myhash' is populated with hash values, so hash generation itself is not the problem.
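For readers unfamiliar with the idea, here is a minimal sketch of why a hash field can serve as a deduplication key. It is not the opensearch_genid implementation: the function name `add_hash_id`, the choice of SHA-1, and the JSON serialization are assumptions made for illustration. Whether the real filter derives its value from record content depends on how it is configured, so check the plugin README before relying on this behavior.

```python
import hashlib
import json


def add_hash_id(record: dict, hash_id_key: str = "myhash") -> dict:
    """Return a copy of record with a content-derived hash under hash_id_key.

    Hypothetical helper for illustration; not part of any Fluentd plugin.
    """
    # Serialize with sorted keys so identical content always yields identical bytes.
    payload = json.dumps(record, sort_keys=True, separators=(",", ":")).encode("utf-8")
    out = dict(record)
    out[hash_id_key] = hashlib.sha1(payload).hexdigest()
    return out


first = add_hash_id({"message": "disk full", "host": "web-1"})
second = add_hash_id({"host": "web-1", "message": "disk full"})
print(first["myhash"] == second["myhash"])  # True: same content, same hash
```

If the ID were random per event instead, two copies of the same log line would get different IDs and would not deduplicate by content.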

Expected Behavior or What you need to ask

Based on the example in https://github.com/fluent/fluent-plugin-opensearch#generate-hash-id, the configuration seems correct. I'd expect the value of the "_id" field to be replaced with the value of the "myhash" field (or whichever field is configured as id_key).
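To make the expectation concrete, the sketch below builds the two NDJSON lines of a single bulk item the way one would expect id_key to influence them. This is an illustration only, not the plugin's code: `to_bulk_lines` is a hypothetical helper, and the use of the `create` action reflects the understanding that data streams are append-only and accept only `create` operations in bulk requests.

```python
import json


def to_bulk_lines(record: dict, target: str, id_key: str) -> str:
    """Build the action line and source line of one bulk `create` item.

    Hypothetical helper for illustration; not the plugin's API.
    """
    action = {"create": {"_index": target}}
    # Expected behavior: when the record carries id_key, its value becomes _id.
    if id_key in record:
        action["create"]["_id"] = record[id_key]
    return json.dumps(action) + "\n" + json.dumps(record) + "\n"


body = to_bulk_lines({"message": "disk full", "myhash": "abc123"}, "log-dummy", "myhash")
print(body, end="")
```

The reported behavior corresponds to the action line being sent without `_id`, in which case OpenSearch generates an ID on its own.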

Using Fluentd and OpenSearch plugin versions

abbrev (default: 0.1.0) async (1.31.0) async-http (0.60.2) async-io (1.37.0) async-pool (0.4.0) aws-eventstream (1.2.0) aws-partitions (1.854.0) aws-sdk-core (3.187.1) aws-sigv4 (1.6.1) base64 (default: 0.1.1) benchmark (default: 0.2.0) bigdecimal (default: 3.1.1) bundler (default: 2.3.26) cgi (default: 0.3.6) concurrent-ruby (1.2.2) console (1.23.2) cool.io (1.8.0) csv (default: 3.2.5) date (default: 3.2.2) debug (1.6.3) delegate (default: 0.2.0) did_you_mean (default: 1.6.1) digest (default: 3.1.0) digest-crc (0.6.5) drb (default: 2.1.0) english (default: 0.7.1) erb (default: 2.2.3) error_highlight (default: 0.3.0) etc (default: 1.3.0) excon (0.104.0) faraday (2.7.11) faraday-excon (2.1.0) faraday-net_http (3.0.2) faraday_middleware-aws-sigv4 (1.0.1) fcntl (default: 1.0.1) fiber-annotation (0.2.0) fiber-local (1.0.0) fiddle (default: 1.1.0) fileutils (default: 1.6.0) find (default: 0.1.1) fluent-plugin-kafka (0.19.2) fluent-plugin-multi-format-parser (1.0.0) fluent-plugin-opensearch (1.1.4) fluent-plugin-prometheus (2.1.0) fluent-plugin-record-modifier (2.1.1) fluentd (1.16.3) forwardable (default: 1.3.2) getoptlong (default: 0.1.1) http_parser.rb (0.8.0) io-console (default: 0.5.11) io-nonblock (default: 0.1.0) io-wait (default: 0.2.1) ipaddr (default: 1.2.4) irb (default: 1.4.1) jmespath (1.6.2) json (2.6.3, default: 2.6.1) logger (default: 1.5.0) ltsv (0.1.2) matrix (0.4.2) minitest (5.15.0) msgpack (1.7.2) multi_json (1.15.0) mutex_m (default: 0.1.1) net-ftp (0.1.3) net-http (default: 0.3.0) net-imap (0.2.3) net-pop (0.1.1) net-protocol (default: 0.1.2) net-smtp (0.3.1) nio4r (2.5.9) nkf (default: 0.1.1) observer (default: 0.1.1) oj (3.16.1) open-uri (default: 0.2.0) open3 (default: 0.1.1) opensearch-ruby (3.0.1) openssl (default: 3.0.1) optparse (default: 0.2.0) ostruct (default: 0.5.2) pathname (default: 0.2.0) power_assert (2.0.1) pp (default: 0.3.0) prettyprint (default: 0.1.1) prime (0.1.2) prometheus-client (4.2.2) protocol-hpack (1.4.2) protocol-http (0.24.7) protocol-http1 (0.15.1) protocol-http2 (0.15.1) pstore (default: 0.1.1) psych (default: 4.0.4) racc (default: 1.6.0) rake (13.0.6) rbs (2.7.0) rdoc (default: 6.4.0) readline (default: 0.0.3) readline-ext (default: 0.1.4) reline (default: 0.3.1) resolv (default: 0.2.1) resolv-replace (default: 0.1.0) rexml (3.2.6, 3.2.5) rinda (default: 0.1.1) rss (0.2.9) ruby-kafka (1.5.0) ruby2_keywords (default: 0.0.5) securerandom (default: 0.2.0) serverengine (2.3.2) set (default: 1.0.2) shellwords (default: 0.1.0) sigdump (0.2.5) singleton (default: 0.1.1) stringio (default: 3.0.1) strptime (0.2.5) strscan (default: 3.0.1) syslog (default: 0.1.0) tempfile (default: 0.1.2) test-unit (3.5.3) time (default: 0.2.2) timeout (default: 0.2.0) timers (4.3.5) tmpdir (default: 0.1.2) traces (0.11.1) tsort (default: 0.1.0) typeprof (0.21.3) tzinfo (2.0.6) tzinfo-data (1.2023.3) un (default: 0.2.0) uri (0.12.2, default: 0.12.1) weakref (default: 0.1.1) webrick (1.8.1) yajl-ruby (1.4.3) yaml (default: 0.2.0) zlib (default: 2.1.1)

* OpenSearch version (optional): 2.11
* OpenSearch template(s) (optional):
{
  "name": "log_template",
  "index_template": {
    "index_patterns": [
      "log-*"
    ],
    "template": {
      "settings": {
        "index": {
          "number_of_shards": "1",
          "number_of_replicas": "1",
          "mapping": {
            "total_fields": {
              "limit": "2000"
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "timestamp": {
            "type": "date"
          }
        }
      }
    },
    "composed_of": [

    ],
    "priority": 200,
    "data_stream": {
      "timestamp_field": {
        "name": "@timestamp"
      }
    }
  }
}

toby181 commented 1 year ago

I did some further testing, this time using the 'opensearch' output plugin instead of 'opensearch_data_stream'. ... I always thought opensearch_data_stream had to be used when writing to data streams. Could someone correct me or provide further details?

My current configuration looks like this:

  <filter **>
    @type opensearch_genid
    hash_id_key _hash
  </filter>
  <label @FLUENT_LOG>
    <match fluent.*>
      @type null
    </match>
  </label>
  <match **>
    @type opensearch
    @id fd_out_os
    @log_level "debug"
    with_transporter_log false
    include_tag_key true
    id_key _hash
    remove_keys _hash
    host "opensearch-staging"
    port 9200
    path ""
    scheme https
    ssl_verify false
    ssl_version TLSv1_2
    user "fluentd"
    password xxxxxx
    reload_connections false
    reconnect_on_error true
    reload_on_failure true
    log_os_400_reason true
    logstash_prefix "null"
    logstash_dateformat "null"
    logstash_format false
    index_name "log-1"
    include_timestamp true
    sniffer_class_name "Fluent::Plugin::OpenSearchSimpleSniffer"
    request_timeout 4s
    suppress_type_name false
    write_operation "create"
    <buffer tag>
      ....
    </buffer>
  </match>
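
The combination of id_key, remove_keys, and write_operation "create" in the configuration above can be pictured with a toy model. This is an in-memory stand-in, not OpenSearch and not the plugin: `ToyIndex` and `send` are hypothetical names, and the status codes only mimic the general behavior that a `create` for an already existing _id is rejected with a conflict.

```python
class ToyIndex:
    """In-memory stand-in for a single index; NOT OpenSearch itself."""

    def __init__(self):
        self.docs = {}

    def create(self, doc_id, source):
        # `create` succeeds only when no document with this _id exists yet.
        if doc_id in self.docs:
            return 409  # conflict: the duplicate is rejected
        self.docs[doc_id] = source
        return 201


def send(index, record, id_key="_hash"):
    """Use record[id_key] as the document ID and drop it from the stored source."""
    source = dict(record)
    doc_id = source.pop(id_key)  # mirrors id_key plus remove_keys on the same field
    return index.create(doc_id, source)


idx = ToyIndex()
event = {"message": "disk full", "_hash": "abc123"}
print(send(idx, event), send(idx, event), len(idx.docs))  # 201 409 1
```

Note that this uniqueness check applies per index. A data stream can consist of several backing indices, so a duplicate arriving after a rollover would presumably not be caught this way.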