Hi @asrosent. We're sorry that you're encountering difficulties and appreciate you reaching out to let us know. In order to help, I'd like to start by gathering a bit more context on your scenario:
How many processor instances are in use, and how many partitions are present for the Event Hub?
Can you confirm that all of the processors that you're observing are configured to use the same Consumer Group?
With respect to seeing duplicate events, can you help me understand how you're determining that the events processed are the same?
When you process duplicates, are you seeing a finite set where different processors read the same events from a partition or are you seeing a continual stream of the same events being read multiple times?
What is the strategy that you're using for checkpointing events from a given partition?
When you refer to "the logs", are you listening to the Event Hubs ETW logs or are you referring to a different approach for logging? In the latter case, would you be so kind as to offer a bit of detail into how you're collecting them?
Hey @jsquire, thank you for your kind response.
How many processor instances are in use, and how many partitions are present for the Event Hub?
4 processor instances on 32 eventhub partitions
Can you confirm that all of the processors that you're observing are configured to use the same Consumer Group?
yes
With respect to seeing duplicate events, can you help me understand how you're determining that the events processed are the same?
In order to explain this I need to give a little background about our service.
Our service is a pipeline that enriches events coming from other products in our company with additional data.
Other products send us events through an Event Hub.
We process those events with a set of microservices.
Data is transferred from one microservice to another via an internal Event Hub.
Once an event has moved through all of our microservices, we send the finished, enriched event to a Kusto database.
At the beginning of our pipeline we tag each incoming event with a unique GUID. When querying the output Kusto database and counting events by this unique GUID, we saw a substantial number of event ids with a count of 2 (the event content was also identical).
We suspect that the duplication is coming from one of our internal hops (microservice to microservice) through the Event Hub.
In addition, looking at the Azure portal Event Hub metrics for the suspected Event Hub, we consistently see more events read from the Event Hub than written to it, solidifying our suspicion that we are experiencing double processing.
When you process duplicates, are you seeing a finite set where different processors read the same events from a partition or are you seeing a continual stream of the same events being read multiple times?
A finite set; some of the events are processed twice.
What is the strategy that you're using for checkpointing events from a given partition?
This is the method we are using as the ProcessEventsAsync handler for our EventProcessorClient.
Basically, we try to process the event; if we get an exception we handle it, and then we checkpoint.
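To keep the thread self-contained (the original snippet isn't reproduced here), below is a minimal sketch of a handler with the shape described above. ProcessEventInternal is the application's own processing method mentioned later in the thread; HandleError and the class name are hypothetical placeholders.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

public class EnrichmentHandlers
{
    // Handler registered on EventProcessorClient.ProcessEventAsync.
    public async Task ProcessEventsAsync(ProcessEventArgs args)
    {
        // Nothing to do when the processor signals without an event.
        if (!args.HasEvent)
        {
            return;
        }

        try
        {
            // Application-specific processing of the event.
            await ProcessEventInternal(args.Data);
        }
        catch (Exception ex)
        {
            // Handle the failure so the pipeline keeps moving.
            HandleError(ex, args.Partition.PartitionId);
        }

        // Checkpoint after every event, per the strategy described above.
        await args.UpdateCheckpointAsync(args.CancellationToken);
    }

    // Placeholders; the real implementations are application-specific.
    private Task ProcessEventInternal(EventData data) => Task.CompletedTask;
    private void HandleError(Exception ex, string partitionId) { }
}
```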
When you refer to "the logs", are you listening to the Event Hubs ETW logs or are you referring to a different approach for logging? In the latter case, would you be so kind as to offer a bit of detail into how you're collecting them?
I mean our own service logs.
This is how we collect the logs.
First of all, we created the following methods to use as the PartitionInitializingAsync & PartitionClosingAsync handlers on our EventProcessorClient.
As you can see, we use our own Logger.Info method inside these methods. Logger.Info has an optional callerMemberName parameter that uses the CallerMemberNameAttribute (https://docs.microsoft.com/en-us/dotnet/api/system.runtime.compilerservices.callermembernameattribute?view=netcore-3.1), so it is aware of the method calling it.
Logs from Logger.Info are sent to a set of targets. One of the targets is an internal Kusto database for logging. Logs are sent there with additional information (pod name and logger name), which makes it easier to investigate them later on.
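The original snippets aren't reproduced here; the following is a minimal sketch of handlers and a logger of the shape described. The class names and the console target are illustrative; the real targets (Kusto, pod name, logger name) are omitted.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

public static class Logger
{
    // Illustrative logger; real targets and enrichment (pod name, logger name) omitted.
    public static void Info(string message, [CallerMemberName] string callerMemberName = "")
        => Console.WriteLine($"[{callerMemberName}] {message}");
}

public class PartitionLifecycleHandlers
{
    // Registered on EventProcessorClient.PartitionInitializingAsync.
    public Task PartitionInitializingAsync(PartitionInitializingEventArgs args)
    {
        Logger.Info($"Claimed ownership of partition {args.PartitionId}");
        return Task.CompletedTask;
    }

    // Registered on EventProcessorClient.PartitionClosingAsync.
    public Task PartitionClosingAsync(PartitionClosingEventArgs args)
    {
        Logger.Info($"Closing partition {args.PartitionId} (reason: {args.Reason})");
        return Task.CompletedTask;
    }
}
```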
After some investigation of our case we found the following logs, which raised our suspicion that these partitions are being read twice, since unlike the other logs we can see there, there are no correlating CloseAsync logs for these partitions. Once we restarted the pod that sent these logs, the event duplication stopped.
Thanks again, Asaf.
Thank you, @asrosent. I appreciate the thorough additional context; it's definitely helpful. I'm not quite sure yet whether there's an underlying issue or we're seeing some normal behavior. I'd like to offer a bit more detail on my thought process and what behaviors should be expected. Because that's likely to be a bit long, I'll first ask for a bit of help, if you're willing, so that it doesn't get lost.
Would it be possible to capture the id of the partition and the sequence number of an event and use that for duplicate detection? Those two elements are enough to prove a unique event for the Event Hubs service and will help to prove that we're seeing the same event from the same partition and not an event that was accidentally published to more than one partition.
Can you help me understand if you're seeing the behavior for a short period of time when adding/removing processor instances, such as when scaling or when pods are cycled?
You mentioned that you're seeing a finite set of duplicates. Just to confirm, what I was asking was whether you saw a small set of duplicates for a short period and then the duplicates stop or once you started seeing duplicates every event from that partition is duplicated until you kill the pod.
The logs that you've shared seem to indicate that partition ownership is being moved around frequently. If this is not in response to processor instances scaling (or pods cycling), then the root of this may be long-running processing in the handler or communication issues with Azure Storage that are preventing the processor from refreshing its claim on ownership. Do you have an idea how long your ProcessEventInternal call normally takes? Are you seeing any calls to the error handler that would indicate timeouts when talking to Storage?
When correlating the Open/Close log entries, what's the largest time gap that you're allowing between those calls for a given processor? In the case where a handler runs slowly (which includes the checkpoint call in your code), it may delay the processor being made aware that ownership was lost and raising the close event.
If you're willing, I'd be very interested in seeing the ETW logs collected by the AzureEventSourceListener with respect to partition ownership. They'll have a bit more granularity to understand the flow and may offer insight into whether a processor was actively reading from the service or emptying its memory cache.
Given the partition ownership migration that this snapshot of your logs shows, I'm concerned about the checkpointing strategy. Since you're checkpointing every event, when ownership changes the new processor will read the last-written checkpoint and use that to start processing. The old owner may still have events from that partition in memory that it is emitting to the handler. In this case, you'll now have two processors battling for the checkpoint at different places in the partition. Normally this settles after a small number of collisions (< 50) once the memory buffer is exhausted, but if partitions migrate frequently, the frequent checkpointing could cause some inconsistencies and potentially replay a non-deterministic set of events.
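To make the checkpoint-strategy point concrete, here is a minimal sketch of checkpointing every N events per partition rather than after each one. The class name, counter, and threshold are illustrative and not part of the Event Hubs API.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

// Sketch: checkpoint every N events per partition instead of after each event.
public class CheckpointThrottler
{
    private const int EventsPerCheckpoint = 50;
    private readonly ConcurrentDictionary<string, int> _counts = new ConcurrentDictionary<string, int>();

    public async Task CheckpointIfNeededAsync(ProcessEventArgs args)
    {
        // Count events seen for this partition since the processor started.
        var count = _counts.AddOrUpdate(args.Partition.PartitionId, 1, (_, current) => current + 1);

        if (count % EventsPerCheckpoint == 0)
        {
            await args.UpdateCheckpointAsync(args.CancellationToken);
        }
    }
}
```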
One key thing to keep in mind is that Event Hubs has an at-least-once delivery guarantee. Your application should expect to potentially see some duplicate events and be idempotent when processing.
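As a sketch of the duplicate-detection idea above (an event is uniquely identified to the service by its partition id and sequence number), an application could track recently seen pairs. The class and method names are illustrative; a bounded cache or external store would be needed in production.

```csharp
using System.Collections.Concurrent;
using Azure.Messaging.EventHubs.Processor;

// Sketch: duplicate detection keyed on (partition id, sequence number).
public class SeenEventTracker
{
    private readonly ConcurrentDictionary<(string, long), byte> _seen =
        new ConcurrentDictionary<(string, long), byte>();

    // Returns true if this (partition, sequence number) pair has not been seen before.
    public bool TryMarkAsProcessed(ProcessEventArgs args) =>
        _seen.TryAdd((args.Partition.PartitionId, args.Data.SequenceNumber), 0);
}
```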
One of the scenarios where this is likely is when multiple Event Processor Clients are collaborating. Because the clients coordinate to keep work balanced and will “steal” ownership of partitions from each other, that's a common window in which to see a small set of duplicates. This is especially noticeable when a new processor is started while an existing processor is running. There are a couple of key reasons for this:
When a processor takes ownership of a partition, it requests to open an exclusive link for the partition, causing the Event Hubs service to disconnect the previous owner. The previous owner becomes aware that its ownership was lost when it next requests to read events from the partition. This may not be immediate if the previous owner has a batch of events in memory that it is emitting to the handler for processing; until processing for that batch of events is complete, the processor will not attempt to read from the partition. As a result, those events may be seen by both processors.
When ownership of a partition changes, the new processor will use the most recently written checkpoint for the partition as the place to start reading from; if the previous owner had processed some events after a checkpoint was written, the new processor will read and process them again. You have some control over this by adjusting the strategy that you’re using for checkpointing.
Once the processor clients have balanced the partitions between them, they are less likely to steal from one another and you should not be observing a regular series of duplicates. It’s important to note that ownership of partitions may still occasionally change, so you may continue to see occasional small sets of duplicates due to the processing overlap.
With respect to seeing partitions claimed but no corresponding close, this may be latency rather than a partition being actively processed multiple times. The Event Hubs service is the authority that enforces the invariant that there can be no more than a single active reader of a partition for a given consumer group. As mentioned above, the previous processor is not immediately aware that ownership was lost and may have events in a memory buffer that it will emit to the handler; until those events are emitted and the processing handler has completed, there will be no observed call for the partition closing.
Hey @jsquire, sorry for the delay and thanks again for your kind response. Since we last discussed the issue it has re-occurred several times in our service and is becoming a major problem with EventProcessorClient reliability for us.
Would it be possible to capture the id of the partition and the sequence number of an event and use that for duplicate detection? Those two elements are enough to prove a unique event for the Event Hubs service and will help to prove that we're seeing the same event from the same partition and not an event that was accidentally published to more than one partition.
Currently it would be a little hard for us to implement this, but from what I see all of our evidence points to a write-once, read-twice scenario. (Also, when we get into this situation, in the Event Hub metrics in the Azure portal we can clearly see a consistently higher read rate than write rate.)
Can you help me understand if you're seeing the behavior for a short period of time when adding/removing processor instances, such as when scaling or when pods are cycled?
We see this behavior for long periods of time and usually unrelated to when we add/remove processor instances.
You mentioned that you're seeing a finite set of duplicates. Just to confirm, what I was asking was whether you saw a small set of duplicates for a short period and then the duplicates stop or once you started seeing duplicates every event from that partition is duplicated until you kill the pod.
My bad; what I meant is that we see a set of 2 output events for every input event from that partition, indefinitely, until we kill the pod.
The logs that you've shared seem to indicate that partition ownership is being moved around frequently. If this is not in response to processor instances scaling (or pods cycling), then the root of this may be long-running processing in the handler or communication issues with Azure Storage that are preventing the processor from refreshing its claim on ownership. Do you have an idea how long your ProcessEventInternal call normally takes? Are you seeing any calls to the error handler that would indicate timeouts when talking to Storage?
Our ProcessEventInternal shouldn't be a long process. Between our microservices we batch multiple (~50) events into every EventData, and our ProcessEventInternal usually goes like this:
Here is an example from one of our services:
In addition, we do not see any Storage-related errors in the error handler.
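The service's actual example isn't reproduced above. Purely as a hypothetical illustration of the shape described (a batch of ~50 logical events carried in a single EventData), such a method might look like the following; the JSON layout and EnrichAndForwardAsync are assumptions, not taken from the original code.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;

public class BatchedPipelineHandlers
{
    // Hypothetical sketch only; not the example referenced in the issue.
    private async Task ProcessEventInternal(EventData data)
    {
        // Assume the batch is a JSON array serialized into the event body.
        var json = Encoding.UTF8.GetString(data.Body.ToArray());
        var batch = JsonSerializer.Deserialize<List<JsonElement>>(json);

        foreach (var item in batch)
        {
            await EnrichAndForwardAsync(item);
        }
    }

    // Placeholder for the real per-event enrichment and publish step.
    private Task EnrichAndForwardAsync(JsonElement item) => Task.CompletedTask;
}
```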
When correlating the Open/Close log entries, what's the largest time gap that you're allowing between those calls for a given processor? In the case where a handler runs slowly (which includes the checkpoint call in your code) it may delay the processor being made aware that ownership was lost and raising the close event.
I might have misunderstood your question, but we do not limit the time; we never see a "closed" log until we gracefully restart the pod.
If you're willing, I'd be very interested in seeing the ETW logs collected by the AzureEventSourceListener with respect to partition ownership. They'll have a bit more granularity to understand the flow and may offer insight into whether a processor was actively reading from the service or emptying its memory cache.
We are very willing, but I couldn't find any guidance on how to use this class. If you can give me an entry point I will do my best to provide you with the logs 😁
If there is any additional information I can provide you just ask.
Thanks again, Asaf.
Hi @asrosent,
Thanks for the additional context, and I'm sorry that you're still experiencing difficulties. Speculatively, there are only two scenarios that make sense to me here (and they're both grasping at straws a bit):
The processors are using different consumer groups, but pointing to the same Event Hub. (You've already mentioned this isn't the case)
Something in the networking configuration is confusing the Event Hubs service to where it doesn't believe that there are two active AMQP links to the same partition (since it is the service that enforces a single reader) AND the processors believe they're the same instance due to the same Identifier value being explicitly assigned via the EventProcessorClientOptions during construction; a sketch of what that looks like follows.
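As a hedged illustration of that second scenario, here is a sketch of explicitly assigning the Identifier through EventProcessorClientOptions at construction. The connection strings, container name, and identifier value are placeholders; leaving Identifier unset (the default) gives each instance a unique identifier.

```csharp
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Storage.Blobs;

public static class ProcessorFactory
{
    // Illustrative construction; connection strings and names are placeholders.
    public static EventProcessorClient Create()
    {
        var storageClient = new BlobContainerClient("<storage-connection-string>", "checkpoints");

        var options = new EventProcessorClientOptions
        {
            // If every instance assigns the same value here, the processors present
            // themselves to the service as the same client. Leaving Identifier unset
            // lets each instance generate a unique identifier.
            Identifier = "enrichment-pipeline-worker"
        };

        return new EventProcessorClient(
            storageClient,
            EventHubConsumerClient.DefaultConsumerGroupName,
            "<event-hub-connection-string>",
            "<event-hub-name>",
            options);
    }
}
```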
Let's try a couple of things to dig in:
Can you share a snippet showing how the processors are being initialized?
Can you share configuration of the K8s and container networking? If you'd prefer not to post the configuration information into the issue, please let me know and we can discuss an alternative.
Can you instrument the processors and capture logs? There's some information on using the AzureEventSourceListener in this sample (apologies for not calling it out earlier); a minimal sketch follows after this list.
The next time you see this behavior, can you note the date/time? It's likely that we'll need to ask the service team to take a look at the service-side logs which will need some additional information, but if we get to that point, we'll want to shift over to email to discuss to avoid posting sensitive information here.
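As an entry point for the instrumentation mentioned above, here is a minimal sketch of wiring up the AzureEventSourceListener from Azure.Core.Diagnostics; the console output is illustrative and can be replaced by any logging sink.

```csharp
using System;
using System.Diagnostics.Tracing;
using Azure.Core.Diagnostics;

public static class ProcessorDiagnostics
{
    // Captures Azure SDK ETW events (including the processor's partition ownership
    // and load-balancing events) and forwards them to a delegate. Keep the listener
    // alive for the lifetime of the processor; events are only captured while it lives.
    public static AzureEventSourceListener StartListening() =>
        new AzureEventSourceListener(
            (eventArgs, message) =>
                Console.WriteLine($"[{eventArgs.EventSource.Name}] {message}"),
            EventLevel.Verbose);
}
```

AzureEventSourceListener.CreateConsoleLogger(EventLevel.Verbose) is an even simpler starting point if console output is sufficient.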
One additional suggestion, which I don't believe is related to this issue, would be to consider updating to v5.1.0 of the processor. There was a significant overhaul of the internals, and you should see the new version be more efficient with resources as well as more performant.
Hi guys, let's say that I'm facing a similar problem: I have duplicate messages that I'm reading from the Event Hub with different processors (4 processors for 32 partitions, on different pods in K8s). The delay between the processing of these two identical messages is usually about 0 ms; this I can understand as a partition being stolen by a different processor, but sometimes the difference is hours (the record is 21 hours) and this is what I cannot understand. I'm interested in how this conversation thread finished; can you share a conclusion, if any? @jsquire can you help me?
Hi @jiripoledna. The processor itself does no long-term caching; if you're seeing duplicate events 21 hours apart then the best speculation that I have is one of the following:
The events believed to be duplicates were published multiple times and seen by the Event Hubs service as different events. The majority of investigations lead back to this, in my experience. If Event Hubs sees them as the same event they would come from the same partition and have the same Sequence Number assigned by the service.
The partition to which the events were written changed ownership and the duplicate event appears in the partition after the most recent checkpoint was written.
The Event Hubs service surfaced the event from the partition multiple times.
The most important guidance that I can offer on this is to be aware that the underlying guarantee of the Event Hubs service is at-least-once delivery. It is possible, though rare, for duplicate events to be read from the service. It is also possible, as mentioned, for the load balancing of processors to produce duplicates. Applications which process events should be sure to guarantee idempotency in processing.
@jsquire Thank you for your answer. I know that idempotency is important and we account for it. The duplication is not the problem now; I only wanted to know whether the duplication is by design or I have something wrong in my code or in my understanding.
According to the source code there is nothing to prevent two nodes from reading the same partition, at least in the Python code. The implementation is quite incorrect: it depends on an optimistic lock, the previous owner of a partition does not know for some time that someone stole its partition, and some other processor might already have stolen it again; currently it doesn't even use a lease, it just depends on the ETag. It would also probably work better if a node that has too many partitions detected that someone else has too few and released some, so that only unowned partitions would be taken by someone else.
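For readers unfamiliar with the mechanism being described, here is a simplified sketch of ETag-based optimistic concurrency against Azure Blob Storage. This is the general pattern only, not the processor's actual checkpoint-store implementation, and the class, method, and metadata names are illustrative.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

public static class OwnershipClaim
{
    // Simplified illustration of an optimistic, ETag-conditioned ownership claim.
    public static async Task<bool> TryClaimAsync(BlobClient ownershipBlob, string ownerId, ETag knownETag)
    {
        var metadata = new Dictionary<string, string> { ["ownerid"] = ownerId };

        try
        {
            // The update only succeeds if the blob has not changed since we read it.
            await ownershipBlob.SetMetadataAsync(
                metadata,
                new BlobRequestConditions { IfMatch = knownETag });

            return true;
        }
        catch (RequestFailedException ex) when (ex.Status == 412)
        {
            // Another processor updated the blob first; the claim is lost.
            return false;
        }
    }
}
```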
Describe the bug: My team got a notification that we have duplicate output events from our production pipeline. Upon researching, we discovered that some of our Event Hub partitions were opened simultaneously on 2 different nodes, processing events from these partitions twice.
We solved the issue by restarting nodes.
Expected behavior: Once an EventProcessorClient opens a partition on a certain node, it should be closed everywhere else it is open.
Actual behavior (include Exception or Stack Trace): The EventProcessorClient is not closing the partition after it was opened on another node.
To Reproduce: We do not yet know how to reproduce this scenario, but I can try to describe what we know from the logs.
Everything was working as expected. At a certain point, the EventProcessorClient on one of our nodes decided to open 2 extra partitions. After opening the partitions, we did not get logs specifying that these partitions were closed on any other node.
Environment:
Package Name: Azure.Messaging.EventHubs.Processor
Package Version: 5.0.1
Hosting platform: Docker on Kubernetes
Hosting OS: Debian 10
.NET runtime version: .NET Core 3.1.4
IDE and version: Visual Studio 16.6.0
Thanks!