logstash-plugins / logstash-integration-kafka

Kafka Integration for Logstash, providing Input and Output Plugins
Apache License 2.0

Refactor: leverage codec when using schema registry #106

Closed · kares closed 2 years ago

kares commented 2 years ago

The motivation here is to be able to leverage the proper codec in all scenarios in which the plugin operates.

The default behavior for the Kafka input is to use the plain codec to decode the record payload. With schema_registry_url set, the codec is still used, but the payload is then parsed again as JSON.

The PR changes this behavior with some clever hooks into Plugin#initialize that change the default codec.
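For illustration, a minimal sketch of that kind of hook (not the actual diff; the guard and params handling in the real change may differ, and LogStash::Plugin.lookup is used here only to resolve the codec class):

```ruby
# Sketch only: default the codec to json when schema_registry_url is configured
# and the user has not explicitly chosen a codec of their own.
def initialize(params = {})
  if params['schema_registry_url'] && !params.key?('codec')
    # resolve the json codec class and install an instance as the default
    params = params.merge('codec' => LogStash::Plugin.lookup('codec', 'json').new)
  end
  super(params)
end
```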

NOTE: the PR's changes cause a slight regression: previously the plugin worked fine using schema_registry_url even when a codec (e.g. codec => plain) was also explicitly specified, since JSON parsing was done manually regardless of the codec.
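To make the regression concrete, here is a rough before/after sketch of the record-handling path (method and variable names are hypothetical, not the plugin's actual internals):

```ruby
# Before: the schema-registry payload was always parsed as JSON, so an explicit
# codec => plain had no real effect on the resulting event fields.
event = LogStash::Event.new(LogStash::Json.load(record.value.to_s))

# After: the payload is handed to whatever codec is configured, so codec => plain
# now yields an event with the raw JSON string in `message` instead of parsed fields.
@codec.decode(record.value.to_s) { |event| decorate(event); queue << event }
```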


The changes are sufficient given that ecs_compatibility is either a local or a global setting: the JSON codec that becomes the default in the schema-registry case will already warn users about a missing target, e.g.

[2021-12-06T12:55:15,785][INFO ][org.reflections.Reflections] Reflections took 116 ms to scan 1 urls, producing 119 keys and 416 values 
[2021-12-06T12:55:16,222][INFO ][logstash.codecs.json     ] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2021-12-06T12:55:16,646][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2021-12-06T12:55:16,786][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x7a8ed04a run>"}