Closed leveryd closed 1 year ago
input {
  stdin {}
}

filter {
  csv {
    columns => ["host"]
  }
}

output {
  elasticsearch {
    hosts => ["10.233.54.156:9200"]
    index => "subdomain"
    document_id => "%{host}"
    scripted_upsert => true
    action => "update"
    script_lang => "painless"
    script_type => "inline"
    script => "
      if(ctx.op == 'create') {
        ctx._source=params.event;
        ctx._source.first_create_time = params.event.get('@timestamp');
      } else {
        String old = ctx._source.get('first_create_time');
        ctx._source = params.event;
        ctx._source.last_update_time = params.event.get('@timestamp');
        ctx._source.first_create_time = old;
      }
    "
  }
}
The request that Logstash sends for this configuration is:
POST /_bulk HTTP/1.1
Connection: Keep-Alive
Content-Type: application/json
Content-Length: 673
Host: 10.233.54.156:9200
User-Agent: Logstash/7.17.3 (OS=Linux-4.18.0-408.el8.x86_64-amd64; JVM=Eclipse Adoptium-11.0.14.1) logstash-output-elasticsearch/11.4.1
Accept-Encoding: gzip,deflate

{"update":{"_id":"www.91.com","_index":"subdomain","routing":null,"retry_on_conflict":1}}
{"script":{"params":{"event":{"host":"www.91.com","message":"www.91.com","@timestamp":"2023-03-29T00:49:44.764Z","@version":"1"}},"inline":"\n if(ctx.op == 'create') {\n ctx._source=params.event;\n ctx._source.first_create_time = params.event.get('@timestamp');\n } else {\n String old = ctx._source.get('first_create_time');\n ctx._source = params.event;\n ctx._source.last_update_time = params.event.get('@timestamp');\n ctx._source.first_create_time = old;\n }\n ","lang":"painless"},"scripted_upsert":true,"upsert":{}}
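To make the shape of that bulk body easier to see, here is a minimal Python sketch that builds the same two NDJSON lines (action line, then script payload) for one event. The helper name `build_bulk_update` and the constant `PAINLESS_SCRIPT` are hypothetical and only for illustration; the field names mirror the captured request above (the `routing: null` field is left out), and this is not how Logstash itself builds the request.

```python
import json

# The Painless script from the Logstash config above, copied as-is.
PAINLESS_SCRIPT = """
if (ctx.op == 'create') {
  ctx._source = params.event;
  ctx._source.first_create_time = params.event.get('@timestamp');
} else {
  String old = ctx._source.get('first_create_time');
  ctx._source = params.event;
  ctx._source.last_update_time = params.event.get('@timestamp');
  ctx._source.first_create_time = old;
}
"""


def build_bulk_update(event, index="subdomain"):
    """Return the NDJSON body for one scripted-upsert update item.

    Hypothetical helper: the keys follow the captured request
    ("inline", "scripted_upsert", empty "upsert").
    """
    # First line: the bulk action, keyed by the event's host as document id.
    action = {
        "update": {"_id": event["host"], "_index": index, "retry_on_conflict": 1}
    }
    # Second line: the script, its params, and the upsert settings.
    payload = {
        "script": {
            "params": {"event": event},
            "inline": PAINLESS_SCRIPT,
            "lang": "painless",
        },
        "scripted_upsert": True,
        "upsert": {},
    }
    # json.dumps escapes the newlines inside the script, so each object
    # stays on one line; the bulk body must end with a newline.
    return json.dumps(action) + "\n" + json.dumps(payload) + "\n"


if __name__ == "__main__":
    sample = {
        "host": "www.91.com",
        "message": "www.91.com",
        "@timestamp": "2023-03-29T00:49:44.764Z",
        "@version": "1",
    }
    print(build_bulk_update(sample), end="")
```

Every item in the body is an `update` action, which is the case the rest of this issue is about.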
With this request, Elasticsearch does not run the ingest pipeline. Domain resolution is implemented in the pipeline, so it no longer works.
Reference: Elasticsearch Not Using Pipeline In Bulk Updates
The Logstash elasticsearch output plugin has a pipeline parameter, but in testing it had no effect.
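According to the issue "Bulk API ignores ingest pipeline for bulk update items" and the PR linked from it, this bug should be fixed in 7.17.5. However, the asm instance currently bundles 7.17.3, which is already the latest version available in the 7.x chart.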
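As a rough aid for checking a deployment against that version, the following Python sketch compares dotted version strings. The names `parse_version` and `is_at_least` are hypothetical, and the 7.17.5 threshold is only this issue's reading of the linked PR, not a confirmed fact; the comparison is meaningful only within the 7.17.x line.

```python
def parse_version(version):
    """Turn a dotted version such as '7.17.3' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def is_at_least(version, minimum):
    """Return True if `version` is the same as or newer than `minimum`."""
    return parse_version(version) >= parse_version(minimum)


# Assumption taken from this issue: the fix should be present from 7.17.5.
ASSUMED_FIX_VERSION = "7.17.5"

if __name__ == "__main__":
    for candidate in ("7.17.3", "7.17.5"):
        print(candidate, is_at_least(candidate, ASSUMED_FIX_VERSION))
```

For the 7.17.3 instance described here the check returns False, which matches the observed behaviour.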