Azure / azure-event-hubs-spark

Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Apache License 2.0

Azure Event Hub Consumer Group and Partition Id issue with spark streaming #655

Open milinddhotarkar opened 2 years ago

milinddhotarkar commented 2 years ago

I have a use case where I need to consume 1 million events per second from Event Hubs using Spark Streaming. I created an event hub with 10 partitions and 10 consumer groups so that 10 Spark Streaming jobs can read the events in parallel, one consumer group per partition. The problem is that each consumer reads all the events from all the partitions, which produces duplicate data. Each job should read only the events from its specified partition. I think this is a bug. I am using Databricks with PySpark streaming to consume the events. Please help me resolve this issue.
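For context: in Event Hubs a consumer group is a view of the entire event hub, so a streaming query attached to any consumer group is offered events from every partition; giving each job its own consumer group does not by itself pin that job to one partition. A possible workaround, sketched below under that reading, is to split the partition ids among the jobs and have each job keep only its share with a filter on the connector's partition column. The helpers assign_partitions and partition_filter are hypothetical, and the sketch assumes partition ids are the strings "0" through "9". Because the filter runs after events are received, each job would still pull every partition from the service, which matters at 1 million events per second.

```python
from __future__ import annotations  # allows list[str] annotations on older Python 3 runtimes


def assign_partitions(num_partitions: int, num_jobs: int, job_index: int) -> list[str]:
    """Round-robin split of Event Hubs partition ids across jobs (hypothetical helper).

    Assumes partition ids are "0", "1", ..., str(num_partitions - 1).
    """
    if not 0 <= job_index < num_jobs:
        raise ValueError("job_index must be in [0, num_jobs)")
    return [str(p) for p in range(num_partitions) if p % num_jobs == job_index]


def partition_filter(partition_ids: list[str]) -> str:
    """Build a Spark SQL condition on the connector's string 'partition' column."""
    quoted = ", ".join(f"'{p}'" for p in partition_ids)
    return f"partition IN ({quoted})"


if __name__ == "__main__":
    # 10 partitions, 10 jobs: job 3 keeps only partition "3".
    print(assign_partitions(10, 10, 3))
    # 10 partitions, 5 jobs: job 1 keeps partitions "1" and "6".
    print(partition_filter(assign_partitions(10, 5, 1)))
```

In a PySpark job the condition could be passed to DataFrame.filter, for example df.filter(partition_filter(assign_partitions(10, 10, job_index))), where df comes from spark.readStream.format("eventhubs").options(**ehConf).load() and job_index is a hypothetical per-job setting.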

rachelxj-ms commented 1 year ago

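I have the same issue when using this library in Azure Databricks. I set a starting position for partition 2 only:

    import json

    # ehName, eventPosition1 and ehConf are defined elsewhere in the notebook
    positionKey1 = {"ehName": ehName, "partitionId": 2}
    positionMap = {json.dumps(positionKey1): eventPosition1}
    ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)

Log output: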

    23/10/10 09:22:18 INFO EventHubsRDD: (TID 578) Computing EventHubs test, partition 2 sequence numbers 5 => 6
    23/10/10 09:22:18 INFO EventHubsRDD: (TID 577) Beginning sequence number 66 is equal to the ending sequence number 66. Returning empty partition for EH: test on partition: 1
    23/10/10 09:22:18 INFO EventHubsRDD: (TID 579) Beginning sequence number 3 is equal to the ending sequence number 3. Returning empty partition for EH: test on partition: 3
    23/10/10 09:22:18 INFO CachedEventHubsReceiver: (TID 578) EventHubsCachedReceiver look up. For namespaceUri sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition {"ehName":"test","partitionId":2} consumer group $Default. requestSeqNo: 5, batchSize: 1
    23/10/10 09:22:18 INFO CachedEventHubsReceiver: (TID 578) Finished receiving for namespaceUri: sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition: {"ehName":"test","partitionId":2} consumer group: $Default, batchSize: 1, elapsed time: 0 ms
    .........................
    23/10/10 09:22:21 INFO EventHubsRDD: (TID 581) Computing EventHubs test, partition 0 sequence numbers 64 => 65
    23/10/10 09:22:21 INFO EventHubsRDD: (TID 583) Beginning sequence number 6 is equal to the ending sequence number 6. Returning empty partition for EH: test on partition: 2
    23/10/10 09:22:21 INFO EventHubsRDD: (TID 584) Beginning sequence number 3 is equal to the ending sequence number 3. Returning empty partition for EH: test on partition: 3
    23/10/10 09:22:21 INFO CachedEventHubsReceiver: (TID 581) EventHubsCachedReceiver look up. For namespaceUri sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition {"ehName":"test","partitionId":0} consumer group $Default. requestSeqNo: 64, batchSize: 1
    23/10/10 09:22:21 INFO CachedEventHubsReceiver: (TID 581) Finished receiving for namespaceUri: sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition: {"ehName":"test","partitionId":0} consumer group: $Default, batchSize: 1, elapsed time: 0 ms
    23/10/10 09:22:21 INFO CodeGenerator: Code generated in 16.534578 ms
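To check which partitions the micro-batches actually planned, the EventHubsRDD lines in a log like this can be summarized mechanically. The sketch below is a hypothetical helper that matches only the two message shapes visible in this excerpt (a "Computing EventHubs ..., partition N sequence numbers A => B" line and a "Returning empty partition for EH: ... on partition: N" line); the regular expressions are inferred from these lines alone and may not match other connector versions.

```python
import re

# Patterns inferred from the EventHubsRDD log lines quoted in this issue (assumption).
COMPUTING = re.compile(
    r"Computing EventHubs (\S+), partition (\d+) sequence numbers (\d+) => (\d+)"
)
EMPTY = re.compile(r"Returning empty partition for EH: (\S+) on partition: (\d+)")


def summarize(log_text: str) -> dict[int, int]:
    """Map each planned partition id to the number of events requested.

    "A => B" is counted as B - A events (the quoted log pairs "5 => 6" with
    batchSize: 1). Partitions that only appear as empty ranges map to 0.
    Works on flattened text too, since finditer ignores line breaks.
    """
    summary: dict[int, int] = {}
    for m in COMPUTING.finditer(log_text):
        pid, start, end = int(m.group(2)), int(m.group(3)), int(m.group(4))
        summary[pid] = summary.get(pid, 0) + (end - start)
    for m in EMPTY.finditer(log_text):
        summary.setdefault(int(m.group(2)), 0)
    return summary
```

Run over the excerpt above, this returns {2: 1, 0: 1, 1: 0, 3: 0}: every partition from 0 to 3 shows up in the plan, not only partition 2.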

The compute method of EventHubsRDD does not filter partitions based on the ehConf configuration, so the job still receives events from all partitions. Please help fix this: https://github.com/Azure/azure-event-hubs-spark/blob/master/core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala#L101
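The configuration in this comment sets a starting position for partition 2 only. As the connector's documentation describes these options, eventhubs.startingPositions chooses where reading begins for the partitions listed in the map, and any partition not listed falls back to eventhubs.startingPosition; it is not described as a way to exclude partitions. That reading would be consistent with the log, where partitions 0, 1 and 3 are planned alongside partition 2. The sketch below generalizes the snippet into a helper that builds the map for several partitions. The name build_starting_positions is hypothetical, and the event-position fields (offset, seqNo, enqueuedTime, isInclusive) follow the layout of the connector's PySpark examples; check them against the connector version in use.

```python
import json


def build_starting_positions(eh_name: str, partition_ids, offset: str = "-1") -> str:
    """Build the JSON value for ehConf["eventhubs.startingPositions"] (hypothetical helper).

    Each map key is a JSON-encoded {"ehName", "partitionId"} object, mirroring the
    json.dumps(positionKey1) pattern in the snippet above. Per the connector docs as
    understood here, partitions NOT in this map still start from
    eventhubs.startingPosition; the map does not restrict which partitions are read.
    """
    position_map = {}
    for pid in partition_ids:
        key = json.dumps({"ehName": eh_name, "partitionId": int(pid)})
        position_map[key] = {
            "offset": offset,      # "-1" is used for start of stream in the connector's examples
            "seqNo": -1,           # placeholder; only offset is set here
            "enqueuedTime": None,  # placeholder; only offset is set here
            "isInclusive": True,
        }
    return json.dumps(position_map)


if __name__ == "__main__":
    ehConf = {}
    ehConf["eventhubs.startingPositions"] = build_starting_positions("test", [2])
    print(ehConf["eventhubs.startingPositions"])
```

Restricting a job to particular partitions would then have to happen elsewhere, for example with a filter on the partition column after the stream is loaded.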

rachelxj-ms commented 1 year ago

Hi @yamin-msft, can you please help with this ticket?