BrunoBonacci / mulog

μ/log is a micro-logging library that logs events and data, not words!
https://cljdoc.org/d/com.brunobonacci/mulog/
Apache License 2.0
490 stars 48 forks source link

Cascading errors in elasticsearch publisher when using multi publisher #92

Closed handerpeder closed 2 years ago

handerpeder commented 2 years ago

Continuation of issue originally reported as #91

Repro:

  1. Start a multi publisher

    (u/start-publisher!
    {:type :multi
    :publishers [{:type        :elasticsearch
                :data-stream "my-stream"
                :url         "https://my.elk"
                :http-opts   opts}
               {:type        :console}]})
  2. Post a bad record

    (u/log :my/event :val2 [1 2 3])
    (u/log :my/event :val2 ["some" "text"]) ;; mulog/publisher-error: failed to parse field [val2.a] of type [long]

Expected: 1 publisher error for the bad record Actual: a cascade of errors which eventually brings mulog down.


{:publisher-type :elasticsearch, :mulog/namespace "clojure.core", :publisher-id "4jzFz5OKnOsC00M4TcskJccIJs_C7ItD", :mulog/action :publish, :mulog/timestamp 1657273030686, :exception #error {  :cause "Elasticsearch Bulk API reported errors"  :data {:errors ({:create {:_index "logs-test", :_id "4jzG-WmGzvfDvJt7SB6slj3raDjtjLzS", :status 400, :error {:type "mapper_parsing_exception", :reason "failed to parse field [val2.a] of type [long] in document with id '4jzG-WmGzvfDvJt7SB6slj3raDjtjLzS'. Preview of field's value: 'some'", :caused_by {:type "illegal_argument_exception", :reason "For input string: \"some\""}}}} {:create {:_index "logs-test", :_id "4jzG-ZFNXwV0VofVY3DqlNNKl5JSrdcv", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG-ZFNXwV0VofVY3DqlNNKl5JSrdcv]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG-r4U-r2-q8CI_5-Jx24fQzafEA1y", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG-r4U-r2-q8CI_5-Jx24fQzafEA1y]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG08Y2pWosLNJgJQy1ORsjQVGfrlpp", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG08Y2pWosLNJgJQy1ORsjQVGfrlpp]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0R8kxONgyOPYQZyU2AXs2VouFSX1", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0R8kxONgyOPYQZyU2AXs2VouFSX1]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0Rj621FKEVVs4Q_EHZ-IXM9YSt7W", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0Rj621FKEVVs4Q_EHZ-IXM9YSt7W]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0aHtixv-koZmyVRcErXPe6KHQ2Om", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0aHtixv-koZmyVRcErXPe6KHQ2Om]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0inyJ4hBzCXTmu-4xBnlo8ZMUVIT", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0inyJ4hBzCXTmu-4xBnlo8ZMUVIT]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG10cEQEcPjbVZyA_oUIMRn9WPzzXe", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG10cEQEcPjbVZyA_oUIMRn9WPzzXe]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG1JEzyHKYzd0tFpbe3q6H8zGZLEfX", :_version 1, :result "created", :_shards {:total 2, :successful 2, :failed 0}, :_seq_no 149, :_primary_term 1, :status 201}})}  :via  [{:type clojure.lang.ExceptionInfo    :message "Elasticsearch Bulk API reported errors"    :data {:errors ({:create {:_index "logs-test", :_id "4jzG-WmGzvfDvJt7SB6slj3raDjtjLzS", :status 400, :error {:type "mapper_parsing_exception", :reason "failed to parse field [val2.a] of type [long] in document with id '4jzG-WmGzvfDvJt7SB6slj3raDjtjLzS'. Preview of field's value: 'some'", :caused_by {:type "illegal_argument_exception", :reason "For input string: \"some\""}}}} {:create {:_index "logs-test", :_id "4jzG-ZFNXwV0VofVY3DqlNNKl5JSrdcv", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG-ZFNXwV0VofVY3DqlNNKl5JSrdcv]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG-r4U-r2-q8CI_5-Jx24fQzafEA1y", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG-r4U-r2-q8CI_5-Jx24fQzafEA1y]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG08Y2pWosLNJgJQy1ORsjQVGfrlpp", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG08Y2pWosLNJgJQy1ORsjQVGfrlpp]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0R8kxONgyOPYQZyU2AXs2VouFSX1", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0R8kxONgyOPYQZyU2AXs2VouFSX1]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0Rj621FKEVVs4Q_EHZ-IXM9YSt7W", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0Rj621FKEVVs4Q_EHZ-IXM9YSt7W]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0aHtixv-koZmyVRcErXPe6KHQ2Om", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0aHtixv-koZmyVRcErXPe6KHQ2Om]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG0inyJ4hBzCXTmu-4xBnlo8ZMUVIT", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG0inyJ4hBzCXTmu-4xBnlo8ZMUVIT]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG10cEQEcPjbVZyA_oUIMRn9WPzzXe", :status 409, :error {:type "version_conflict_engine_exception", :reason "[4jzG10cEQEcPjbVZyA_oUIMRn9WPzzXe]: version conflict, document already exists (current version [1])", :index_uuid "nq3iCwP_QCOWuCBTdWS6zA", :shard "0", :index "logs-test"}}} {:create {:_index "logs-test", :_id "4jzG1JEzyHKYzd0tFpbe3q6H8zGZLEfX", :_version 1, :result "created", :_shards {:total 2, :successful 2, :failed 0}, :_seq_no 149, :_primary_term 1, :status 201}})}    :at [com.brunobonacci.mulog.publishers.elasticsearch$post_records invokeStatic "elasticsearch.clj" 126]}]  :trace  [[com.brunobonacci.mulog.publishers.elasticsearch$post_records invokeStatic "elasticsearch.clj" 126]   [com.brunobonacci.mulog.publishers.elasticsearch$post_records invoke "elasticsearch.clj" 107]   [com.brunobonacci.mulog.publishers.elasticsearch.ElasticsearchPublisher publish "elasticsearch.clj" 218]   [com.brunobonacci.mulog.core$start_publisher_BANG_$publish_attempt__2755 invoke "core.clj" 194]   [clojure.core$binding_conveyor_fn$fn__5823 invoke "core.clj" 2050]   [clojure.lang.AFn applyToHelper "AFn.java" 154]   [clojure.lang.RestFn applyTo "RestFn.java" 132]   [clojure.lang.Agent$Action doRun "Agent.java" 114]   [clojure.lang.Agent$Action run "Agent.java" 163]   [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1136]   [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 635]   [java.lang.Thread run "Thread.java" 833]]}, :mulog/origin :mulog/core, :mulog/trace-id #mulog/flake "4jzG1arG65y-1p9rPckU9BCjA2eW0oyg", :mulog/event-name :mulog/publisher-error}```               
BrunoBonacci commented 2 years ago

Hi @handerpeder,

The underlying problem is that you are sending to Elasticsearch the same field name but with two different data types. This is essentially an Elasticsearch limitation which I can't do anything about it.

To limit the issue I perform Field name mangling so that for most of the common case you don't have indexing errors. However in the case, from Elasticsearch point of view, you are sending arrays of different types. Elasticsearch will only accept the records matching the index mapping, whether you explicitly defined it, or implicitly deduced by Elasticsearch during the indexing of the first record of this type.

I would recommend using two different field names, you can do this by changing the entries in the code or by adding a Custom Transformation function for the Elasticsearch publisher.

Expected: 1 publisher error for the bad record

Every time the publisher attempts to post the records and gets a failure it generates a publisher-error event. Eventually the record will be "pushed-out" of the circular buffer as described in mulog internals and the publisher will recover the normal operation. Unfortunately, during this time you will receive an error every few seconds (probably in mulog-0.10.0 I will improve this)

Actual: a cascade of errors which eventually brings mulog down.

mulog publishers are self-healing and, in cases like this, they will eventually recover. It might take a long time, this depends on the size of the publisher's buffer used and the rate of events. I'm not aware of any way in which mulog can be brought down, if you are experiencing crashes caused by mulog please do let me know.

I'm not sure I can do anything here, please do let me know if there is anything else or I misunderstood the problem you are facing

handerpeder commented 2 years ago

mulog publishers are self-healing and, in cases like this, they will eventually recover.

I think this is not happening. After experiencing an issue with a bad record ( I used an index field type error as an example ), mulog seems to eventually be brought down.

My ingest count looks like this: image

And if we filter just by publisher errors image

So there are a lot of errors, then mulog stops publishing.

I've attached an example error. Seems mulog wrongly marks successful records as not being sent and attempts to retransmit them. I think the steps I've marked out above allows you to reproduce this.

error.txt

I'm also on the mulog clojurians channel if you prefer to chat there.

BrunoBonacci commented 2 years ago

At your level of event rate (~500 events/h) it will take ~40h to drop the bad record and recover. With a smaller buffer you should see it recovering sooner.

Thanks for the error file, it is super useful.

the error returned by Elasticsearch is:

{:create
  {:_index ".ds-logs-planner-2022.06.25-000002",
   :_id "4jy8FRdiVe_NFDZheSwG79BOKwnBzf2c",
   :status 409,
   :error {:type "version_conflict_engine_exception",
   :reason "[4jy8FRdiVe_NFDZheSwG79BOKwnBzf2c]: version conflict, document already exists (current version [1])",
   :index_uuid "n-QjNG-GSdqJAWR3u1ET2Q",
   :shard "0",
   :index ".ds-logs-planner-2022.06.25-000002"}}}

From this I can see that you are using Data Streams. The strange (and unexpected) this is the first line :create. This should correspond to the action sent by the client, and :_id is the :mulog/trace-id. We use this to ensure idempotency. Now, in order to ensure that in case of failure during the publishing of the events, no duplicate events are produced, mulog sends the :_id along with all the records so that a re-transmission will just update the same record.

The action sent by the publisher is index, now I'm surprised that Elasticsearch thinks we have sent create as create is not idempotent. I will investigate this further.

BrunoBonacci commented 2 years ago

Looking at the source code, the publisher sends :create

https://github.com/brunobonacci/mulog/blob/master/mulog-elasticsearch/src/com/brunobonacci/mulog/publishers/elasticsearch.clj#L5

BrunoBonacci commented 2 years ago

Looking at the code and the Elasticsearch documentation it should be possible to just use the index action as well.

Could you please update your publisher configuration as follow:

{:type          :elasticsearch
 :index-pattern "'my-stream'"  ;; SINGLE QUOTES ARE IMPORTANT
 :url           "https://my.elk"
 :http-opts     opts}

The idea is to trick the publisher into use the index action rather than the create which is not idempotent.

handerpeder commented 2 years ago

Emitted the following events

(doseq [_ (range 10)]
  (u/log :my/event :val [1 2 3]))

(doseq [_ (range 5)]
  (u/log :my/event :val ["some" "text"]))

(doseq [_ (range 50)]
  (u/log :my/event :val [1 2 3]))

Then sampled a document after 3 minutes.

Using :data-stream stream_error.txt

Using :index-pattern index_error.txt

image

First using :data-stream, then restarted repl and used :index-pattern. I get the same amount of error records using both; ~12records/m containing errors. This seems consistent with your explanation of the intended behavior.

Also as you predicted, no create errors when using :index-pattern.

BrunoBonacci commented 2 years ago

That's fantastic, I'll make a fix and release it. thanks for trying things out

BrunoBonacci commented 2 years ago

Thank you for your help.

The fix will be released with mulog-elasticsearch 0.9.0 shortly