elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

From Discuss: No events in DLQ #7820

Open - robbavey opened this issue 7 years ago

robbavey commented 7 years ago

from https://discuss.elastic.co/t/no-events-in-dlq/94309

User is trying to ingest data using the elasticsearch output plugin, and is seeing the following error from Elasticsearch:

MapperParsingException: object mapping for [mapping_request.payload] tried to parse field [payload] as object, but found a concrete value

but those entries are not being written to the DLQ as expected.
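
For reference, the elasticsearch output only diverts documents rejected by Elasticsearch (for example a 400 mapping error like the one above) to the DLQ when the dead letter queue is enabled in logstash.yml. A minimal sketch of the relevant settings, with illustrative values that are not taken from this report:

dead_letter_queue.enable: true                  # off by default; nothing is ever written to the DLQ without it
path.dead_letter_queue: "/data/logstashData"    # base directory for the DLQ segment files (illustrative value)
dead_letter_queue.max_bytes: 1024mb             # default size cap; the queue stops accepting entries once full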

Some messages have been written to the DLQ, but the user is seeing the following errors:

[2017-07-25T16:41:19,389][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main">

AndreAga commented 7 years ago

Hi guys, I changed my Logstash configuration as follows:

Yesterday, the first Logstash was able to put some events in the DLQ, but the second Logstash was unable to handle all dead events. The full error is:

[2017-07-25T16:41:19,389][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-25T16:41:20,390][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-25T16:41:21,608][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-25T16:41:22,736][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-25T16:41:24,009][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-25T16:41:25,012][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-25T16:41:47,544][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_73337b45-c626-4cae-aab9-b2f4d8775a1e", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error: invalid checksum of record

(I think this error occurred only for some dead events, not for all, but honestly I don't know)

Today I deleted the DLQ sincedb file of the second Logstash and restarted the process, and all dead events were processed without the previous error, but the first Logstash process doesn't log dead events anymore. (I don't think the two issues are related.) All dead files have been untouched since yesterday, but today I got some logs from Elasticsearch with this error:

MapperParsingException: object mapping for [mapping_request.payload] tried to parse field [payload] as object, but found a concrete value

This error is the reason why I have to use the DLQ, but no events were put in the DLQ.

Any ideas? Thanks.

robbavey commented 7 years ago

@AndreAga Thanks for the information - also, would it be possible to get the following from you:

Thanks!

AndreAga commented 7 years ago

OS Version: NAME="Red Hat Enterprise Linux Server" VERSION="7.3 (Maipo)" ID="rhel" ID_LIKE="fedora" VERSION_ID="7.3" PRETTY_NAME="Red Hat Enterprise Linux Server 7.3 (Maipo)"

Config Files: Logstash Service:

input {
  kafka {
    bootstrap_servers => "...:9092,...:9092"
    topics_pattern => "logstash-55*"
    consumer_threads => 5
    auto_commit_interval_ms => "1000"
    auto_offset_reset => "latest"
    enable_auto_commit => true
    max_partition_fetch_bytes => "104857600"
    decorate_events => true
    codec => "json"
  }
}

filter {
 ###############################
 ruby {
   code => 'event.set("[fields][log_version]", event.get("[fields][log_version]")+1)'
 }
 ############################################

 if [fields][log_version] == 1 {

  # PROCESS THE EVENT
  # It's the first time that Logstash processes the event, so it applies the correct Pattern
  # Here some logs are converted in json via json {} plugin

 }  else if [fields][log_version] > 5 {

        # REMOVE ALL STRUCTURED FIELDS
        # Elasticsearch is unable to index the structured event, so set log_version=0 and remove all extra fields => Full-Index Search!
        ruby {
                code => 'event.set("[fields][log_version]", 0)'
        }
        prune {
                whitelist_names => [ "^message$", "beat", "fields", "kafka" , "@timestamp" ]
        }

 } else {
        # DO NOTHING
        # Logstash tries to index the event in a new Elasticsearch index with a version++;
 }
}

output {

   if [fields][scope] == "internal" {
        elasticsearch {
            hosts => ["...:9200", "...:9200", "...:9200", "...:9200"]
            index => "cluster-%{[fields][source_type]}-%{[fields][log_type]}-%{[fields][log_version]}-%{+YYYY.MM.dd}"
        }
   } else if "" in [fields][acronimo] {
        elasticsearch {
            hosts => ["...:9200", "...:9200", "...:9200", "...:9200"]
            index => "pattern-%{[fields][acronimo]}-%{[fields][environment]}-%{[fields][source_type]}-%{[fields][log_version]}-%{+YYYY.MM.dd}"
        }

   } else {

        elasticsearch {
            hosts => ["...:9200", "...:9200", "....:9200", "...:9200"]
            index => "pattern-%{+YYYY.MM.dd}"
        }
   }
}
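
The comment in the first filter branch mentions that some logs are converted to JSON via the json {} plugin; that part of the configuration is not included above. Purely as an illustration of that kind of step (the source field is assumed, and parse failures would be tagged _jsonparsefailure):

filter {
  json {
    # Illustrative only: parse the raw log line into structured fields
    source => "message"
  }
}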

Logstash DLQ:

input {
   dead_letter_queue {
    path => "/data/logstashData/dead_letter_queue"
    sincedb_path => "/data/logstashDLQData/dlq_offset"
    commit_offsets => true
    codec => "json"
  }
}

filter {}

output {
  kafka {
    bootstrap_servers => "...:9092,...:9092"
    topic_id => "logstash-55"
    batch_size => 1
    codec => "json"
  }
}

A little update: I think I found out why the first Logstash didn't put the events in the DLQ. For some strange reason the .lock file didn't exist in the DLQ directory. So I stopped all Logstash instances, removed all files in the DLQ dir and restarted the first Logstash instance; it created only the first 1.log (1 byte) file and the .lock file. Now the DLQ is constantly updated. 👍

I also restarted the second Logstash instance with the DLQ input and it processed some of the dead events, but then it started logging this error (the same as above):

[2017-07-27T09:46:20,122][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_843231a1-0cd5-4151-8dae-1f6f75762381", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-27T09:46:21,122][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_843231a1-0cd5-4151-8dae-1f6f75762381", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-27T09:46:22,123][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_843231a1-0cd5-4151-8dae-1f6f75762381", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-27T09:46:23,124][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_843231a1-0cd5-4151-8dae-1f6f75762381", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:
[2017-07-27T09:46:24,125][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", sincedb_path=>"/data/logstashDLQData/dlq_offset", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_843231a1-0cd5-4151-8dae-1f6f75762381", enable_metric=>true, charset=>"UTF-8">, id=>"52ff26acd4e4d46066179db0a3cdf329cbe3c13c-1", enable_metric=>true, pipeline_id=>"main"> Error:

Every second the same log line appears, with only 'Error:' and no further description. As I reported in my previous comment, this kind of error only stops when there is 'Error: invalid checksum of record'.

So I think the problem is with the DLQ input: the first Logstash continues to put events in the DLQ, but the second Logstash keeps logging that error without processing any more events.

If you want, I can share my DLQ, but in a private and confidential way.

AndreAga commented 7 years ago

I tried restarting the second Logstash with the DLQ input and now I get this error:

[2017-07-27T10:32:10,456][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_a9cea612-b5db-4b18-9b26-18882f279e57", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@8b5d9d; line: -1, column: 35969]
[2017-07-27T10:32:11,472][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_a9cea612-b5db-4b18-9b26-18882f279e57", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@3b1496c7; line: -1, column: 34402]
[2017-07-27T10:32:12,477][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_a9cea612-b5db-4b18-9b26-18882f279e57", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@b9d8142d; line: -1, column: 34402]
[2017-07-27T10:32:13,494][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_a9cea612-b5db-4b18-9b26-18882f279e57", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@93632797; line: -1, column: 45379]

This error happens for all the old and new events in the queue, until this error occurs:

[2017-07-27T11:04:05,984][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_f8d70cc8-9441-4ca2-a6ee-f5dac7ef3f3b", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error:
[2017-07-27T11:04:06,985][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_f8d70cc8-9441-4ca2-a6ee-f5dac7ef3f3b", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error:

AndreAga commented 7 years ago

Moreover, for some reason, when I reload the pipeline of the first (service) Logstash, it removes the .lock file from the DLQ path (I got some automatic pipeline reloads because of typos), and after that no other dead events are put into the DLQ. So I had to restart the service to get it working again (with a new .lock).

After some tests, I can confirm that when Logstash reloads pipelines it removes the '.lock' file, and with a full logstash service restart the file is restored.

robbavey commented 7 years ago

Hi @AndreAga,

Thanks a lot for answering me so thoroughly! This will help immensely in tracking down your problem.

AndreAga commented 7 years ago

@robbavey Perfect, I hope you can also find the problem with the DLQ input :)

After a full day of work, I can say that some events were correctly processed by the DLQ input without errors, but for all the others the errors were the same as above, nothing else.

robbavey commented 7 years ago

Hi @AndreAga

I suspect your issue with the DLQ input is related to the message being re-routed straight back to the DLQ via the elasticsearch output plugin, processed again, and so on. I'm guessing that your filter setup is trying to implement some form of dlq_depth counter to circumvent this, where once your message has gone through the DLQ more than 5 times, you strip all the content from the message other than the whitelisted items?
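
For illustration, such a guard is usually simplest to apply in the DLQ pipeline itself, before the event is re-published to Kafka. A minimal sketch, using a hypothetical [fields][dlq_depth] field and a threshold of 5 (neither is taken from the configuration in this thread):

filter {
  ruby {
    # Hypothetical counter: how many times this event has already been through the DLQ pipeline
    code => 'event.set("[fields][dlq_depth]", (event.get("[fields][dlq_depth]") || 0) + 1)'
  }
}

output {
  if [fields][dlq_depth] > 5 {
    # Stop re-queueing events that keep failing; park them in a file instead
    file { path => "/tmp/dlq_rejected.log" }
  } else {
    kafka {
      bootstrap_servers => "...:9092,...:9092"
      topic_id => "logstash-55"
      codec => "json"
    }
  }
}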

Did you have this filter originally, or since you started seeing issues?

One other data point that would be useful - could you send me the output from running ls -l in the dead_letter_queue data directory?

Thanks,

Rob

AndreAga commented 7 years ago

This is the ls -l output:

total 244020
-rw-r--r-- 1 logstash logstash  9670958 Jul 27 08:58 10.log
-rw-r--r-- 1 logstash logstash  8339747 Jul 27 09:03 11.log
-rw-r--r-- 1 logstash logstash  9023552 Jul 27 09:04 12.log
-rw-r--r-- 1 logstash logstash 10438745 Jul 27 09:24 13.log
-rw-r--r-- 1 logstash logstash 10280219 Jul 27 09:48 14.log
-rw-r--r-- 1 logstash logstash 10428610 Jul 27 09:55 15.log
-rw-r--r-- 1 logstash logstash 10461171 Jul 27 10:29 16.log
-rw-r--r-- 1 logstash logstash 10465940 Jul 27 10:51 17.log
-rw-r--r-- 1 logstash logstash  1975262 Jul 27 10:52 18.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:53 19.log
-rw-r--r-- 1 logstash logstash  8464481 Jul 27 07:54 1.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:53 20.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:53 21.log
-rw-r--r-- 1 logstash logstash   121958 Jul 27 10:53 22.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:54 23.log
-rw-r--r-- 1 logstash logstash   121953 Jul 27 10:54 24.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:54 25.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:54 26.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:55 27.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:55 28.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:55 29.log
-rw-r--r-- 1 logstash logstash  9294090 Jul 27 07:58 2.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:55 30.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:56 31.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:56 32.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:56 33.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:56 34.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:57 35.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:57 36.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:57 37.log
-rw-r--r-- 1 logstash logstash   121961 Jul 27 10:57 38.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:58 39.log
-rw-r--r-- 1 logstash logstash  9761825 Jul 27 08:03 3.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:58 40.log
-rw-r--r-- 1 logstash logstash   121961 Jul 27 10:58 41.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:58 42.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:59 43.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:59 44.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 10:59 45.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 11:00 46.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 11:00 47.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 11:00 48.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 11:00 49.log
-rw-r--r-- 1 logstash logstash  9325543 Jul 27 08:09 4.log
-rw-r--r-- 1 logstash logstash        1 Jul 27 11:01 50.log
-rw-r--r-- 1 logstash logstash 10124175 Jul 27 11:15 51.log
-rw-r--r-- 1 logstash logstash  2966299 Jul 27 11:23 52.log
-rw-r--r-- 1 logstash logstash  9686005 Jul 27 13:29 53.log
-rw-r--r-- 1 logstash logstash  8010969 Jul 27 14:33 54.log
-rw-r--r-- 1 logstash logstash 31486673 Jul 27 14:33 55.log
-rw-r--r-- 1 logstash logstash 10419608 Jul 27 14:57 56.log
-rw-r--r-- 1 logstash logstash  9876745 Jul 28 06:04 57.log
-rw-r--r-- 1 logstash logstash  9522111 Jul 27 08:19 5.log
-rw-r--r-- 1 logstash logstash 10003090 Jul 27 08:23 6.log
-rw-r--r-- 1 logstash logstash 10032501 Jul 27 08:42 7.log
-rw-r--r-- 1 logstash logstash  9334404 Jul 27 08:45 8.log
-rw-r--r-- 1 logstash logstash  9821203 Jul 27 08:55 9.log

Some files are empty because of Logstash pipeline restarts.

Just to be clear, I changed the filter section of my main pipeline in this way:

filter { 
###############################
 ruby {
   code => 'event.set("[fields][log_version]", event.get("[fields][log_version]")+1)'
 }
 ###############################

 if [fields][log_version] == 1 {

  # PROCESS THE EVENT
  # It's the first time that Logstash processes the event, so it applies the correct Pattern

} else {

        # REMOVE ALL STRUCTURED FIELDS
        # Elasticsearch is unable to index the structured event, so set log_version=0 and remove all extra fields => Full-Index Search!

        ruby {
                code => 'event.set("[fields][log_version]", 0)'
        }

        prune {
                whitelist_names => [ "beat", "fields", "kafka", "@timestamp", "^source$", "^type$", "^input_type$", "^message$"]
        }
 }
}

Doing this, all the dead events are now processed only once by the DLQ input, and when they come back to the first Logstash via Kafka they are indexed without all the extra JSON fields.

The problem still persists, but not for all the dead events; some are processed without errors. So is it possible that these errors are related to the size of the dead events? Maybe when an event is too large, the DLQ input plugin starts processing it before the first Logstash has finished writing it into the dead letter queue, and the DLQ input throws the error? Just a thought.

Thanks

AndreAga commented 7 years ago

Just to answer your second question: I have always had this error with the DLQ, because I installed ELK 5.5 specifically for the DLQ feature :), and I implemented this behavior to automatically handle all problematic JSON.

AndreAga commented 7 years ago

Other info here.

Today I left the two Logstash instances active. I can say that all events with mapping conflicts have been logged in the DLQ, and after about 1,200 events the DLQ input went into a loop with this error (the same error 27,000 times):

ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin. Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_f8d70cc8-9441-4ca2-a6ee-f5dac7ef3f3b", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main"> Error:

So, tonight I restarted the second Logstash and it processed all the dead events with these errors:

[2017-07-28T22:00:50,234][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_889fdfbf-1e3b-47e7-90d1-33c516d9acd1", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@a4ebf8f1; line: -1, column: 21914]
[2017-07-28T22:00:51,237][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_889fdfbf-1e3b-47e7-90d1-33c516d9acd1", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@5ea10fe7; line: -1, column: 21981]
[2017-07-28T22:00:52,239][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_889fdfbf-1e3b-47e7-90d1-33c516d9acd1", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@9098f4f3; line: -1, column: 21981]
[2017-07-28T22:01:37,560][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_889fdfbf-1e3b-47e7-90d1-33c516d9acd1", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: invalid checksum of record
[2017-07-28T22:04:02,065][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_889fdfbf-1e3b-47e7-90d1-33c516d9acd1", enable_metric=>true, charset=>"UTF-8">, id=>"849c7d0ea9bc554ae3c8ba38b6bc03cdd41b4539-1", enable_metric=>true, pipeline_id=>"main">
  Error: invalid checksum of record

BUT there wasn't any loop. Honestly, I can't say if the events that caused the errors were indexed or not.

robbavey commented 7 years ago

@AndreAga Thank you for the further info - I'm still investigating and working on mitigations and fixes for these issues

AndreAga commented 7 years ago

I tried to increase the log verbosity to DEBUG.

Stack Trace of Invalid CBOR value token (first byte): 0x1f

[2017-07-29T13:41:29,016][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_498c640c-b168-4728-b4aa-cd5cbc10b8cc", enable_metric=>true, charset=>"UTF-8">, id=>"bdf528b98cb1fca62d7c5dced34650a73caa5068-1", enable_metric=>true, pipeline_id=>"main">
  Error: Invalid CBOR value token (first byte): 0x1f
 at [Source: [B@a14d00da; line: -1, column: 35969]
  Exception: Java::ComFasterxmlJacksonCore::JsonParseException
  Stack: com.fasterxml.jackson.core.JsonParser._constructError(com/fasterxml/jackson/core/JsonParser.java:1586)
com.fasterxml.jackson.dataformat.cbor.CBORParser._invalidToken(com/fasterxml/jackson/dataformat/cbor/CBORParser.java:847)
com.fasterxml.jackson.dataformat.cbor.CBORParser.nextToken(com/fasterxml/jackson/dataformat/cbor/CBORParser.java:683)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapArray(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:588)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:510)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapObject(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:653)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:496)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapArray(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:587)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:510)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapObject(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:638)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:496)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapObject(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:629)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:496)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapObject(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:653)
com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(com/fasterxml/jackson/databind/deser/std/UntypedObjectDeserializer.java:496)
com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringMap(com/fasterxml/jackson/databind/deser/std/MapDeserializer.java:495)
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(com/fasterxml/jackson/databind/deser/std/MapDeserializer.java:341)
com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(com/fasterxml/jackson/databind/deser/std/MapDeserializer.java:26)
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(com/fasterxml/jackson/databind/ObjectMapper.java:3807)
com.fasterxml.jackson.databind.ObjectMapper.readValue(com/fasterxml/jackson/databind/ObjectMapper.java:2890)
org.logstash.Event.fromBinaryToMap(org/logstash/Event.java:219)
org.logstash.Event.deserialize(org/logstash/Event.java:394)
org.logstash.DLQEntry.deserialize(org/logstash/DLQEntry.java:76)
org.logstash.common.io.DeadLetterQueueReader.pollEntry(org/logstash/common/io/DeadLetterQueueReader.java:99)
org.logstash.input.DeadLetterQueueInputPlugin.run(org/logstash/input/DeadLetterQueueInputPlugin.java:82)
java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:508)
RUBY.run(/root/logstashDLQ/vendor/bundle/jruby/1.9/gems/logstash-input-dead_letter_queue-1.0.5/lib/logstash/inputs/dead_letter_queue.rb:60)
RUBY.inputworker(/root/logstashDLQ/logstash-core/lib/logstash/pipeline.rb:456)
RUBY.start_input(/root/logstashDLQ/logstash-core/lib/logstash/pipeline.rb:449)
java.lang.Thread.run(java/lang/Thread.java:785)

I indexed all the Logstash DLQ input debug logs in Elasticsearch and I noticed that: 1) For each CORRECT and PARSED dead event there are at least 2 logs:

Here, only the last 10,000 chars are printed in the log. Is this intentional? The error seems to be related to a malformed JSON string.

Stack Trace of Error:

[2017-07-29T14:20:16,031][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::DeadLetterQueue path=>"/data/logstashData/dead_letter_queue", commit_offsets=>true, codec=><LogStash::Codecs::JSON id=>"json_ac517034-ee46-455d-8ec0-45415d1b92c0", enable_metric=>true, charset=>"UTF-8">, id=>"bdf528b98cb1fca62d7c5dced34650a73caa5068-1", enable_metric=>true, pipeline_id=>"main">
  Error: 
  Exception: Java::JavaLang::IllegalArgumentException
  Stack: java.nio.Buffer.position(java/nio/Buffer.java:255)
org.logstash.common.io.RecordIOReader.seekToStartOfEventInBlock(org/logstash/common/io/RecordIOReader.java:157)
org.logstash.common.io.RecordIOReader.consumeToStartOfEvent(org/logstash/common/io/RecordIOReader.java:172)
org.logstash.common.io.RecordIOReader.readEvent(org/logstash/common/io/RecordIOReader.java:206)
org.logstash.common.io.DeadLetterQueueReader.pollEntryBytes(org/logstash/common/io/DeadLetterQueueReader.java:118)
org.logstash.common.io.DeadLetterQueueReader.pollEntry(org/logstash/common/io/DeadLetterQueueReader.java:95)
org.logstash.input.DeadLetterQueueInputPlugin.run(org/logstash/input/DeadLetterQueueInputPlugin.java:82)
java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:508)
LogStash::Inputs::DeadLetterQueue.run(/root/logstashDLQ/vendor/bundle/jruby/1.9/gems/logstash-input-dead_letter_queue-1.0.5/lib/logstash/inputs/dead_letter_queue.rb:60)
LogStash::Inputs::DeadLetterQueue.run(/root/logstashDLQ/vendor/bundle/jruby/1.9/gems/logstash-input-dead_letter_queue-1.0.5/lib/logstash/inputs/dead_letter_queue.rb:60)
RUBY.inputworker(/root/logstashDLQ/logstash-core/lib/logstash/pipeline.rb:456)
RUBY.start_input(/root/logstashDLQ/logstash-core/lib/logstash/pipeline.rb:449)
java.lang.Thread.run(java/lang/Thread.java:785)

robbavey commented 7 years ago

Hi @AndreAga

Thank you again for digging in and helping us with this issue. The 10,000 character limit is from our logging - messages are trimmed to only show the first 10,000 characters. Is it expected that your messages are greater than 10k each? What is the range of message sizes that you are trying to ingest, and that are being added to the DLQ?

Thanks,

Rob

AndreAga commented 7 years ago

Hi @robbavey I'm ingesting about 50,000,000 events per day, and about 1,000,000 of those are very, very long JSON documents that represent the user request and the system response (in the same event). These specific JSON events range from a minimum of about 200-250 characters up to many thousands of characters (some JSON fields contain the Base64 of a PDF or the stream of a zip, etc.). Please don't ask me why :). My Filebeat config files have the multiline limit set to 1,000,000 lines.

robbavey commented 7 years ago

That makes sense...

A couple more questions for you...

Thanks

AndreAga commented 7 years ago

I looked into it:

The max Kafka message size is set to 50 MB. Previously it was 10 MB, but the verbosity has since been increased. Roughly, I think 15-20 MB could be the real upper bound.
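
On the Logstash side, those limits would normally be mirrored in the Kafka plugins themselves. A sketch of where they would go, using the plugins' standard max_request_size and max_partition_fetch_bytes options (the 50 MB value is only the figure mentioned above, not confirmed configuration):

output {
  kafka {
    bootstrap_servers => "...:9092,...:9092"
    topic_id => "logstash-55"
    codec => "json"
    # Producer-side cap on a single message (~50 MB, matching the broker limit mentioned above)
    max_request_size => 52428800
  }
}

input {
  kafka {
    bootstrap_servers => "...:9092,...:9092"
    topics_pattern => "logstash-55*"
    codec => "json"
    # Consumer-side fetch cap; must be at least as large as the biggest message the broker will deliver
    max_partition_fetch_bytes => 52428800
  }
}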

robbavey commented 7 years ago

Thanks!

AndreAga commented 7 years ago

Hi @robbavey any update? When will these fixes be officially released? Thanks.

robbavey commented 7 years ago

Hi @AndreAga The fixes should be included in the next version of the Elastic stack, in 5.5.3 and 5.6, which are scheduled to be released very shortly. I will add a comment to this thread when they are released.

And thank you again for the bug report, and your patience.

AndreAga commented 7 years ago

@robbavey I just installed Logstash 5.6 and tested the first bug (#7828). When the pipeline restarts, the .lock file is removed. OMG! Must I downgrade to 5.5.3?

EDIT: With 5.5.3 the .lock file is also removed on pipeline restart; after a full service restart the .lock comes back.

robbavey commented 7 years ago

@AndreAga No, 5.6 should work correctly - double check your Logstash config to make sure that the dead letter queue is still enabled and pointing to the same location.