chongzhang closed this issue 3 years ago.
Thanks for reporting the bug! We'll take a look as soon as possible.
@chongzhang, thanks for reaching out. As far as I know, the `partition_key` is used by the service to decide the partition, but I'm not sure whether the service would also set the Kafka message key from the `partition_key`.
Hey @serkantkaraca, could you give some more context on this one?
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @jfggdl.
Author: | chongzhang |
---|---|
Assignees: | yunhaoling |
Labels: | `Client`, `Event Hubs`, `Messaging`, `Service Attention`, `customer-reported`, `question` |
Milestone: | [2021] May |
@yunhaoling I did another test, using kafka-python 2.0.2 (https://pypi.org/project/kafka-python/) to produce a message with a key:
```python
producer.send('mytopic', key=b'key1', value=msg)  # kafka-python expects bytes unless a key_serializer is set
for msg in consumer:
    logging.info('{} {} {} {}'.format(msg.partition, msg.offset, msg.key, msg.value))
```
The log shows `msg.key`:
```
0 67212 b'key1' b'{"data": "msg 1"}'
```
The kafkacat and Sarama consumers behave the same way: both print the key (as well as the offset and value fields).
The azure-eventhub consumer doesn't include a `partition_key` in the event:
```
event { body: '{"data": "msg 1"}', properties: {}, offset: 137438954224, sequence_number: 67212, enqueued_time=datetime.datetime(2021, 5, 6, 18, 1, 51, 674000, tzinfo=datetime.timezone.utc) }
```
Thanks for the extra information! This makes me further wonder whether the `PartitionKey` concept in Event Hubs and the `Key` concept in Kafka are the same thing; apologies that I don't have enough background to answer that myself.
I've looped in the service team to help answer your question.
Hi @yunhaoling, any update on this from the service team?
Can you examine the messages with Service Bus Explorer and see whether partition keys are present? That would help pinpoint whether the issue is on the producer side or the consumer side.
I have tried the confluent-kafka Python SDK to send and receive events, following the azure-event-hubs-for-kafka Python sample.
I used the following steps to check the behavior difference between the Kafka SDK and the Python Event Hubs SDK:
The confluent-kafka producer and consumer samples work fine with message keys: I tweaked producer.py and consumer.py in the confluent-kafka sample to set and get the message key.
```python
# Producer side: produce a message with a key-value pair
p.produce(topic, key='partition key', value=str(i), callback=delivery_callback)

# Consumer side: print the key and the value
print(msg.key())
print(msg.value())
```
No, the partition key does NOT show up in Service Bus Explorer.
The event received by the Python Event Hubs SDK doesn't have a partition key: the SDK reads the `x-opt-partition-key` entry in the internal AMQP message annotations.
However, the AMQP message annotations do contain an `x-opt-kafka-key` entry, and its value is exactly the key set by the Kafka producer.
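Until the SDK surfaces the Kafka key directly, a consumer could read it from the annotations itself. Below is a minimal sketch, assuming the annotations are available as a mapping (for example via `event.raw_amqp_message.annotations` in newer azure-eventhub 5.x releases) and that annotation keys may arrive as either `str` or `bytes` depending on the SDK version. The helpers `effective_key` and `_lookup` are hypothetical, not part of the SDK.

```python
from typing import Any, Mapping, Optional

PARTITION_KEY = "x-opt-partition-key"  # the entry azure-eventhub exposes as EventData.partition_key
KAFKA_KEY = "x-opt-kafka-key"          # the entry observed on events produced by a Kafka client


def _lookup(annotations: Mapping[Any, Any], name: str) -> Any:
    # Hypothetical helper: annotation keys may be str or bytes depending on the
    # SDK version (an assumption), so try both spellings of the same name.
    for candidate in (name, name.encode("ascii")):
        if candidate in annotations:
            return annotations[candidate]
    return None


def effective_key(annotations: Mapping[Any, Any]) -> Optional[bytes]:
    """Return the Event Hubs partition key if present, otherwise the Kafka message key."""
    for name in (PARTITION_KEY, KAFKA_KEY):
        value = _lookup(annotations, name)
        if value is not None:
            # Normalize to bytes so the result compares directly with Kafka's msg.key
            if isinstance(value, (bytes, bytearray)):
                return bytes(value)
            return str(value).encode("utf-8")
    return None
```

Inside the `on_event` callback from the bug report, this could be called as `effective_key(event.raw_amqp_message.annotations)`; check that your installed azure-eventhub version exposes `raw_amqp_message` before relying on it.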
@serkantkaraca, it looks like `x-opt-partition-key` and `x-opt-kafka-key` are treated differently. Is this difference by design?
Hi, is there any update on this?
Hey @chongzhang, I confirmed with @serkantkaraca that `x-opt-kafka-key` (which is set when an event is sent by a Kafka SDK, and not set when it is sent by an Event Hubs SDK) is not used as the partition key by the Event Hubs service, so this is expected behavior.
Hey @hmlam, do you have any thoughts on this issue? Is there anything we could or should do on the service/SDK side, or should we hand it over to the Kafka developers?
Hey @chongzhang, I have discussed this with the service team. The summary is as follows:
Please let me know if you have any other questions; I really appreciate your feedback!
@yunhaoling Thanks for the detailed info.
For the producer, my understanding is that for different languages/libraries to send messages with the same key to the same partition, they should use a custom hash partitioner function so that the same key always maps to the same partition ID. Could you share which hash function the EH client uses to map a key to a partition ID, or how to provide a custom partitioner?
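One way to get consistent placement regardless of client is for every producer to compute the partition itself with a shared, deterministic hash and then send to that partition explicitly. A minimal sketch, assuming N partitions with IDs `"0"` to `"N-1"`; `choose_partition` is a hypothetical helper, and CRC-32 is an arbitrary stand-in, not the hash the Event Hubs service or Kafka's default partitioner actually uses:

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition index using CRC-32 over its UTF-8 bytes.

    CRC-32 is an arbitrary, widely available choice; it is NOT the hash used by
    the Event Hubs service or by Kafka's default partitioner.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # In Python 3, zlib.crc32 returns an unsigned 32-bit int, so the result is never negative.
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

A producer would then pass the result explicitly, for example `producer.create_batch(partition_id=str(pid))` with azure-eventhub, or `producer.send(topic, key=..., value=..., partition=pid)` with kafka-python, rather than relying on each client's own key hashing. CRC-32 is convenient here because the same checksum exists in other languages, e.g. `java.util.zip.CRC32` in Java or `crc32.ChecksumIEEE` in Go.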
For the consumer, the message headers could carry the message key, but this isn't standardized, so it would be great if the EH consumer client could return the message key based on `x-opt-partition-key` or a `key` entry in the headers.
Thanks again for your help!
You can print the partition key via the message headers, as below.
Formatting (kafkacat `-f` format string):
```
Headers: %h
```
Sample output:
```
Headers: x-opt-partition-key=�↑this-is-my-partition-key,
```
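The `�↑` before the key in the sample output looks like AMQP encoding rather than part of the key. Assuming the header value is an AMQP-encoded string, the first byte is the type code (0xA1, str8-utf8) and the second is a one-byte length; 0x18 = 24 is exactly the length of `this-is-my-partition-key`, and 0x18 renders as ↑ in code page 437. That interpretation rests on this one sample, so treat the following minimal sketch (with a hypothetical `decode_amqp_header` helper) as an assumption to verify:

```python
import struct

# AMQP 1.0 variable-width type codes: 0xA? uses a 1-byte length, 0xB? a 4-byte length.
# 0xA0/0xB0 = binary, 0xA1/0xB1 = UTF-8 string, 0xA3/0xB3 = symbol.
_SHORT_CODES = {0xA0, 0xA1, 0xA3}
_LONG_CODES = {0xB0, 0xB1, 0xB3}


def decode_amqp_header(raw: bytes) -> bytes:
    """Strip the AMQP type code and length prefix from a header value, returning the payload."""
    if not raw:
        raise ValueError("empty header value")
    code = raw[0]
    if code in _SHORT_CODES and len(raw) >= 2:
        length, start = raw[1], 2
    elif code in _LONG_CODES and len(raw) >= 5:
        (length,) = struct.unpack(">I", raw[1:5])  # AMQP encodes lengths big-endian
        start = 5
    else:
        raise ValueError(f"unsupported or truncated AMQP value (type code 0x{code:02x})")
    payload = raw[start:start + length]
    if len(payload) != length:
        raise ValueError("header value is shorter than its declared length")
    return payload
```

With confluent-kafka, for example, `msg.headers()` returns a list of `(name, value)` tuples (or `None`), so the key could be read with `decode_amqp_header(dict(msg.headers() or [])['x-opt-partition-key'])`, provided the header is present.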
@chongzhang, if we use the same partition key across multiple SDKs, messages go to the same partition, because the hashing happens on the service side.
@kasun04 thanks! Just to clarify, what do you mean by "multiple SDKs"? I thought Adam (@yunhaoling) mentioned above that "because with the same value the Kafka client and the EH client likely send the message to two different partitions".
I was referring to using the same partition keys from SDKs for different languages.
Hi, we're sending this friendly reminder because we haven't heard back from you in a while. We need more information about this issue to help address it. Please be sure to give us your input within the next 7 days. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!
Describe the bug
After using the azure-eventhub package to produce a message with a partition key, consuming it with the azure-eventhub package shows the body and `partition_key` in the event. But consuming the topic with kafkacat shows a null/empty key for the message.
To Reproduce
Steps to reproduce the behavior:
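```python
from azure.eventhub import EventData

# Every event in this batch is published with partition key 'key1'
event_data_batch_with_partition_key = producer.create_batch(partition_key='key1')
event_data_batch_with_partition_key.add(EventData(msg))
producer.send_batch(event_data_batch_with_partition_key)
```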
```python
async def on_event(partition_context, event):
    logging.info(f'event {event}')
```
The consumer log shows the event with its body, `partition_key`, and other fields, e.g.:
```
event { body: '{"name": "myname", "data": "msg 18"}', offset: 133144046968, sequence_number: 66430, partition_key=b'key1', enqueued_time=datetime.datetime(2021, 4, 30, 19, 41, 19, 410000, tzinfo=datetime.timezone.utc) }
```
```shell
kafkacat -b $BROKER -t $TOPIC -f '\n%t Key (%K bytes): %k :\nValue (%S bytes): %s\n%T \Partition: %p\tOffset: %o\n--\n' -o end
```
kafkacat consumes the message with an empty/null key:
```
mytopic Key (-1 bytes): :
Value (36 bytes): {"name": "myname", "data": "msg 18"}
1619811679410 Partition: 1 Offset: 66430
```