aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0
1.09k stars 224 forks source link

AIOKafkaProducer failed to produce message with headers #958

Closed chkp-rivkak closed 5 months ago

chkp-rivkak commented 5 months ago

Describe the bug I encountered an issue when using AIOKafkaProducer from the aiokafka library to produce messages with headers.

I create an AIOKafkaProducer instance with the following configuration:

kafka_producer_manager= AIOKafkaProducer(
                bootstrap_servers=bootstrap_servers,
                security_protocol="SSL",
                ssl_context=ssl_context,
                value_serializer=lambda v: v.encode("utf-8"),
            )
await kafka_producer_manager.start()

when I produce a message without header it works, like this: await kafka_producer_manager.produce('{"test":"test"}', 'test-topic') Subsequently, when I attempt to produce a message with headers, like this: await kafka_producer_manager.produce('{"test":"test"}', 'test-topic', headers=[("test",b"12")]) I get this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "<stdin>", line 4, in main
  File "/app/events/kafka_producer.py", line 53, in produce
    await self.producer.send(topic, message, headers)
  File "/opt/venv/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 459, in send
    partition = self._partition(topic, partition, key, value,
  File "/opt/venv/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 384, in _partition
    return self._partitioner(
  File "/opt/venv/lib/python3.10/site-packages/aiokafka/partitioner.py", line 26, in __call__
    idx = murmur2(key)
  File "/opt/venv/lib/python3.10/site-packages/aiokafka/partitioner.py", line 84, in murmur2
    h ^= data[length & ~3] & 0xFF
TypeError: unsupported operand type(s) for &: 'tuple' and 'int'

Expected behaviour I expected the AIOKafkaProducer to handle message production with headers without raising a TypeError. As it handles message production without headers

Environment (please complete the following information):

ods commented 5 months ago

Could you please provide your produce method too?

chkp-rivkak commented 5 months ago

Sure, Just an update - I found a workaround for this issue, I changed the code from : await self.producer.send_and_wait(topic, message, headers) (Full function code apears right after and wasn't changed) to:

async def produce(self, message, topic, headers=None):
        try:
            logger.info(
                f"Sending message {message} with headers {headers} to topic {topic} with producer {self.producer}")
            await self.producer.send_and_wait(topic=topic, value=message, headers=headers)
            logger.info(
                f"Successfully produced message to topic '{topic}': {message}")
        except Exception as e:
            logger.error(f"Failed to produce message: {e}")
            raise

Specifying parameters names when calling send_and_wait solved the issue for me, but it is still worth a check. Thanks!

ods commented 5 months ago

So, you were just passing headers into partition argument?