aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

UnsupportedCodecError for snappy when producing events without snappy compression #922

Closed. whwright closed this issue 10 months ago.

whwright commented 10 months ago

Describe the bug

When trying to run a very basic producer/consumer I am getting

kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found

for events that are produced without any compression type defined.

Expected behaviour

When no compression type is defined, I shouldn't need snappy to consume events.

Environment (please complete the following information):

Reproducible example

import asyncio
import json

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

BOOTSTRAP_SERVERS = "redacted"
TOPIC_NAME = "redacted"

def serializer(value):
    return json.dumps(value).encode()

def deserializer(serialized):
    return json.loads(serialized)

async def main():
    producer = AIOKafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=serializer,
    )

    consumer = AIOKafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_deserializer=deserializer,
        auto_offset_reset="earliest",
    )

    try:
        await producer.start()
        await consumer.start()

        message = {"name": "Harrison"}
        print("producing...")
        await producer.send_and_wait(TOPIC_NAME, message)

        print("consuming...")
        data = await consumer.getmany(timeout_ms=1000)
        for tp, messages in data.items():
            for message in messages:
                print(type(message.value), message.value)

    finally:
        await producer.stop()
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(main())

and the output:

producing...
consuming...
Traceback (most recent call last):
  File "/Users/harrison.wright/code/projectname/reproduce.py", line 51, in <module>
    asyncio.run(main())
  File "/Users/harrison.wright/.pyenv/versions/3.10.13/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/harrison.wright/.pyenv/versions/3.10.13/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/Users/harrison.wright/code/projectname/reproduce.py", line 40, in main
    data = await consumer.getmany(timeout_ms=1000)
  File "/Users/harrison.wright/.virtualenvs/projectname-310/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 1202, in getmany
    records = await self._fetcher.fetched_records(
  File "/Users/harrison.wright/.virtualenvs/projectname-310/lib/python3.10/site-packages/aiokafka/consumer/fetcher.py", line 1063, in fetched_records
    records = res_or_error.getall(max_records)
  File "/Users/harrison.wright/.virtualenvs/projectname-310/lib/python3.10/site-packages/aiokafka/consumer/fetcher.py", line 134, in getall
    for msg in self._partition_records:
  File "/Users/harrison.wright/.virtualenvs/projectname-310/lib/python3.10/site-packages/aiokafka/consumer/fetcher.py", line 199, in __next__
    return next(self._records_iterator)
  File "/Users/harrison.wright/.virtualenvs/projectname-310/lib/python3.10/site-packages/aiokafka/consumer/fetcher.py", line 245, in _unpack_records
    for record in next_batch:
  File "aiokafka/record/_crecords/default_records.pyx", line 377, in aiokafka.record._crecords.default_records.DefaultRecordBatch.__iter__
  File "aiokafka/record/_crecords/default_records.pyx", line 234, in aiokafka.record._crecords.default_records.DefaultRecordBatch._maybe_uncompress
  File "aiokafka/record/_crecords/default_records.pyx", line 125, in aiokafka.record._crecords.default_records._assert_has_codec
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found
vmaurin commented 10 months ago

Does the topic by any chance contain existing messages, produced by another producer? The consumer auto-resets to "earliest", so it will consume the topic from the beginning, and in Kafka the broker sends messages exactly as they are stored, because it delegates most of the CPU work (including decompression) to the clients. So you could be facing snappy-compressed records written by an earlier producer, as the sketch below illustrates.
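
To illustrate the point (an editor's sketch, not code from the thread): a hypothetical earlier producer configured like the one below would write snappy-compressed batches, which the broker serves back exactly as stored, so every consumer of the topic would need the snappy codec locally:

from aiokafka import AIOKafkaProducer

BOOTSTRAP_SERVERS = "redacted"  # same placeholder as the repro above

# Hypothetical earlier producer: compression_type="snappy" makes the client
# compress each record batch; the broker stores and serves those batches
# as-is, leaving decompression to every consumer, which therefore needs
# python-snappy installed.
producer = AIOKafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    compression_type="snappy",
)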

whwright commented 10 months ago

Does the topic by any chance contain existing messages, produced by another producer?

No, this is not possible: I am running this example on a brand-new topic where the event in the script is the first and only one published.

vmaurin commented 10 months ago

Do you have access to the segment files on the broker? If so, you can check the compression with the Kafka tools shipped in the Kafka binaries; third-party tools may also show it while consuming a topic. In aiokafka, the error you are seeing should only happen when reading a record batch whose header says the compression is "snappy".
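
As a quick client-side check (an editor's sketch; depending on the installed aiokafka version, the helper lives in kafka.codec via the kafka-python dependency or in aiokafka.codec):

try:
    from aiokafka.codec import has_snappy  # newer aiokafka releases
except ImportError:
    from kafka.codec import has_snappy  # older releases that depend on kafka-python

# False means this client cannot decode snappy-compressed batches;
# installing python-snappy (plus the system snappy library) fixes that side.
print("snappy available:", has_snappy())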

whwright commented 10 months ago

I found out that the topic I was creating was being created with compression.type: "snappy" without me providing that config in the API call. I am using a shared cluster, so I suspect that is the cluster's default in some way. With that topic config the broker recompresses incoming batches to snappy, which explains why the consumer needed the codec even though my producer sent uncompressed data.

I re-created my topic using compression.type: "producer" and my example above now works.
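
For reference, a minimal sketch of recreating the topic with that config, assuming the AIOKafkaAdminClient available in recent aiokafka releases (the placeholders, partition count, and replication factor are illustrative):

import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewTopic

BOOTSTRAP_SERVERS = "redacted"  # same placeholders as the repro above
TOPIC_NAME = "redacted"

async def recreate_topic():
    admin = AIOKafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
    await admin.start()
    try:
        await admin.create_topics([
            NewTopic(
                name=TOPIC_NAME,
                num_partitions=1,  # illustrative
                replication_factor=1,  # illustrative
                # "producer" tells the broker to keep whatever compression
                # the producer used instead of recompressing batches
                topic_configs={"compression.type": "producer"},
            )
        ])
    finally:
        await admin.close()

asyncio.run(recreate_topic())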

Thank you for your help, sorry it ended up being a dumb mistake!