Closed PoteRii closed 11 months ago
UPDATE:
So, message 7351558191738976016 was picked up by thread with id 35(?) three times...
(so it might be a race condition/thread safety issue?)
UPDATE: MessageDelayedProcessor.ProcessDelayedAsync runs every 60 seconds and picks up the same messages from the db while the previous run has not finished processing the 10k messages, hence the duplication in Kafka.
(A problem with ScheduleMessagesOfDelayedAsync query?)
UPDATE: this piece of my custom code is executed, which means the assumption in my previous comment is correct.
"FOR UPDATE SKIP LOCKED" in ScheduleMessagesOfDelayedAsync does not do much in terms of avoiding this duplication, because the transaction is opened and committed as soon as the messages are put in the priority queue, which makes those messages available for the next iteration of MessageDelayedProcessor.ProcessDelayedAsync. There is also no option to change the MessageDelayedProcessor waiting interval.
P.S. Even with this piece of code I sometimes see 10001 or 10002 messages in Kafka (so a couple of messages are still duplicated). The situation becomes worse in a multi-instance scenario (where instances are deployed with a rolling deployment, so there might be a 59-second gap between the deployments of two instances).
Why is that locking mechanism used only in MessageNeedToRetryProcessor.ProcessPublishedAsync and MessageNeedToRetryProcessor.ProcessReceivedAsync? And if I get it right, depending on the number of messages that need a retry and the performance of the system, duplicated published messages can still appear in Kafka despite the locking mechanism used during retry?
Hi @PoteRii ,
Thank you for your feedback. I took a quick look and, as you said, when there is a large number of delayed messages (with the same trigger time) and the in-memory delayed queue has not been completely processed, duplication occurs when messages are picked up again by MessageDelayedProcessor.ProcessDelayedAsync. When this feature was built, the case of a large number of delayed messages was not considered.
The reason we do not use StorageLock when publishing delayed messages is that some people want to use the delayed message feature without adding an extra lock table. We use "FOR UPDATE SKIP LOCKED", which can prevent multiple instances from getting the same message, because the message status is changed to Queued before the transaction is committed and before the message is placed in the PriorityQueue.
UPDATE: one question. Before the transaction is committed, the FOR UPDATE query locks a batch of data, and we update the status to Queued within the scope of the transaction. Why is this data still being queried and enqueued?
@yang-xiaodong Thank you for the reply. What happens is that all relevant messages are fetched, updated (status is set to "Queued"), enqueued to the priority queue, and the transaction is committed. However, the next run of the delayed processor picks up the "Queued" messages again ("ExpiresAt"< @OneMinutesAgo AND "StatusName" = 'Queued') and puts them in the priority queue again. This happens because processing the priority queue takes more than 1 min, so not all messages have been updated to "Succeeded" yet, which makes them available for the next run of the processor. Can you confirm?
Adding a check before placing something in the priority queue (to skip the id of an already added message) will not work in a multi-instance environment. (Maybe introduce a new status?)
Our use case is quite simple. We generate notifications for some events (those events might happen at midnight), but we want to actually send the notification in the morning or afternoon. So we have quite a big volume of notifications that were generated outside of notification delivery hours.
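To make the re-selection described in this comment concrete, here is a small, language-neutral simulation in Python. It is not CAP code: the `simulate` function, its parameters and the rule for picking up due "Delayed" rows are assumptions made for illustration. Only the "Queued and older than the window" condition comes from the query quoted above, and the send rate of 55 per second is taken from the 50-60 TPS reported later in this thread.

```python
from collections import deque


def simulate(total, send_rate, window, poll_every=60, horizon=900):
    """Count how many sends happen for `total` messages that are all due at t=0.

    Every `poll_every` seconds the poller selects rows that are due and still
    'Delayed' (assumed rule), plus rows that are 'Queued' and whose due time is
    older than `window` seconds (the condition quoted in the thread).
    Selected rows are marked 'Queued' and appended to the in-memory queue.
    """
    expires_at = 0                      # all messages share one due time
    status = ["Delayed"] * total
    queue = deque()                     # stand-in for the priority queue
    sends = 0
    for now in range(horizon):
        if now % poll_every == 0:
            for msg_id in range(total):
                due_delayed = status[msg_id] == "Delayed" and expires_at <= now
                stale_queued = status[msg_id] == "Queued" and expires_at < now - window
                if due_delayed or stale_queued:
                    status[msg_id] = "Queued"
                    queue.append(msg_id)   # may enqueue the same id again
        # The sender drains at a fixed rate and does not check for duplicates.
        for _ in range(min(send_rate, len(queue))):
            msg_id = queue.popleft()
            status[msg_id] = "Succeeded"
            sends += 1
    return sends


one_minute = simulate(total=10_000, send_rate=55, window=60)
ten_minutes = simulate(total=10_000, send_rate=55, window=600)
print(one_minute, ten_minutes)
```

In this model the one-minute window yields more sends than messages, because rows that are still "Queued" when a later poll runs are enqueued a second time. With a ten-minute window the queue drains before any row becomes eligible again.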
@yang-xiaodong I have added some logging in ScheduleMessagesOfDelayedAsync and changed the EnqueueToScheduler method to return true if the message was added to the priority queue. Then I check the boolean result in ProcessDelayedAsync.ScheduleTask and increase the count of queued messages (3rd line in the logs).
I've started the app and generated 10k messages to be published exactly at 2023-11-14 23:19:22.
The logs I see are below
At the time of the 5th execution the db state was as follows:
6259 messages are selected with condition "ExpiresAt"< 2023-11-14 23:19:24 AND "StatusName" = 'Queued'
This confirms the situation happening in the previous comment.
P.S. I stopped this app run before it finished, but in another run I confirmed that in the end the sum of "Messages queued" matched the number of messages in the Kafka topic (both equal to 27349).
As far as I understand (confirmed by increasing it to 10 min and running the test again), the @OneMinutesAgo parameter in ScheduleMessagesOfDelayedAsync is responsible for giving these messages 1 min to be processed. If it were @TenMinutesAgo, for example, that would give these 10k messages 10 minutes. What exactly is the purpose of this second condition in the query, and what could be the downside of increasing this parameter or making it dynamic?
Hi @PoteRii,
Thanks for the additional information. I need some time to think about this problem; it looks a bit difficult and I have been busy recently.
Can you test whether simply changing
to
await EnqueueToPublish(_schedulerQueue.Dequeue()).ConfigureAwait(false);
can send 10k messages in 1 minute?
@yang-xiaodong As a matter of fact, EnqueueToPublish was able to handle 10k messages in approximately 1 min (a bit more). Publish TPS was around 8-10x higher (around 500-600 compared to 50-60 for SendAsync).
BUT I was able to complete the test only once in 5 runs. The CPU goes to 100%, the broker went down, sometimes it gets stuck somewhere between fetching the messages and putting all of them into the priority queue, the publish channel capacity is 5k (increasing it has no effect), the db can't keep up with all the status updates, etc.
I also think this is a logical issue rather than a performance one, because I can always have X messages that can't be handled in 1 min by EnqueueToPublish.
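The point that some number of messages can never fit into a fixed window is simple arithmetic. The sketch below uses hypothetical helper names and assumes a steady send rate; 55 messages per second is the midpoint of the 50-60 TPS quoted above for SendAsync.

```python
import math


def drain_seconds(message_count, sends_per_second):
    """Seconds needed to send every message at a steady rate."""
    return math.ceil(message_count / sends_per_second)


def fits_in_window(message_count, sends_per_second, window_seconds=60):
    """True if the whole backlog is sent before rows become eligible again."""
    return drain_seconds(message_count, sends_per_second) <= window_seconds


def max_backlog_for_window(sends_per_second, window_seconds=60):
    """Largest backlog that can still be drained inside the window."""
    return sends_per_second * window_seconds


print(drain_seconds(10_000, 55))        # seconds needed for 10k messages
print(fits_in_window(10_000, 55))       # does it fit in the 60 s window?
print(max_backlog_for_window(55))       # backlog size that would fit
```

Whatever the rate, a backlog larger than rate times window does not fit, which is why a faster sender only moves the threshold.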
@PoteRii
I have an idea. In the processor for picking up delayed messages (ScheduleMessagesOfDelayedAsync), we remove the filter condition for messages with the status 'Queued'. Instead, we only pick up messages with the 'Queued' status and place them into the priority queue during the first application startup.
In scenarios with multiple instances, when UseStorageLock is enabled, we can use database locks to prevent different instances from picking up the same 'Queued' messages at startup. In this case, we need to consider a situation where the instance that acquired the database lock stops or crashes within the TTL (Time To Live) period. In such cases, these messages will need to wait to be triggered until the next instance startup after the TTL has expired.
What do you think about this idea?
@yang-xiaodong If I get this right, messages can get stuck in Queued status only in case of an ungraceful shutdown. If that is the case, then it makes sense to process those stuck messages only during app startup. The TTL is FailedRetryInterval + 10 seconds (at least for the NeedRetry processor), right? So that will give the app 70 sec (by default) to process all the Queued messages. Are my assumptions correct?
@PoteRii
The main issue is not whether all Queued messages can be processed within 70 seconds, but rather that if the application crashes or restarts again within those 70 seconds, it will not be able to read the Queued messages because the TTL has not yet expired. This is especially the case in Kubernetes, where the host name (instance id) changes after a Pod restart. At that point fewer than 70 seconds of the TTL have elapsed, which makes it impossible to acquire the lock.
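The restart case can be illustrated with a tiny in-memory model of a TTL lock. This is only a sketch: `StorageLock` here is a hypothetical class, not CAP's implementation, and it assumes that the same instance id may re-acquire its own lock while a different id has to wait for the TTL.

```python
class StorageLock:
    """In-memory stand-in for a database lock row with an owner and a TTL."""

    def __init__(self):
        self.owner = None
        self.expires_at = 0.0

    def try_acquire(self, instance_id, now, ttl):
        """Take the lock unless another instance holds it and the TTL is still running."""
        held_by_other = (
            self.owner is not None
            and self.owner != instance_id
            and now < self.expires_at
        )
        if held_by_other:
            return False
        self.owner = instance_id
        self.expires_at = now + ttl
        return True


lock = StorageLock()
print(lock.try_acquire("pod-a", now=0, ttl=70))    # first instance takes the lock
# pod-a crashes at t=10 and comes back with a new host name.
print(lock.try_acquire("pod-b", now=10, ttl=70))   # refused: TTL still running
print(lock.try_acquire("pod-b", now=70, ttl=70))   # accepted once the TTL has expired
```

Because the restarted Pod has a new id, it is treated like any other instance and the Queued messages wait until the old lock expires.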
@yang-xiaodong thanks, understood. Are you thinking about creating a separate processor for that? In that case if there is already a lock for queued messages, it can wait for the TTL to expire (try at least twice).
@PoteRii
Following the above idea, there is another problem that cannot be solved: if there is a very large number of messages with the status 'Queued', we cannot load them all into memory at once. However, if we fetch messages in batches, it's impossible to know whether the previous batch has finished processing in order to decide whether to RenewLock.
One idea is to wait for a fixed duration and repeatedly check for messages still in the 'Queued' state. This means that within the TTL lifecycle, only a fixed number of messages can be processed. For example, if we process 500 messages per batch and there are 10K messages with a TTL of 60 seconds, it would take 20 minutes to process all messages. Moreover, each batch of 500 messages must be processed within 60 seconds, or else the issue of duplicates would still persist.
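The 20-minute figure follows directly from the numbers above. A short check, with a hypothetical function name and the assumption that exactly one batch is processed per TTL period:

```python
import math


def total_processing_seconds(message_count, batch_size, ttl_seconds):
    """Time to work through all messages when one batch is handled per TTL period."""
    batches = math.ceil(message_count / batch_size)
    return batches * ttl_seconds


seconds = total_processing_seconds(message_count=10_000, batch_size=500, ttl_seconds=60)
print(seconds, seconds / 60)   # total seconds and the same value in minutes
```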
@yang-xiaodong would something like this work?
Processor tries to acquire the lock
Not acquired: try again after the lock TTL expires
Acquired: fetch queued messages one by one (or in batches), send messages in a sync manner, and renew the lock once in a while before the TTL expires if there are still messages to be processed
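A rough sketch of the flow proposed in this comment, written in Python for brevity. Everything here is hypothetical: `SimpleLock`, `drain_queued`, and the `fetch_batch`, `send` and `clock` callables are placeholders rather than CAP APIs, and the renewal margin of 10 seconds is an arbitrary choice.

```python
class SimpleLock:
    """In-memory stand-in for a database-backed lock with a TTL."""

    def __init__(self):
        self.owner = None
        self.expires_at = 0.0
        self.renewals = 0

    def try_acquire(self, owner, now, ttl):
        if self.owner is not None and self.owner != owner and now < self.expires_at:
            return False
        self.owner, self.expires_at = owner, now + ttl
        return True

    def renew(self, owner, now, ttl):
        if self.owner != owner:
            raise RuntimeError("lock is held by another owner")
        self.expires_at = now + ttl
        self.renewals += 1

    def release(self, owner):
        if self.owner == owner:
            self.owner, self.expires_at = None, 0.0


def drain_queued(lock, fetch_batch, send, instance_id, clock, ttl=60, renew_margin=10):
    """Send all queued messages while holding the lock.

    Returns the number of messages sent, or None if the lock was not
    acquired (the caller is expected to retry after the TTL expires).
    """
    if not lock.try_acquire(instance_id, clock(), ttl):
        return None
    sent = 0
    while True:
        batch = fetch_batch()
        if not batch:
            break
        for message in batch:
            send(message)                      # synchronous send
            sent += 1
            # Renew shortly before the TTL runs out while work remains.
            if lock.expires_at - clock() <= renew_margin:
                lock.renew(instance_id, clock(), ttl)
    lock.release(instance_id)
    return sent
```

Note that this sketch sends each message as soon as it is fetched, so it models only the locking and renewal part of the proposal.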
@PoteRii Consider a few additional things: the producer guarantees at-least-once delivery, same as the broker does, so it's up to the consumer side to ensure idempotent processing. @yang-xiaodong maybe it would make sense just to make the batch size and time window configurable, since it is not possible to know the throughput of a client system, and at the moment there is no reliable way to know whether there is a running sending process.
@demorgi It is natural to have duplicates once in a while caused by a "glitch", but this case is quite different. One run can produce 25k messages instead of 10k, with a single message duplicated as many as 6-8 times.
@PoteRii
Send messages in a sync manner
Delayed Queued messages cannot be sent synchronously, as these messages may not have reached their actual send time yet. These messages have a time window of 1 minute.
One solution is for messages to support a send completion callback. Set a callback on the last message of each batch (delayed messages need to be sorted by send time), and fetch the next batch when the callback is triggered. Until then, the thread is paused and the lock is refreshed; the callback resumes the thread so that it continues running.
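The callback idea could look roughly like the sketch below. It is a Python model with hypothetical names (`process_in_batches`, `dispatch`, `refresh_lock`), not a CAP API, and it assumes that the last message of a batch completes after the others, which a real sender would have to guarantee.

```python
import threading


def process_in_batches(fetch_batch, dispatch, refresh_lock, refresh_every=0.05):
    """Fetch the next batch only after the last message of the previous one completes.

    `dispatch(message, on_complete)` hands a message to an asynchronous sender;
    `on_complete` is None for every message except the last one in the batch.
    While waiting, the lock is refreshed every `refresh_every` seconds.
    Returns the number of batches processed.
    """
    batches = 0
    while True:
        # Delayed messages are ordered by send time so the last one fires last.
        batch = sorted(fetch_batch(), key=lambda m: m["send_at"])
        if not batch:
            return batches
        last_done = threading.Event()
        for message in batch[:-1]:
            dispatch(message, None)
        dispatch(batch[-1], last_done.set)
        # Pause this thread until the callback fires, refreshing the lock meanwhile.
        while not last_done.wait(timeout=refresh_every):
            refresh_lock()
        batches += 1
```

A usage example would pass a `dispatch` that schedules the message and invokes `on_complete` from the sender's thread once the send has finished.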
@yang-xiaodong good idea
@PoteRii I've encountered another issue. Suppose I have five instances that have been running for a while, and one of them gets stopped. Instead of restarting it, I decide to continue running the remaining four instances. In this scenario, the stopped instance might have unprocessed Queued messages, which will remain unhandled indefinitely until the next time an instance is started.
@yang-xiaodong what about showing queued messages (that have been queued for quite long) in dashboard?
That's not a good idea. If the Queued messages use the approach we discussed, then the messages in the published and received tables with the status 'scheduled' will be handled using the same approach.
@yang-xiaodong Let's assume there is a process that starts on app startup. It acquires lock, fetches queued messages and processes all of them in batches and extends lock TTL during processing. What keeps me from running this process every X minutes?
@PoteRii
What keeps me from running this process every X minutes?
Because if it runs every X minutes, it can easily pick up messages whose status has just been changed to Queued within the 1-minute window (the status change from Delayed to Queued is not protected by a lock), when in fact these messages have already been put into the priority queue.
UPDATE: Even if it's not run every X minutes, but only once upon startup, there is still a possibility of fetching messages from other instances within the 1 minute time window.
@yang-xiaodong I think the whole delayed/queuing concept has to be revised... Meanwhile, the easiest short-term solution would be to make the OneMinutesAgo query parameter configurable, to at least give fetched delayed messages more than 1 minute to be processed.
As a short-term solution, see commit https://github.com/dotnetcore/CAP/commit/aa4c54041a8a990838523d2b68241fe56658a702 ; you need to override QueuedMessageFetchTime.
Fixed in version 7.2.3-preview-217174309
@yang-xiaodong tested and can confirm this works as intended. Thank you.
We are seeing incorrect behavior when publishing delayed messages in production, so I replicated the issue locally with a quite simple scenario.
After startup of this project, 10k messages should be published to Kafka within 5 minutes, but the result is always more than 10k (sometimes 13k, sometimes more). Below you can find the info from a run in which 22862 messages were produced.
GUID duplicated at least 3 times
All the headers are exactly the same
No messages have Retries > 0
22862 produced to the topic (confirmed by making separate consumer)
All of the below messages are duplicates
Packages used:
DotNetCore.CAP.Dashboard Version 7.2.2
DotNetCore.CAP.Kafka Version 7.2.2
DotNetCore.CAP.PostgreSql Version 7.2.2