Closed cgiraldo closed 7 years ago
I have implemented a basic encoding function to integrate logstash pipelines in our confluent-kafka platform.
It prefixes the magic number (\x00) and the schema_id. The schema_id is provided in the logstash codec configuration.
The output is a ByteArray, so the logstash-output-kafka should be configured with:
value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
I have tested it with a kafka-connect consumer and It is able to decode the messages produced by my logstash pipeline.
I hope it helps!
I have implemented a basic encoding function to integrate logstash pipelines in our confluent-kafka platform.
It prefixes the magic number (\x00) and the schema_id. The schema_id is provided in the logstash codec configuration.
The output is a ByteArray, so the logstash-output-kafka should be configured with:
value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
I have tested it with a kafka-connect consumer and It is able to decode the messages produced by my logstash pipeline.
I hope it helps!