logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins
Apache License 2.0

Add producer config to enable producer idempotence #152

Open hellopeera opened 1 year ago

hellopeera commented 1 year ago

Release notes

Add the Kafka producer configs enable_idempotence and max_in_flight_requests_per_connection to help ensure that exactly one copy of each message is written to the stream in the face of producer retries.

What does this PR do?

Producer idempotence ensures that duplicates are not introduced when the producer retries unexpectedly because of an intermittent issue, such as a network problem.

This PR exposes two Kafka producer configurations to Logstash users: enable_idempotence and max_in_flight_requests_per_connection. Both are optional.

How to enable idempotence for the Kafka producer

Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (message ordering is preserved for any allowed value), retries to be greater than 0, and acks to be 'all' (equivalent to -1).

acks = -1
enable.idempotence = true
max.in.flight.requests.per.connection = 5
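
The three constraints above can be checked before starting Logstash. The following is a minimal sketch, not part of this PR or the plugin: check_idempotence is a hypothetical helper name, and it takes the acks, retries, and max-in-flight values as plain arguments.

```shell
# Hypothetical helper (not part of the plugin): checks the producer
# settings against the idempotence constraints described above.
# Usage: check_idempotence <acks> <retries> <max_in_flight>
check_idempotence() {
  acks="$1"
  retries="$2"
  max_in_flight="$3"

  # acks must be "all"; Kafka reports the same setting as -1.
  case "$acks" in
    all|-1) ;;
    *) echo "acks must be 'all' (or -1), got: $acks"; return 1 ;;
  esac

  # retries must be greater than 0.
  if [ "$retries" -le 0 ]; then
    echo "retries must be greater than 0, got: $retries"
    return 1
  fi

  # max.in.flight.requests.per.connection must be at most 5.
  if [ "$max_in_flight" -gt 5 ]; then
    echo "max.in.flight.requests.per.connection must be <= 5, got: $max_in_flight"
    return 1
  fi

  echo "ok"
}
```

For example, check_idempotence all 3 5 prints ok, while check_idempotence 1 3 5 reports the acks problem and returns a non-zero status.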

How to test this PR locally

Run a local Kafka cluster

Launch the test Kafka script

./kafka_test_setup.sh

Connect a consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logstash_integration_enable_idempotence_topic

Set up the Logstash Kafka output and run

Install the plugin in development mode

bin/logstash-plugin install --no-verify /path/to/logstash_plugins/logstash-integration-kafka/logstash-integration-kafka-*.gem

Run logstash

bin/logstash -e 'input { stdin { } } output { kafka { topic_id => "logstash_integration_enable_idempotence_topic" enable_idempotence => "true" acks => "all" max_in_flight_requests_per_connection => 5 } }'
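
For repeated test runs, the same pipeline may be easier to keep in a file than in a one-line -e string. The sketch below writes it out with a heredoc; the file name idempotence_test.conf and the CONF variable are assumptions chosen for illustration, and the two new options are only available with this PR's build of the plugin installed.

```shell
# Write the pipeline from the one-liner above to a config file.
# CONF is a hypothetical file name; override it to write elsewhere.
CONF="${CONF:-idempotence_test.conf}"

cat > "$CONF" <<'EOF'
input { stdin { } }
output {
  kafka {
    topic_id => "logstash_integration_enable_idempotence_topic"
    enable_idempotence => "true"
    acks => "all"
    max_in_flight_requests_per_connection => 5
  }
}
EOF

# Then start Logstash with the file instead of -e:
#   bin/logstash -f "$CONF"
```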

Verify that the Logstash producer config matches what was configured

Check the Kafka producer config printed on the Logstash console:

acks = -1
enable.idempotence = true
max.in.flight.requests.per.connection = 5
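
The producer config dump contains many other settings, so it can help to filter it down to the three values of interest. This is a small sketch that assumes the console output is piped or saved to a file in the "key = value" form shown above; filter_producer_config is a hypothetical helper name.

```shell
# Hypothetical helper: reads producer config output on stdin and keeps
# only the three idempotence-related "key = value" lines.
filter_producer_config() {
  grep -E '^[[:space:]]*(acks|enable\.idempotence|max\.in\.flight\.requests\.per\.connection) = '
}

# Example use (assumes the console output was saved to logstash.log):
#   filter_producer_config < logstash.log
```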

Verify that Logstash is able to produce messages

From the Logstash console, enter some messages via stdin. Then check the Kafka consumer console to confirm that the messages are received.

Related issues