logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins
Apache License 2.0

Implement the `static membership` feature to Kafka input #135

Closed: andsel closed this pull request 1 year ago

andsel commented 1 year ago

Release notes

Added the `group_instance_id` setting, which enables the static membership feature of the Kafka consumer.

What does this PR do?

Static membership is controlled by the Kafka consumer property `group.instance.id`, which must be a unique identifier of the consumer instance, supplied by the end user. If the `consumer_threads` setting is 1, the value is passed unchanged to the Kafka consumer configuration. If `consumer_threads` is greater than 1, every thread would present the same id, and these would clash as described in KIP-345 (each static member needs its own `group.instance.id`). In that case a `-<thread-index>` suffix is appended to the value for each thread.
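The naming rule above can be illustrated with a small shell function. `instance_ids` is a hypothetical helper, not the plugin's code (the plugin applies the rule internally when it creates its consumers). It also assumes that thread indexes start at 0.

```shell
# Hypothetical helper (not part of the plugin) mirroring the rule described
# above for deriving per-thread group.instance.id values.
# Assumption: thread indexes start at 0.
instance_ids() {
  base_id="$1"   # value of the group_instance_id setting
  threads="$2"   # value of the consumer_threads setting
  if [ "$threads" -eq 1 ]; then
    # Single consumer thread: the id is passed through unchanged.
    echo "$base_id"
  else
    # Several threads: append "-<thread-index>" so that each Kafka consumer
    # gets a distinct group.instance.id.
    i=0
    while [ "$i" -lt "$threads" ]; do
      echo "${base_id}-${i}"
      i=$((i + 1))
    done
  fi
}

instance_ids test_static_group_id 1   # prints test_static_group_id
instance_ids test_static_group_id 3   # prints test_static_group_id-0, -1 and -2, one per line
```

Passing the id through unchanged for a single thread keeps the value the user configured identical to what the broker sees, which makes the single-thread case easy to reason about.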

Why is it important/What is the impact to the user?

The static membership feature of the Kafka consumer client keeps a consumer instance bound to its assigned partitions. When a static member restarts and rejoins within the session timeout, the group does not rebalance. This matters when the cost of replicating state between consumers during a rebalance is high. This PR exposes the feature, which is optional, to Logstash users.

Checklist

Author's Checklist

How to test this PR locally

Run a local Kafka cluster

From your clone of this repository, launch the test Kafka script:

./kafka_test_setup.sh

Connect a producer
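A minimal sketch of connecting a console producer. It makes three assumptions:

- `kafka_test_setup.sh` unpacked Kafka into `build/kafka`.
- The broker listens on `localhost:9092`.
- The Kafka version's `kafka-console-producer.sh` accepts `--bootstrap-server` (older releases use `--broker-list` instead).

The topic is the one used by the console consumer later in this description.

```shell
# Assumption: kafka_test_setup.sh unpacked Kafka into build/kafka.
KAFKA_HOME="${KAFKA_HOME:-build/kafka}"
# Topic also used by the console consumer in the verification step.
TOPIC="logstash_integration_static_membership_topic"

if [ -x "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
  # Each line typed on stdin is sent as one record to $TOPIC.
  "$KAFKA_HOME/bin/kafka-console-producer.sh" --bootstrap-server localhost:9092 --topic "$TOPIC"
else
  echo "kafka-console-producer.sh not found under $KAFKA_HOME" >&2
fi
```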

Set up the Logstash Kafka input and run Logstash

output { stdout { codec => rubydebug } }
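A possible complete pipeline for this test, written out from the shell, is sketched below. The file name `static_membership.conf` is hypothetical. The `group_id` and `group_instance_id` values are assumptions chosen to match the console consumer used in the verification step, so that both clients claim the same static id in the same group.

```shell
# Hypothetical file name; group_id and group_instance_id are chosen to match
# the console consumer started in the verification step.
cat > static_membership.conf <<'EOF'
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logstash_integration_static_membership_topic"]
    group_id => "logstash"
    group_instance_id => "test_static_group_id"
  }
}
output { stdout { codec => rubydebug } }
EOF
```

Logstash can then be started from its installation folder with `bin/logstash -f /path/to/static_membership.conf`.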


### Verify Logstash is receiving data
From the producer's console, send some data and verify that the message is received on the Logstash console.

### Verify that another consumer kicks Logstash out of the group
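Start another consumer in the same consumer group, using the same `group.instance.id` as Logstash. Because a static id can belong to only one member at a time, the broker should fence the Logstash consumer.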
- In the `build/kafka` folder, create the file `client_config.properties` containing the line `group.instance.id=test_static_group_id`
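One way to create that file, sketched under two assumptions: the command runs from the repository root, and the setup script unpacked Kafka into `build/kafka`.

```shell
# Assumption: run from the repository root, with Kafka unpacked into build/kafka.
KAFKA_HOME="${KAFKA_HOME:-build/kafka}"
mkdir -p "$KAFKA_HOME"
# Reuse the static id configured on the Logstash input, so that two members
# claim the same group.instance.id.
printf 'group.instance.id=test_static_group_id\n' > "$KAFKA_HOME/client_config.properties"
```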

From the same folder, start the console consumer:

```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "logstash_integration_static_membership_topic" --from-beginning --group logstash --consumer.config "${PWD}/client_config.properties"
```

Test with multiple threads
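A possible pipeline for this case is sketched below. The file name is hypothetical and the local setup is the same as above. Given the suffix rule described in this PR, the two threads should register as `test_static_group_id-0` and `test_static_group_id-1`, assuming 0-based indexes. A console consumer configured with the plain `test_static_group_id` should therefore no longer clash with Logstash, while one configured with `test_static_group_id-0` should.

```shell
# Hypothetical file name. With consumer_threads => 2 the plugin is expected to
# suffix the static id per thread (0-based index assumed).
cat > static_membership_threads.conf <<'EOF'
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logstash_integration_static_membership_topic"]
    group_id => "logstash"
    group_instance_id => "test_static_group_id"
    consumer_threads => 2
  }
}
output { stdout { codec => rubydebug } }
EOF
```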

Related issues

Use cases

As a Kafka user who wants to keep consumers from spending a lot of time in rebalances, especially when they hold heavy state, I want to be able to assign a "static membership" id to every consumer.