Open PaulBernier opened 7 months ago
Ooh, this is very interesting. So the recent change I made was to fix another issue, where you'd end up with a new consumer inadvertently getting starved out because it didn't want to pick off extra partitions.
It looks like even a single consumer added (so with 0 current partition owned) would de-allocate all the owners owning maxAllowed partitions,
These are only candidates for stealing - the new consumer isn't going to take all of those available partitions, just one (randomly). This is going to lead to a transition period for the affected consumer and the new consumer, but this is by design.
With all of that said, there could always just be a bug in the way I'm doing it! The general approach for a consumer is:
Step 3 will always destabilize some consumers during the time when it's taking over the partition.
We have some methods you can try out if you want to have more control over how/when the acquisitions happen:
LoadBalancingStrategy
to minimize disruption link. We have two strategies currently - ProcessorStrategyBalanced
and ProcessorStrategyGreedy
.
If you can tell me a bit more about your situation I'm happy to keep talking about it. Small changes could yield big dividend!
Thanks for the quick reply and explaining further the algorithm. I did miss that those were only candidates for stealing. I will immerse myself further in the algorithm and try to match that to the logs I have and see if I can pinpoint a particular issue.
Based on your guidance and carefully reviewing the logs I don't think it is an issue with the load balancing algorithm in itself anymore, but it's still related to that area.
I captured the issue on a simpler set up with 32 partitions and 10 consumers. Around the same time 9 of the consumers were detected as expired by the only "survivor" (see owners: 1
)
4:03:05.046 PM [40a5703c-f756-41c8-78cf-010f83bd638d] claimMorePartitions: true, owners: 1, current: 4, unowned: 0, expired: 28, above: 0
it then started to claim 1 by 1 partitions as expected. Then I looked at the logs of those other consumers and first thing is that they were all still alive and receiving events just fine (I can see metrics for those). The issue with those is that for all 9 of them around the same time I see the log
4:02:12.978 PM [9119402c-3aaf-4c48-6187-5e26210cb419] Listing ownership for pbernier-premium.servicebus.windows.net/test-32-partitions/xxxx
and normally this log is followed by a log claimMorePartitions: X, owners: X, current: X, unowned: X, expired: X, above: X
within ~20ms. But in this case that log came approximately ~3min later. That means that ownership was not refreshed in the checkpoint store for that time and considered expired after 1 minute. Then when that log came in after 3 minutes, they all restarted to claim ownership again.
I am using the built-in Azure blob storage checkpoint store. What seems most probable is that an issue on blob storage around that time caused that line to hang for 3 minutes. Note that the consumer never errored, nor did it get restarted at any time, it just eventually recovered when that blocking call eventually returned.
What I'd like to suggest is to add a deadline and/or retries to that lb.checkpointStore.ListOwnership
. It's likely that if that faulty call was canceled and retried a few second later it would have gone through and would not have lead to a complete reshuffling of partitions. I am thinking the deadline should be equal to ownershipUpdateInterval
as it doesn't make much sense to wait from checkpointStore longer than that. Basically I think we can improve on the recovery process here. Let me know if that makes sense.
I continued further the investigation with more logging, I confirmed that the root cause is that the Azure Blob storage checkpoint store sometimes (every hour or so on average) hang for 3 minutes (it's always 3 minutes +/-10s, I doesn't look like a random value) before eventually succeeding. It can happen on any type of calls, if it's ListOwnership
or ClaimOwnership
then the result is failure to refresh ownership, so it get stolen by another consumer and lead to a reshuffling of partitions (what I initially opened this issue for), but it can also happen for checkpointing methods, and in this case it can block the processing of data while we wait for the checkpoint to be saved. While I understand it's normal for networking calls to occasionally fail, it struck me as odd that it happens so often/regularly, and always with a 3min hang before succeeding.
Ultimately I was able to completely mitigate the issue by adding a timeout to the container client used by the blob checkpoint store, picking a value shorter than the ownership timeout:
containerOpts := &container.ClientOptions{
ClientOptions: policy.ClientOptions{
Retry: policy.RetryOptions{
MaxRetries: 4,
TryTimeout: 10 * time.Second,
},
},
}
This solves the issue specifically for Azure blob storage checkpoint store. I still wonder if that issue should be universally mitigated by having timeout/retry within the Processor itself. Also if you have any idea where that 3 minute hang for the calls to blob storage could be coming from, let me know :)
@PaulBernier, as usual your responses do not disappoint. :)
There are some configuration settings that sometimes get set that can have detrimental effects on our performance with blob stores. Basically, you should NOT enable soft delete or blob versioning when using the storage as a checkpoint store. link.
It can cause performance issues very similar to this.
If you don't have those enabled, then our next step is to file an Azure support ticket. Support can look at your actual storage instance, which should provide more information.
I still wonder if that issue should be universally mitigated by having timeout/retry within the Processor itself.
You've mentioned two spots that are vulnerable when Storage slows down.
Area 1, that's inside of the Processor's load balancing loop, would need a reasonable timeout per-round. When it hangs it'd be nice to return an error so you have some idea something is going wrong and we don't provide a way to limit it. I'll need to think a bit on how we'd want to expose that - my first thought was we could, as I believe you mentioned earlier, try to derive it from what we consider the maximum time for a "round" of load balancing.
Area 2, where you're updating a checkpoint, does allow you to provide your own timeout via the context parameter and that's idiomatic to our SDK, in general:
ctx, cancel := context.WithTimeout(parentCtx, time.Minute)
defer cancel()
if err := partitionClient.UpdateCheckpoint(ctx, events[len(events)-1], nil); err != nil {
return err
}
I'll keep this open in case you have other questions, and also as a tracking issue for introducing the ability to timeout in the processing loop. Please feel free to ask more questions or drop in observations - it's all been very helpful.
Bug Report
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3
go version go1.22.0 darwin/amd64
When running 80 consumers against one 100-partition EventHub I noticed that sometimes there is a sudden massive rebalancing of the partitions, which I confirmed from that log:
The issue is that under load this also create a massive swing in throughput for a few minutes until things stabilize again. Interestingly this kind of event also happened when no load was there.
At my current stage of investigation I think it's related to that line: https://github.com/Azure/azure-sdk-for-go/blob/6e61c5ea1c948e54cbf69f8e0876468df7843065/sdk/messaging/azeventhubs/processor_load_balancer.go#L178 and in particular the
len(groupedByOwner[lb.details.ClientID])
part of the condition. It looks like even a single consumer added (so with 0 current partition owned) would de-allocate all the owners owningmaxAllowed
partitions, which could be a lot. Could there be a better algorithm that would be more surgical in its partition re-allocation? I'll spend some time to think about it as well.Tagging @richardpark-msft as you have been touching that area recently. Thanks