uken / fluent-plugin-elasticsearch

Apache License 2.0

Can't use a custom time field when inserting into a data stream #969

Open rjbma opened 2 years ago

rjbma commented 2 years ago


Problem

The time_key configuration parameter doesn't seem to work with the elasticsearch_data_stream output plugin. ...

Steps to replicate

<source>
    @type kafka
    brokers ...
    format json
    topics MY_REQS
    @label @MY_REQS
</source>

<label @MY_REQS>
    <match **>
        @type copy
        <store>
            @type elasticsearch_data_stream
            data_stream_name access_logs
            host elasticsearch
            port 9200
            time_key REQ_START
            include_timestamp true
            include_tag_key true
        </store>
    </match>
</label>

Expected Behavior or What you need to ask

The source data has a field named REQ_START containing dates. Since I'm using the time_key parameter, I expected the @timestamp field to be filled from REQ_START. However, it contains the date when the document was created in Elasticsearch instead.

This works correctly when the output is an index or index alias (i.e., when using @type elasticsearch instead).

This may be related to #967, but it doesn't seem to be the same issue. ...

Using Fluentd and ES plugin versions

*** LOCAL GEMS ***

async (1.30.1)
async-http (0.54.0)
async-io (1.33.0)
async-pool (0.3.9)
benchmark (default: 0.1.0)
bigdecimal (default: 2.0.0)
bundler (default: 2.1.4)
cgi (default: 0.1.0.1)
concurrent-ruby (1.1.10)
console (1.15.0)
cool.io (1.7.1)
csv (default: 3.1.2)
date (default: 3.0.3)
dbm (default: 1.1.0)
delegate (default: 0.1.0)
did_you_mean (default: 1.4.0)
digest-crc (0.6.4)
elastic-transport (8.0.0)
elasticsearch (8.1.2)
elasticsearch-api (8.1.2)
etc (default: 1.1.0)
excon (0.92.2)
ext_monitor (0.1.2)
faraday (1.10.0)
faraday-em_http (1.0.0)
faraday-em_synchrony (1.0.0)
faraday-excon (1.1.0)
faraday-httpclient (1.0.1)
faraday-multipart (1.0.3)
faraday-net_http (1.0.1)
faraday-net_http_persistent (1.2.0)
faraday-patron (1.0.0)
faraday-rack (1.0.0)
faraday-retry (1.0.3)
fcntl (default: 1.0.0)
fiber-local (1.0.0)
fiddle (default: 1.0.0)
fileutils (default: 1.4.1)
fluent-plugin-elasticsearch (5.2.2)
fluent-plugin-kafka (0.17.5)
fluentd (1.14.6)
forwardable (default: 1.3.1)
gdbm (default: 2.1.0)
getoptlong (default: 0.1.0)
http_parser.rb (0.8.0)
io-console (default: 0.5.6)
ipaddr (default: 1.2.2)
irb (default: 1.2.6)
json (2.4.1, default: 2.3.0)
logger (default: 1.4.2)
ltsv (0.1.2)
matrix (default: 0.2.0)
minitest (5.13.0)
msgpack (1.5.1)
multi_json (1.15.0)
multipart-post (2.1.1)
mutex_m (default: 0.1.0)
net-pop (default: 0.1.0)
net-smtp (default: 0.1.0)
net-telnet (0.2.0)
nio4r (2.5.8)
observer (default: 0.1.0)
oj (3.10.18)
open3 (default: 0.1.0)
openssl (default: 2.1.3)
ostruct (default: 0.2.0)
power_assert (1.1.7)
prime (default: 0.1.1)
protocol-hpack (1.4.2)
protocol-http (0.21.0)
protocol-http1 (0.13.2)
protocol-http2 (0.14.2)
pstore (default: 0.1.0)
psych (default: 3.1.0)
racc (default: 1.4.16)
rake (13.0.1)
rdoc (default: 6.2.1.1)
readline (default: 0.0.2)
readline-ext (default: 0.1.0)
reline (default: 0.1.5)
rexml (default: 3.2.3.1)
rss (default: 0.2.8)
ruby-kafka (1.4.0)
ruby2_keywords (0.0.5)
sdbm (default: 1.0.0)
serverengine (2.2.5)
sigdump (0.2.4)
singleton (default: 0.1.0)
stringio (default: 0.1.0)
strptime (0.2.5)
strscan (default: 1.0.3)
test-unit (3.3.4)
timeout (default: 0.1.0)
timers (4.3.3)
tracer (default: 0.1.0)
tzinfo (2.0.4)
tzinfo-data (1.2022.1)
uri (default: 0.10.0)
webrick (default: 1.6.1)
xmlrpc (0.3.0)
yajl-ruby (1.4.2)
yaml (default: 0.1.0)
zlib (default: 1.1.0)

mvandijk1001 commented 2 years ago

I just came across this same issue. The problem is here:

https://github.com/uken/fluent-plugin-elasticsearch/blob/728329b22143e921bb4f1f680daedda61412f403/lib/fluent/plugin/out_elasticsearch_data_stream.rb#L251
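A simplified Ruby reconstruction of what that line appears to do, assuming it unconditionally sets @timestamp from the Fluentd event time yielded for each record. The method name stamp_for_data_stream is hypothetical, and UTC formatting is used only so the output is reproducible.

```ruby
require 'time'

# Hypothetical reconstruction, not the plugin's actual code: @timestamp is
# always derived from the Fluentd event time, so time_key is never consulted
# and any existing @timestamp is overwritten.
def stamp_for_data_stream(record, event_time, precision: 3)
  # UTC is used here only to make the output reproducible.
  record.merge("@timestamp" => Time.at(event_time).utc.iso8601(precision))
end

record = { "REQ_START" => "2022-05-10T12:34:56Z" }
puts stamp_for_data_stream(record, 0)["@timestamp"]
# prints 1970-01-01T00:00:00.000Z: REQ_START is ignored
```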

Compare that to how the index output determines the time to use:

https://github.com/uken/fluent-plugin-elasticsearch/blob/728329b22143e921bb4f1f680daedda61412f403/lib/fluent/plugin/out_elasticsearch.rb#L940-L952
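For comparison, a sketch of the precedence the index output applies, as described in the comment below (existing @timestamp, then time_key, then the event time). This is a simplification under assumptions: stamp_for_index is a hypothetical name, Time.parse stands in for the plugin's own time parser (which can also honor time_key_format), and falling back to the event time when parsing fails is assumed to mirror the plugin.

```ruby
require 'time'

# Hypothetical sketch of the index output's precedence when
# include_timestamp is enabled:
#   1. an existing @timestamp is kept unchanged
#   2. otherwise record[time_key] is parsed into @timestamp
#   3. otherwise (or if parsing fails) the Fluentd event time is used
def stamp_for_index(record, event_time, time_key:, precision: 3)
  return record if record.key?("@timestamp")

  ts = Time.at(event_time)
  if record.key?(time_key)
    begin
      # Stand-in for the plugin's time parser (assumption).
      ts = Time.parse(record[time_key].to_s)
    rescue ArgumentError
      # Unparseable value: keep the event time.
    end
  end
  record.merge("@timestamp" => ts.utc.iso8601(precision))
end

record = { "REQ_START" => "2022-05-10T12:34:56Z" }
puts stamp_for_index(record, 0, time_key: "REQ_START")["@timestamp"]
# prints 2022-05-10T12:34:56.000Z
```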

The fix would be to refactor this logic into a shared method and have both outputs call it.

Also, in that second snippet the precedence is @timestamp > time_key > msgpack time (the Fluentd event time stored with each record in the buffer chunk; I think with this source that means the message time according to Kafka). I think it makes more sense for it to be time_key > @timestamp > msgpack time (most explicit to least).
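One way the refactor and the proposed ordering could fit together, as a sketch: a single helper that both outputs call, with a flag choosing between the index output's current order and the time_key-first order suggested above. TimestampResolver, apply, parse_key and prefer_time_key are hypothetical names, not existing plugin APIs, and Time.parse again stands in for the plugin's time parser.

```ruby
require 'time'

# Hypothetical shared helper; none of these names exist in the plugin.
module TimestampResolver
  FIELD = "@timestamp".freeze

  # Returns a copy of record with @timestamp set.
  #   prefer_time_key: false -> @timestamp > time_key > event time (index output today)
  #   prefer_time_key: true  -> time_key > @timestamp > event time (proposed order)
  def self.apply(record, event_time, time_key: nil, prefer_time_key: false, precision: 3)
    from_key = parse_key(record, time_key, precision)
    return record.merge(FIELD => from_key) if prefer_time_key && from_key
    return record if record.key?(FIELD)
    return record.merge(FIELD => from_key) if from_key

    # UTC only to keep the output reproducible.
    record.merge(FIELD => Time.at(event_time).utc.iso8601(precision))
  end

  # Parses record[time_key]; returns nil when the key is absent or unparseable.
  def self.parse_key(record, time_key, precision)
    return nil unless time_key && record.key?(time_key)

    Time.parse(record[time_key].to_s).utc.iso8601(precision)
  rescue ArgumentError
    nil
  end
end

rec = { "@timestamp" => "2022-05-10T00:00:00.000Z", "REQ_START" => "2022-05-10T12:34:56Z" }
# Both the index and the data stream output would call the same helper:
puts TimestampResolver.apply(rec, 0, time_key: "REQ_START")["@timestamp"]
puts TimestampResolver.apply(rec, 0, time_key: "REQ_START", prefer_time_key: true)["@timestamp"]
```

Defaulting prefer_time_key to false would keep today's index-output behavior for existing users, while routing the data stream output through the same helper is what would make time_key take effect there.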

mafazely commented 1 year ago

Hi, I have the same problem. Is there any solution?