Azure / azure-sdk-for-go


Events are not getting distributed among multiple consumers (only a single consumer receives events from all partitions) #21675

Closed. devak8 closed this issue 1 year ago.

devak8 commented 1 year ago

Bug Report

Hey folks, I am using this Azure SDK in Go to test how events received from Event Hubs are distributed across partitions when multiple consumers run in parallel, but the events are not being distributed per partition. I am currently using this package: github.com/Azure/azure-event-hubs-go/v3. With multiple consumers running in parallel, events should be distributed to each consumer, but right now they are all delivered to a single consumer. Please take a look into this. Thanks in advance.

Happy Go.

PremSahooESL commented 1 year ago

Does the Go version of the Event Hubs receiver library support automatic load balancing when there are multiple consumers? Do multiple consumers each get a lease on a unique Event Hubs partition, the way the Java Event Hubs SDK does, so that they can receive events from the event hub in parallel?

richardpark-msft commented 1 year ago

Hi @devak8,

This repository is for the github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs package. The package you're mentioning (https://github.com/Azure/azure-event-hubs-go) is deprecated.

That said, the way Event Hubs works is that consumers are associated with partitions. So if events are mostly being sent to a single partition, then only one consumer will actually be reading events.

Have your sending patterns changed, with events no longer being distributed evenly across partitions?
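For context, here is a minimal sketch (added for illustration, not taken from the issue) of how the sending side influences this, using the current azeventhubs package; the connection-string environment variable, the hub name my-event-hub, and the partition key device-42 are all placeholder assumptions. Leaving PartitionID and PartitionKey unset lets the service spread batches across partitions, while pinning a partition key sends every event in that batch to one partition, and therefore to one consumer.

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// Placeholder configuration: adjust to your namespace and event hub.
	connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	hubName := "my-event-hub"

	producer, err := azeventhubs.NewProducerClientFromConnectionString(connStr, hubName, nil)
	if err != nil {
		log.Fatalf("creating producer: %v", err)
	}
	defer producer.Close(context.TODO())

	ctx := context.TODO()

	// Batch 1: no PartitionID or PartitionKey, so the service picks the
	// partition and successive batches end up spread across partitions.
	spreadBatch, err := producer.NewEventDataBatch(ctx, nil)
	if err != nil {
		log.Fatalf("creating batch: %v", err)
	}
	if err := spreadBatch.AddEventData(&azeventhubs.EventData{Body: []byte("spread across partitions")}, nil); err != nil {
		log.Fatalf("adding event: %v", err)
	}
	if err := producer.SendEventDataBatch(ctx, spreadBatch, nil); err != nil {
		log.Fatalf("sending batch: %v", err)
	}

	// Batch 2: a fixed PartitionKey routes every event in this batch to the
	// same partition, so only the consumer that owns that partition sees them.
	pinnedBatch, err := producer.NewEventDataBatch(ctx, &azeventhubs.EventDataBatchOptions{
		PartitionKey: to.Ptr("device-42"), // hypothetical key
	})
	if err != nil {
		log.Fatalf("creating batch: %v", err)
	}
	if err := pinnedBatch.AddEventData(&azeventhubs.EventData{Body: []byte("always the same partition")}, nil); err != nil {
		log.Fatalf("adding event: %v", err)
	}
	if err := producer.SendEventDataBatch(ctx, pinnedBatch, nil); err != nil {
		log.Fatalf("sending batch: %v", err)
	}
}
```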

richardpark-msft commented 1 year ago

@PremSahooESL - yes, both the package in this repo (github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs) and the deprecated package use a checkpoint store to ensure load is distributed.

As I mentioned above though, we don't receive in parallel on a single partition.
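To make the load-balancing setup concrete, here is a minimal sketch (added for illustration, not from the issue) of a consumer built on the azeventhubs Processor and its Azure Blob Storage checkpoint store. The connection-string environment variables, the hub name my-event-hub, and the container name checkpoints are placeholder assumptions, and small details such as the options parameter on UpdateCheckpoint can vary between SDK versions.

```go
package main

import (
	"context"
	"errors"
	"log"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
	// Placeholder configuration: every processor instance must point at the
	// SAME blob container, or the instances will not see each other's
	// ownership records and one of them will keep all the partitions.
	ehConnStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	storageConnStr := os.Getenv("STORAGE_CONNECTION_STRING")

	containerClient, err := container.NewClientFromConnectionString(storageConnStr, "checkpoints", nil)
	if err != nil {
		log.Fatalf("creating container client: %v", err)
	}

	checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
	if err != nil {
		log.Fatalf("creating checkpoint store: %v", err)
	}

	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(
		ehConnStr, "my-event-hub", azeventhubs.DefaultConsumerGroup, nil)
	if err != nil {
		log.Fatalf("creating consumer client: %v", err)
	}
	defer consumerClient.Close(context.TODO())

	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
	if err != nil {
		log.Fatalf("creating processor: %v", err)
	}

	// Each NextPartitionClient call hands back one partition this instance
	// currently owns; ownership is rebalanced across all running instances.
	go func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())
			if partitionClient == nil {
				break // the processor has stopped
			}
			go processPartition(partitionClient)
		}
	}()

	// Run blocks while this instance participates in load balancing.
	if err := processor.Run(context.TODO()); err != nil {
		log.Fatalf("running processor: %v", err)
	}
}

func processPartition(pc *azeventhubs.ProcessorPartitionClient) {
	defer pc.Close(context.TODO())

	for {
		// Wait up to 30 seconds for a batch of up to 100 events.
		ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
		events, err := pc.ReceiveEvents(ctx, 100, nil)
		cancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			log.Printf("partition %s: receive error: %v", pc.PartitionID(), err)
			return
		}

		for _, event := range events {
			log.Printf("partition %s: %s", pc.PartitionID(), string(event.Body))
		}

		if len(events) > 0 {
			// Record progress in the shared blob container so another
			// instance can resume from here if it takes over this partition.
			if err := pc.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				log.Printf("partition %s: checkpoint error: %v", pc.PartitionID(), err)
			}
		}
	}
}
```

The key point for this thread is that the blob container is the shared source of truth for partition ownership and checkpoints, so instances pointed at different containers never see each other and each one will try to own every partition on its own.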

github-actions[bot] commented 1 year ago

Hi @devak8. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

PremSahooESL commented 1 year ago

@richardcho-msft please consider the case below:

With 5 partitions and 5 consumers, in the Go version of the SDK only one consumer keeps receiving data from all 5 partitions; even though we have another 4 consumers, they just sit idle, which is wrong. In this case each consumer should receive from a dedicated partition (the way the Java version of the SDK works).

richardpark-msft commented 1 year ago

@PremSahooESL, to clarify - are you and @devak8 working together? I want to make sure we're only troubleshooting one problem at a time.

Also, I'm assuming @richardcho-msft was tagged accidentally and you meant to tag me (@richardpark-msft ), correct?

richardpark-msft commented 1 year ago

I just ran a test version of this and it all seems to work the way you'd expect - a single processor dominates for a short period of time until it balances out and I see that all processors are receiving events.

Can you give me some more details about your application? Do all of these processor instances use the same storage container?

Also, can you run the application with this tracer? It'll surface some more diagnostic information:

  1. Copy the contents of this file, stderrTracer.go, into your application somewhere (you'll need to adjust the package to something appropriate).
  2. In your application register the tracer:
tab.Register(&stress.StderrTracer{NoOpTracer: &tab.NoOpTracer{}})
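For orientation, a minimal placement sketch follows; the stderrTracer type in it is only a compile-able stand-in for the real type in the linked stderrTracer.go file (which is what actually writes the diagnostics to stderr), and the package/type name should be adjusted to wherever you copied that file.

```go
package main

import (
	"log"

	"github.com/devigned/tab"
)

// stderrTracer is only a placeholder so this sketch compiles on its own;
// in a real application use the StderrTracer type copied from stderrTracer.go.
// Embedding *tab.NoOpTracer is what lets the placeholder satisfy tab.Tracer.
type stderrTracer struct {
	*tab.NoOpTracer
}

func main() {
	// Register the tracer once, early in startup and before the event
	// processor host is created, so its diagnostics cover the whole run.
	tab.Register(&stderrTracer{NoOpTracer: &tab.NoOpTracer{}})

	log.Println("tracer registered; create and start the processor as usual")
}
```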

Now what we should see is output like this, where each 'eph' instance reports what partitions it has acquired:

2023/10/04 15:44:13 DEBUG: eph "9dd37e11-d3a5-4c4c-b73a-aff16f52b2ba": acquired: [{"partitionID":"2","epoch":1,"owner":"9dd37e11-d3a5-4c4c-b73a-aff16f52b2ba","checkpoint":null,"state":"available","token":"a077a32c-556a-48da-bcd2-4bf6e0b1ba47"} {"partitionID":"1","epoch":1,"owner":"9dd37e11-d3a5-4c4c-b73a-aff16f52b2ba","checkpoint":null,"state":"available","token":"c45ea84d-1685-4dd2-ba03-eec172d50c94"} {"partitionID":"3","epoch":1,"owner":"9dd37e11-d3a5-4c4c-b73a-aff16f52b2ba","checkpoint":null,"state":"available","token":"ce312d0f-dc9a-4176-9d18-4eee2988155d"}], not acquired: [{"partitionID":"0","epoch":0,"owner":"","checkpoint":null,"state":"available","token":""} {"partitionID":"4","epoch":0,"owner":"","checkpoint":null,"state":"available","token":""}]

There will also potentially be ERROR messages and others as well.

If you can run your program with this enabled and add the output here (suitably anonymized if there's anything you don't want shown) it'll give me a spot to start.

PremSahooESL commented 1 year ago

@PremSahooESL, to clarify - are you and @devak8 working together? I want to make sure we're only troubleshooting one problem at a time.

Also, I'm assuming @richardcho-msft was tagged accidentally and you meant to tag me (@richardpark-msft ), correct?

Sorry for tagging the wrong user; it was supposed to be you (@richardpark-msft).

Correct... @devak8 and I are working together.

PremSahooESL commented 1 year ago

Can you give me some more details about your application? Do all of these processor instances use the same storage container?

Just to answer your question above.

Yes, all our processor instances use the same storage container. We will cross-check this part and share the test results here.

PremSahooESL commented 1 year ago

We tested with the above package, github.com/Azure/azure-event-hubs-go/v3. It works as expected with 5 partitions.

GoLang Consumer: 1
2023/10/06 15:46:14 DEBUG: eph "8ca2238d-3751-441d-9fb4-9f8673f2d999", partition "2", epoch 8: lease renewed
2023/10/06 15:46:16 DEBUG: eph "8ca2238d-3751-441d-9fb4-9f8673f2d999", partition "1", epoch 7: lease renewed
2023/10/06 15:46:17 DEBUG: eph "8ca2238d-3751-441d-9fb4-9f8673f2d999", partition "4", epoch 7: lease renewed

GoLang Consumer: 2
2023/10/06 15:45:47 DEBUG: eph "e3f7cb86-2e03-4304-967d-20ff50119509", partition "0", epoch 8: lease renewed
2023/10/06 15:45:50 DEBUG: eph "e3f7cb86-2e03-4304-967d-20ff50119509", partition "3", epoch 9: lease renewed

PremSahooESL commented 1 year ago

In our previous test we were using a Go-based consumer and a Python-based consumer, so the partition leasing was not accurate, as they were using different storage containers for checkpointing.

PremSahooESL commented 1 year ago

Thanks for your support @richardpark-msft

richardpark-msft commented 1 year ago

Thanks for your support @richardpark-msft

Just want to check - you're saying this issue can be closed?

Also, second check - was it intentional that you run both languages side-by-side? Just curious about the setup - was the idea that you incrementally upgrade/change out the language? Or just an accidental configuration?

PremSahooESL commented 1 year ago

This issue can be closed.

Just to answer your second check, please see below.

Initially, the test setup was to run 2 consumers written in 2 different languages. The expectation was that both consumers would share the data from the partitions equally. Is it the case that, even though the consumers are written in different languages, they must still use the same storage container for checkpoints, and only then will the distribution of Event Hubs partitions work properly?

We were intentionally trying to run 2 different consumers in 2 different languages, but they accidentally ended up with 2 different storage checkpoint containers.