cloudevents / sdk-python

Python SDK for CloudEvents
https://pypi.org/p/cloudevents/
Apache License 2.0

cloudevents.kafka.conversion.from_structured incorrectly handles headers tuple #210

Closed bipthelin closed 1 year ago

bipthelin commented 1 year ago

Expected Behavior

Calling from_structured/1 on a KafkaMessage (i.e. a ConsumerRecord from aiokafka) should correctly deserialize the event into a CloudEvent.

Actual Behavior

Crash:

File "/usr/local/lib/python3.11/site-packages/cloudevents/kafka/conversion.py", line 257, in from_structured
 for header, val in message.headers.items():
                    │       └ ()
                    └ ConsumerRecord(topic='topic', partition=0, offset=4, key=None, value=b'{"s..."} headers=())

AttributeError: 'tuple' object has no attribute 'items'
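
The crash can be reproduced without a broker. The following is a minimal sketch that assumes only what the traceback shows: `headers` arrives as a tuple (aiokafka exposes record headers as a sequence of (key, value) pairs), while the failing loop expects a mapping. `FakeRecord` and `iterate_headers_like_a_mapping` are hypothetical stand-ins for illustration, not aiokafka or cloudevents names.

```python
from typing import NamedTuple, Optional, Sequence, Tuple


class FakeRecord(NamedTuple):
    """Hypothetical stand-in for a consumer record; only the fields used here."""

    key: Optional[bytes]
    value: bytes
    headers: Sequence[Tuple[str, bytes]]


def iterate_headers_like_a_mapping(message) -> list:
    # Mirrors the failing line in the traceback: it assumes a dict-like object.
    return [(header, val) for header, val in message.headers.items()]


# Same shape as the record in the traceback: an empty tuple of headers.
record = FakeRecord(key=None, value=b"{}", headers=())

try:
    iterate_headers_like_a_mapping(record)
    error = None
except AttributeError as exc:
    error = str(exc)

print(error)
```

An empty tuple is enough to trigger the error, because the attribute lookup on `.items` fails before any header is read.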

Steps to Reproduce the Problem

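from aiokafka import AIOKafkaConsumer
from cloudevents.kafka import from_structured

# Inside a coroutine:
consumer = AIOKafkaConsumer(...)
await consumer.start()

async for msg in consumer:
    event = from_structured(msg)  # CRASH: msg.headers is a tuple, not a dict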

There seems to be a discrepancy between:

Specifications

bipthelin commented 1 year ago

Reading the code, it seems I was naive in thinking that from_structured would deserialize directly from an aiokafka ConsumerRecord. Instead, I need to do something like this:

kafka_msg = KafkaMessage(headers=dict(msg.headers), key=msg.key, value=msg.value)
event = from_structured(kafka_msg)

This works as expected. Closing the issue.
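
For reuse, the header conversion in the workaround can be wrapped in a small helper. This is a sketch, not part of the SDK: `headers_to_dict` is a hypothetical name, and it assumes header keys are unique (with `dict(...)`, a repeated key keeps only its last value).

```python
from typing import Dict, Iterable, Mapping, Tuple, Union

HeaderPairs = Iterable[Tuple[str, bytes]]


def headers_to_dict(
    headers: Union[Mapping[str, bytes], HeaderPairs, None],
) -> Dict[str, bytes]:
    """Return headers as a plain dict, whatever shape the client library used.

    Accepts a mapping, an iterable of (key, value) pairs, or None.
    Hypothetical helper; duplicate keys collapse to the last value seen.
    """
    if headers is None:
        return {}
    return dict(headers)


# Usage with the SDK (not executed here; assumes cloudevents is installed
# and `msg` is a record received from a consumer):
#
#   from cloudevents.kafka import KafkaMessage, from_structured
#
#   kafka_msg = KafkaMessage(
#       headers=headers_to_dict(msg.headers), key=msg.key, value=msg.value
#   )
#   event = from_structured(kafka_msg)
```

Passing a mapping through the same helper keeps the call site identical for clients that already deliver headers as a dict.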

xSAVIKx commented 1 year ago

Thx for raising and resolving the issue :smile:

Yes, the connector is generic and does not provide support for any specific library. This is intentional behavior. We may wanna add extras that provide library-specific APIs, like we did with Pydantic.