tychedelia / kafka-protocol-rs

Rust Kafka protocol

decompressing Snappy encoded RecordBatch fails #79

Open pdeva opened 2 months ago

pdeva commented 2 months ago

Getting this error:

 snappy: corrupt input (expected valid offset but got offset 20545; dst position: 0)

Changing to another compression codec like lz4 works fine.

pdeva commented 2 months ago

Seems the bug occurs when the batch is compressed with snappy 'framing' turned on.

This is the relevant code from the kafka-go client. When x.framed is set to true, it causes the decoder in kafka-protocol-rs to crash with the error above. Framing appears to be enabled by default in all popular Kafka clients, including the canonical Java client.

func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
    x, _ := writerPool.Get().(*xerialWriter)
    if x != nil {
        x.Reset(w)
    } else {
        x = &xerialWriter{writer: w}
    }
    x.framed = c.Framing == Framed
    switch c.Compression {
    case FasterCompression:
        x.encode = s2.EncodeSnappy
    case BetterCompression:
        x.encode = s2.EncodeSnappyBetter
    case BestCompression:
        x.encode = s2.EncodeSnappyBest
    default:
        x.encode = snappy.Encode // aka. s2.EncodeSnappyBetter
    }
    return &writer{xerialWriter: x}
}
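For context, Kafka's snappy 'framing' is the xerial (snappy-java) block format: an 8-byte magic header (0x82 "SNAPPY" 0x00), two 4-byte big-endian version fields, and then a sequence of blocks, each prefixed with a 4-byte big-endian length and containing raw snappy data. A decoder that only understands raw snappy will try to read the xerial header as compressed data and report corrupt input, which matches the error above. A rough Rust sketch of detecting the header and falling back to block-by-block decompression might look like this (assuming the snap crate; decompress_snappy is a hypothetical helper, not the kafka-protocol-rs API):

use snap::raw::Decoder;

// Xerial (snappy-java) magic header: 0x82 "SNAPPY" 0x00.
const XERIAL_MAGIC: &[u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0x00];

// Hypothetical helper: decompress a snappy payload that may or may not be
// xerial-framed. This is a sketch, not the actual kafka-protocol-rs code.
fn decompress_snappy(input: &[u8]) -> Result<Vec<u8>, snap::Error> {
    let mut decoder = Decoder::new();

    // Raw (unframed) snappy: the whole buffer is one compressed block.
    if input.len() < 16 || !input.starts_with(XERIAL_MAGIC) {
        return decoder.decompress_vec(input);
    }

    // Xerial framing: 8-byte magic + two 4-byte big-endian version fields,
    // then repeated [4-byte big-endian block length][raw snappy block].
    let mut out = Vec::new();
    let mut rest = &input[16..];
    while rest.len() >= 4 {
        let len = u32::from_be_bytes([rest[0], rest[1], rest[2], rest[3]]) as usize;
        // A real implementation should return an error on truncated input.
        assert!(rest.len() >= 4 + len, "truncated xerial block");
        out.extend_from_slice(&decoder.decompress_vec(&rest[4..4 + len])?);
        rest = &rest[4 + len..];
    }
    Ok(out)
}

Since the Java client writes the framed format by default, handling both layouts on the read path would cover batches produced by the common clients.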
tychedelia commented 2 months ago

For compression options, we want to match the official Java client wherever possible, so this would certainly be a bug on our end.