IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

"proto: can't skip unknown wire type 4" error when consuming snappy compressed messages #1705

Closed anujchandraZ closed 1 year ago

anujchandraZ commented 4 years ago

Thanks for taking the time to help me out 😄

Versions
| Sarama | Kafka | Go   |
|--------|-------|------|
| 1.26.1 | 2.4.0 | 1.13 |
Configuration

What configuration values are you using for Sarama and Kafka?

// Sarama config
config := sarama.NewConfig()
config.Version = sarama.V2_4_0_0
// Flag 0 means no timestamp prefix; os.O_TRUNC is a file-open flag, not a log flag.
sarama.Logger = lognew.New(os.Stdout, "[SARAMA]", 0)
config.Consumer.Offsets.Initial = sarama.OffsetOldest
consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "group_id", config)
if err != nil {
    panic(err)
}
client.consumerGroup = consumerGroup

// Consumer Logic
type messageHandler struct {
    callbackReceivers map[string]message.Receiver
}

func (mh *messageHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Debug(fmt.Sprintf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset))

        event := &event.Event{}
        err := proto.Unmarshal(msg.Value, event)
        if err != nil {
            fmt.Println("ERROR: ", err) 
            return nil
        }
    }

    // Do actual computation
    return nil
}

Kafka config

/usr/bin/kafka-topics --describe --zookeeper zookeeper:2181 --topic mock_topic

Topic: mock_topic     PartitionCount: 1       ReplicationFactor: 1  Configs:compression.type=snappy

Topic: mock_topic     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
Logs

```
proto: can't skip unknown wire type 4
[SARAMA]consumer/broker/1 closed dead subscription to mock_topic/0
[SARAMA]client/metadata fetching metadata for [mock_topic] from broker localhost:9092
[SARAMA]loop check partition number coroutine will exit, topics [mock_topic]
[SARAMA]client/coordinator requesting coordinator for consumergroup hermes-cart-updates-group-id from localhost:9092
[SARAMA]client/coordinator coordinator for consumergroup hermes-cart-updates-group-id is #1 (localhost:9092)
[SARAMA]consumer/broker/1 added subscription to mock_topic/0
```

Problem Description

proto.Unmarshal(msg.Value, event) fails with "proto: can't skip unknown wire type 4" when the topic uses snappy compression.
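For context on the error text: in the protobuf wire format, every field starts with a varint tag whose low three bits are the wire type (0 varint, 1 64-bit, 2 length-delimited, 3 start-group, 4 end-group, 5 32-bit). An unexpected wire type 4 usually suggests that the bytes are not a valid encoding of the expected message. The sketch below uses only the standard library; `decodeTag` is a hypothetical helper, not part of Sarama or the protobuf package, and it inspects only the first tag of a payload.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeTag reads the first varint of a protobuf-encoded payload and splits
// it into the field number (upper bits) and the wire type (low three bits).
// ok is false when the payload is empty or the varint is malformed.
func decodeTag(b []byte) (fieldNum, wireType uint64, ok bool) {
	tag, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, 0, false
	}
	return tag >> 3, tag & 7, true
}

func main() {
	payloads := [][]byte{
		{0x0a, 0x03, 'a', 'b', 'c'}, // tag 0x0a: field 1, wire type 2
		{0x0c, 0x00},                // tag 0x0c: field 1, wire type 4
	}
	for _, p := range payloads {
		field, wt, ok := decodeTag(p)
		fmt.Printf("first byte 0x%02x: field=%d wireType=%d ok=%v\n", p[0], field, wt, ok)
	}
}
```

Calling `decodeTag(msg.Value)` inside the consumer loop before `proto.Unmarshal` would show whether the first tag of the consumed payload is plausible for the `Event` schema.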

If my understanding is correct, this article states that "The consumer thread dequeues data from this blocking queue, decompresses and iterates through the messages", which means any message my consumer receives should already be decompressed. But when I compare the msg.Value byte arrays for the compressed and uncompressed cases, they differ. What am I missing here 😕?
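One way to test that assumption is to check whether msg.Value still begins with a Snappy framing header, which would suggest the payload was compressed by the producing application itself rather than only at the Kafka batch level. The sketch below is a heuristic using only the standard library; `looksSnappyFramed` is a hypothetical helper name. It recognizes the xerial snappy-java header and the Snappy framing-format stream identifier. Raw Snappy blocks carry no magic bytes, so a `false` result does not prove the payload is uncompressed.

```go
package main

import (
	"bytes"
	"fmt"
)

var (
	// Magic header written by xerial's snappy-java stream framing.
	xerialMagic = []byte{0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0x00}
	// Stream identifier chunk of the Snappy framing format.
	framedMagic = []byte{0xff, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'}
)

// looksSnappyFramed reports whether b starts with one of the known Snappy
// framing headers. It cannot detect unframed (raw block) Snappy data.
func looksSnappyFramed(b []byte) bool {
	return bytes.HasPrefix(b, xerialMagic) || bytes.HasPrefix(b, framedMagic)
}

func main() {
	xerial := append(append([]byte{}, xerialMagic...), 0x00, 0x00, 0x00, 0x01)
	plain := []byte{0x0a, 0x03, 'a', 'b', 'c'} // looks like a protobuf field
	fmt.Println("xerial-framed payload:", looksSnappyFramed(xerial))
	fmt.Println("plain payload:", looksSnappyFramed(plain))
}
```

If this returns `true` for a consumed msg.Value, the bytes would need an application-level decompression step before `proto.Unmarshal`.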

ghost commented 3 years ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.