logstash-plugins / logstash-input-kinesis

Logstash Plugin for AWS Kinesis Input
Apache License 2.0

Seeing duplicate data #43

Closed: jeeyoungk closed this issue 4 months ago

jeeyoungk commented 6 years ago

Hi,

I'm seeing some duplicate data from a Logstash cluster that is deployed with a setting like this:

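input {
  kinesis {
    id => "input_kinesis"
    kinesis_stream_name => "logging"
    # also names the DynamoDB table the KCL uses for leases and checkpoints
    application_name => "logstash"
    codec => json
    # how often progress is checkpointed; records read after the last
    # checkpoint can be delivered again after a restart or failover
    checkpoint_interval_seconds => 10
  }
}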

For the Elasticsearch output we use document_id to get upserts (I'm using a combination of the Kinesis partition key and sequence number, which should be unique), but with the S3 output we're writing a significant amount of duplicate data. Is there some setting or monitoring that I'm missing?
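The composite key described here can be sketched in Python. This is a hypothetical helper (`make_document_id` is not part of the plugin), assuming the partition key and sequence number are already available as strings on each event; the sequence number used in the example is made up.

```python
import hashlib


def make_document_id(partition_key: str, sequence_number: str) -> str:
    """Derive a deterministic document ID from a Kinesis record's identity.

    Sequence numbers are decimal digit strings, so the last ':' in the joined
    string always marks the boundary, even if the partition key contains ':'.
    """
    if not sequence_number.isdigit():
        raise ValueError("expected a decimal sequence number")
    raw = f"{partition_key}:{sequence_number}"
    # Hash to a fixed-length ID; the raw string would also work as an ID.
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    first = make_document_id("host-42", "1001")
    redelivered = make_document_id("host-42", "1001")
    print(first == redelivered)  # True: a replayed record maps to the same ID
```

Because the ID depends only on the record's identity, a redelivered record overwrites its earlier copy in a keyed store instead of being added again.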

nerophon commented 6 years ago

I believe (from experience) that this plugin puts a unique ID in the @metadata._id field, but perhaps a developer on this project would care to comment. From looking at the source, this _id doesn't seem to be set explicitly. My belief, therefore, is that it is auto-generated at the point of ingest by running a consistent hash over the entire metadata structure. If so, that would explain how it filters duplicates.
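To illustrate the mechanism hypothesized here (not confirmed plugin behavior), a consistent hash over a structure can be sketched as follows. `content_fingerprint` is a hypothetical name, and SHA-1 over sorted-key JSON is an assumed choice.

```python
import hashlib
import json
from typing import Any, Dict


def content_fingerprint(metadata: Dict[str, Any]) -> str:
    """Hash a canonical JSON encoding of a dict (illustration of a content-based ID)."""
    # sort_keys and fixed separators make the encoding independent of key order
    canonical = json.dumps(metadata, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    a = content_fingerprint({"stream": "logging", "seq": "1001"})
    b = content_fingerprint({"seq": "1001", "stream": "logging"})
    print(a == b)  # True: same content, same fingerprint
```

A content-based ID has two weaknesses: two genuinely distinct events with identical content collapse into one, and any field that changes per delivery (such as an ingest timestamp) gives a replayed record a new fingerprint.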

robbavey commented 6 years ago

@jeeyoungk

Are you seeing errors or warnings in your logs? It would be good to know whether an unusual number of errors is leading to more duplicates than usual. Also, what counts as "a significant amount"? One thing you may want to do is reduce the value of checkpoint_interval_seconds, which reduces the number of records that are re-processed when consumer failures or retries occur.
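The effect of the checkpoint interval can be worked through with a small Python model. It is a simplified assumption, not KCL internals: a steady input rate, checkpoints every `checkpoint_interval` seconds starting at t = 0, and a restart that resumes from the last checkpoint. `replayed_records` is a hypothetical name and the 1000 records/s rate is made up.

```python
def replayed_records(rate_per_sec: int, checkpoint_interval: int, crash_at: int) -> int:
    """Records processed since the last checkpoint before a crash at second `crash_at`.

    Everything after the last checkpoint is read again on restart, so this is
    the number of duplicates the crash produces in this model.
    """
    last_checkpoint = (crash_at // checkpoint_interval) * checkpoint_interval
    return rate_per_sec * (crash_at - last_checkpoint)


if __name__ == "__main__":
    for interval in (10, 2):
        print(interval, replayed_records(1000, interval, 57))
    # prints "10 7000" then "2 1000"
```

The worst case per failure is roughly rate times interval, so shrinking the interval shrinks the replay window. The trade-off, assuming the usual KCL setup, is more frequent checkpoint writes to the DynamoDB table named by application_name.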

Amazon Kinesis has at-least-once semantics, so a certain number of duplicate messages is difficult to avoid without idempotent processing. You get that in the Elasticsearch output via the document_id field, but the S3 output does not currently support it.
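The difference between an idempotent sink and an append-only one can be shown with a small Python simulation. It is a simplified model, not the plugin's or either output's actual code: `redeliver`, `append_sink`, and `upsert_sink` are hypothetical names, and the record values are made up.

```python
from typing import Dict, List, Tuple

# (partition_key, sequence_number, payload); values below are made up
Record = Tuple[str, str, str]


def redeliver(records: List[Record], replay_from: int) -> List[Record]:
    """At-least-once delivery: everything once, then a replay from an older checkpoint."""
    return records + records[replay_from:]


def append_sink(deliveries: List[Record]) -> List[str]:
    """Writes every delivery, like an append-only file or object store."""
    return [payload for _, _, payload in deliveries]


def upsert_sink(deliveries: List[Record]) -> Dict[str, str]:
    """Writes by key, so a replayed record overwrites its earlier copy."""
    store: Dict[str, str] = {}
    for partition_key, sequence_number, payload in deliveries:
        store[f"{partition_key}:{sequence_number}"] = payload
    return store


if __name__ == "__main__":
    records = [("host-1", "100", "a"), ("host-2", "101", "b"), ("host-1", "102", "c")]
    deliveries = redeliver(records, replay_from=1)
    print(len(append_sink(deliveries)), len(upsert_sink(deliveries)))  # 5 3
```

The keyed store ends up with one entry per record, while the append-only sink keeps both copies of each replayed record. This matches seeing duplicates in S3 but not in Elasticsearch.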

erickertzvfc commented 1 year ago

@jeeyoungk I know this is old, but how are you getting the Kinesis partition key and sequence number to use in your output? Thanks.