flier / tokio-kafka

Asynchronous Rust client for Apache Kafka
Apache License 2.0

Compression: snappy compression #12

Closed. polachok closed 6 years ago

polachok commented 6 years ago

I started working on an implementation of (snappy) compression, and it is certainly puzzling. I'm looking for some advice, since documentation on the protocol is scarce.

As far as I understand, compression works like this when producing: you create a MessageSet with a single message whose value is your actual MessageSet, compressed, and whose compression flag is set to whatever method was used. In snappy's case this is done with snap::Encoder::new().compress() (quoting the docs: "This encoder does not use the Snappy frame format and simply compresses the given bytes in one big Snappy block"). The offsets of the compressed messages are set to the sequence 0, 1, ...
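The "compression flag" mentioned above lives in the message's attributes byte. A minimal sketch of reading and setting it follows, assuming the codec occupies the low three bits of the attributes (0 = none, 1 = gzip, 2 = snappy, 3 = LZ4) as described in the Kafka protocol guide. The names `Codec`, `codec_of` and `with_codec` are hypothetical and are not taken from tokio-kafka.

```rust
/// Compression codecs as encoded in a message's attributes byte.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Codec {
    None,   // 0
    Gzip,   // 1
    Snappy, // 2
    Lz4,    // 3
}

/// Mask for the codec bits (assumption: the low three bits of the attributes).
const CODEC_MASK: i8 = 0x07;

/// Reads the codec from an attributes byte; unknown codec values yield `None`.
pub fn codec_of(attributes: i8) -> Option<Codec> {
    match attributes & CODEC_MASK {
        0 => Some(Codec::None),
        1 => Some(Codec::Gzip),
        2 => Some(Codec::Snappy),
        3 => Some(Codec::Lz4),
        _ => None,
    }
}

/// Returns `attributes` with its codec bits replaced, leaving other bits untouched.
pub fn with_codec(attributes: i8, codec: Codec) -> i8 {
    (attributes & !CODEC_MASK) | codec as i8
}
```

A producer would set the attributes of the wrapper message with something like `with_codec(0, Codec::Snappy)`, and a consumer would call `codec_of` on each message before deciding whether its value needs to be decompressed.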

When consuming, you receive a Message with the compression flag set, and the MessageSet is in its value field. Here's where it gets weird! The value starts with the bytes 0x82 "SNAPPY" 0x00 and, in fact, uses the "java" framing. And it contains a message whose offset is certainly nowhere near zero!
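The "java" framing described above can be detected and unwrapped without a snappy library. The sketch below assumes the snappy-java (xerial) stream layout: an 8-byte magic, two 4-byte big-endian version fields, then a sequence of blocks, each prefixed by a 4-byte big-endian length. The function names are hypothetical, and each returned block would still have to be decompressed with a raw (unframed) snappy decoder.

```rust
/// Magic bytes at the start of a snappy-java (xerial) stream.
const XERIAL_MAGIC: [u8; 8] = [0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0x00];
/// Magic (8 bytes) + version (4 bytes) + compatible version (4 bytes).
const XERIAL_HEADER_LEN: usize = 16;

/// Returns true if `value` starts with a complete xerial header.
pub fn is_xerial_framed(value: &[u8]) -> bool {
    value.len() >= XERIAL_HEADER_LEN && value.starts_with(&XERIAL_MAGIC)
}

/// Splits a xerial-framed value into its raw snappy blocks.
/// Returns `None` if the header is missing or a block is truncated.
pub fn xerial_blocks(value: &[u8]) -> Option<Vec<&[u8]>> {
    if !is_xerial_framed(value) {
        return None;
    }
    let mut rest = &value[XERIAL_HEADER_LEN..];
    let mut blocks = Vec::new();
    while !rest.is_empty() {
        if rest.len() < 4 {
            return None; // truncated length prefix
        }
        let len = u32::from_be_bytes([rest[0], rest[1], rest[2], rest[3]]) as usize;
        rest = &rest[4..];
        if rest.len() < len {
            return None; // truncated block body
        }
        let (block, tail) = rest.split_at(len);
        blocks.push(block);
        rest = tail;
    }
    Some(blocks)
}
```

A consumer could try `xerial_blocks` first and fall back to treating the whole value as one raw snappy block when it returns `None`, which would cover both the framed and the unframed variants.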

Does this mean that Kafka is actually decompressing and recompressing every message? What's going on?! That is my first question.

How to go about integrating compressed messages into parsing would be my second one. Should I decompress at parse time (parse_messageset() function), or later? Any thoughts would be appreciated.

polachok commented 6 years ago

@flier Any thoughts on this?

flier commented 6 years ago

In fact, Kafka 0.8/0.9 uses a private (incorrect) implementation for snappy/LZ4; see KAFKA-1718 and KAFKA-1493 for more details.

polachok commented 6 years ago

Also relevant: https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
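As I read KIP-31, with the newer message format the inner messages carry relative offsets (0, 1, ...) and the wrapper message carries the absolute offset of the last inner message, so the broker no longer has to rewrite and recompress the inner set. With the older format, the inner messages carry absolute offsets assigned by the broker, which would explain the "not-around-zero" offsets seen when consuming. A minimal sketch of the consumer-side arithmetic under that reading, with a hypothetical function name:

```rust
/// Converts relative inner offsets to absolute offsets, assuming the wrapper
/// offset equals the absolute offset of the last inner message (KIP-31).
pub fn absolute_offsets(wrapper_offset: i64, inner_offsets: &[i64]) -> Vec<i64> {
    // The last inner (relative) offset anchors the whole set to the wrapper offset.
    let last = match inner_offsets.last() {
        Some(&last) => last,
        None => return Vec::new(),
    };
    inner_offsets
        .iter()
        .map(|&inner| wrapper_offset - last + inner)
        .collect()
}
```

For example, a wrapper at offset 1002 holding inner offsets 0, 1, 2 yields absolute offsets 1000, 1001, 1002.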

polachok commented 6 years ago

Snappy compression works with this PR for both consuming and producing

flier commented 6 years ago

thanks