vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Kafka Sink: Support encoding Confluent Kafka wire format #19872

Open silverwind opened 9 months ago

silverwind commented 9 months ago

Use Cases

Confluent Kafka has a proprietary wire format that expects a 5-byte header (a magic byte followed by a 4-byte schema ID), which Vector's encoders currently cannot produce.

Vector already recognizes and strips these 5 bytes during Avro decoding via decoding.avro.strip_schema_id_prefix. However, the header is not actually related to Avro encoding: on Confluent Kafka it can also appear in front of Protobuf and JSON payloads.

So I suggest implementing the header encoding in the Kafka sink and moving the decoding to the Kafka source.

https://github.com/vectordotdev/vector/blob/e8401c473fb0334c36ac91a411392f1ac7ae9ce5/lib/codecs/src/decoding/format/avro.rs#L139-L149
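To illustrate why the stripping step does not depend on Avro, here is a minimal sketch of format-agnostic header removal. It is not the code behind the link above; the function name `strip_confluent_header` is hypothetical, and the sketch assumes the usual Confluent layout of one magic byte (0) followed by a 4-byte big-endian schema ID.

```rust
/// Hypothetical helper (not part of Vector): splits a Confluent-framed
/// payload into its schema ID and the remaining body.
///
/// Assumed layout: byte 0 is the magic byte (0), bytes 1..5 are the
/// schema ID in big-endian order, and everything after that is the
/// serialized message (Avro, Protobuf, or JSON).
fn strip_confluent_header(payload: &[u8]) -> Option<(u32, &[u8])> {
    const MAGIC_BYTE: u8 = 0;
    const HEADER_LEN: usize = 5;

    // Reject payloads that are too short or do not start with the magic byte.
    if payload.len() < HEADER_LEN || payload[0] != MAGIC_BYTE {
        return None;
    }

    let schema_id = u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
    Some((schema_id, &payload[HEADER_LEN..]))
}
```

Because the function only inspects the first five bytes, a source could in principle call it before handing the remaining body to whichever decoder is configured.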

Proposal

Add two options to the Kafka sink's encoding to produce this header:

encoding:
  confluent_magic_byte: 0
  confluent_schema_id: 1
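As a rough sketch of what the sink would do with these two options, the header could be prepended to the already-encoded message like this. The struct `ConfluentHeader` and the function `frame_confluent` are hypothetical names that only mirror the proposed options; they are not an existing Vector API, and the schema ID is assumed to be written as 4 big-endian bytes.

```rust
/// Hypothetical mirror of the two proposed encoding options.
struct ConfluentHeader {
    /// Corresponds to `confluent_magic_byte`.
    magic_byte: u8,
    /// Corresponds to `confluent_schema_id`.
    schema_id: u32,
}

/// Prepends the 5-byte header to an already-serialized message body.
fn frame_confluent(header: &ConfluentHeader, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + body.len());
    out.push(header.magic_byte);
    // Schema ID as 4 bytes, most significant byte first (assumed layout).
    out.extend_from_slice(&header.schema_id.to_be_bytes());
    out.extend_from_slice(body);
    out
}
```

With the values from the example config (magic byte 0, schema ID 1), the body `hi` would be framed as the bytes `00 00 00 00 01 68 69`.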

References

https://github.com/vectordotdev/vector/issues/561
https://github.com/vectordotdev/vector/issues/19546

Version

vector 0.35.0 (x86_64-unknown-linux-gnu e57c0c0 2024-01-08 14:42:10.103908779)

jpds commented 1 month ago

Potential Rust crate to implement this with: https://github.com/gklijs/schema_registry_converter