dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

PersistentStreamPullingAgent skips over the message under a certain condition #9023

Open tchelidze opened 5 months ago

tchelidze commented 5 months ago

Problem:

When publishing a message under the following conditions

Then the published message is lost.

How to reproduce:

Here is a GitHub repo demonstrating the issue: https://github.com/tchelidze/Orleans_MemoryStream_LostMessage/tree/master/Orleans_MemoryStream_LostMessage

Analysis:

Consider the following scenario: we publish message number 1 to the stream. Then we wait; in the meantime the stream goes inactive and the message cache gets purged. After that, we publish messages number 2 and 3 to the stream. Then the following happens.

Inside the PersistentStreamPullingAgent.DoHandshakeWithConsumer method (which gets called from RegisterAsStreamProducer; remember, the stream is inactive), we retrieve the last processed message token from the consumer. In our case that would be message number 1. Then we take that token (pointing to message number 1) and pass it to queueCache.GetCacheCursor.

The problem lies in what GetCacheCursor does, specifically in PooledQueueCache.SetCursor, where it checks whether the oldestMessage is past the given token. In our case oldestMessage is message number 2, while the token is message number 1, so the body of that if statement on line 201 is executed. Then comes the interesting part: we check whether the given token is the same as or past lastPurgedToken. lastPurgedToken again points to message number 1, because that was the last message evicted from the stream. So that if statement also executes, and PooledQueueCache.SetCursor sets the SequenceToken to the oldest message, which is message number 2.

Issue number 1: As I understand it, lastPurgedToken points to the message that was evicted and is no longer in the cache, so checking sequenceToken.CompareTo(entry.Token) >= 0 does not seem correct here; I think it should be sequenceToken.CompareTo(entry.Token) > 0 instead.

Story continues.

So, back to PersistentStreamPullingAgent.DoHandshakeWithConsumer, line number 315. Here the expectation is that queueCache.GetCacheCursor gives us back a cursor that points to the last processed message, but because queueCache no longer has that message (message number 1), it returns a cursor pointing to the oldest message, which in our case is message number 2. On line 315 we move the cursor forward (remember, the expectation was that the cursor was pointing to the last processed message). As a result, the cursor now points to message number 3, and that's how message number 2 is lost.
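The walkthrough above can be replayed with a toy model. This is a minimal sketch, not Orleans code: `ToyCache` and `handshake_then_deliver` are hypothetical names, and the logic only mimics the behaviour described in this thread for PooledQueueCache.SetCursor and DoHandshakeWithConsumer, using plain integers as sequence tokens.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyCache:
    """Simplified stand-in for the queue cache (assumption, not the real class)."""
    messages: List[int]          # sequence numbers still cached, oldest first
    last_purged: Optional[int]   # sequence number of the last evicted message

    def get_cursor(self, token: int) -> int:
        """Return the index the cursor is placed at for the requested token."""
        if not self.messages:
            raise LookupError("cache is empty")
        if token in self.messages:
            return self.messages.index(token)
        oldest = self.messages[0]
        # Token is older than everything cached. As described in the thread,
        # it is still accepted when it is the same as or past the last purged
        # token, and the cursor is placed on the oldest cached message.
        if oldest > token and self.last_purged is not None and token >= self.last_purged:
            return 0
        raise LookupError("queue cache miss")


def handshake_then_deliver(cache: ToyCache, last_processed: int) -> List[int]:
    """Mimic the handshake: get a cursor, then always step past it."""
    index = cache.get_cursor(last_processed)
    index += 1  # unconditional MoveNext: assumes the cursor sits on last_processed
    return cache.messages[index:]


# Message 1 was processed and then purged; messages 2 and 3 arrive afterwards.
purged_cache = ToyCache(messages=[2, 3], last_purged=1)
print(handshake_then_deliver(purged_cache, last_processed=1))  # [3]
```

In this model the consumer resumes at message 3, so message 2 is never delivered. When message 1 is still cached (`ToyCache([1, 2, 3], None)`), the same call returns `[2, 3]`, which is the case the unconditional step was written for.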

Issue number 2:

I think that in PersistentStreamPullingAgent.DoHandshakeWithConsumer, instead of blindly moving the cursor to the next message, we should check whether it points to the same position as requestedHandshakeToken; if it does not, we should not move it forward.
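The proposed check can be sketched as follows. This is an illustration under stated assumptions, not the actual Orleans change: `resume_index` is a hypothetical helper, sequence tokens are plain integers, and `cursor_index` stands for wherever the cache placed the cursor.

```python
from typing import List


def resume_index(cached: List[int], cursor_index: int, requested_token: int) -> int:
    """Index of the first message to deliver after the handshake.

    Only step past the cursor when it really sits on the message the consumer
    reported as already processed; otherwise deliver from the cursor itself.
    """
    if cached[cursor_index] == requested_token:
        return cursor_index + 1
    return cursor_index


# Message 1 was purged, so the cursor was placed on the oldest message (2).
cached = [2, 3]
start = resume_index(cached, cursor_index=0, requested_token=1)
print(cached[start:])  # [2, 3]

# Message 1 is still cached, so the cursor sits on it and is advanced.
cached = [1, 2, 3]
start = resume_index(cached, cursor_index=0, requested_token=1)
print(cached[start:])  # [2, 3]
```

In both cases the consumer resumes at message 2: nothing is skipped, and the already processed message 1 is not delivered again.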

Workaround

The only workaround I can think of is to set StreamPullingAgentOptions.StreamInactivityPeriod and StreamCacheEvictionOptions.DataMaxAgeInCache to very high values, to avoid the scenario where queueCache is empty and the stream is inactive.

Thoughts?

tchelidze commented 3 months ago

Any updates?

davidvanchu commented 2 months ago

@benjaminpetit It seems like our fix for preventing duplicate event delivery (https://github.com/dotnet/orleans/pull/7699/files from https://github.com/dotnet/orleans/issues/7686) is inadvertently causing messages to be skipped. We recently upgraded to Orleans 8 from 3.x, and are seeing this issue as well, although we are using the Event Hub stream provider, not MemoryStreams.

davidvanchu commented 2 months ago

I've played around with the tests in Orleans and I've been able to reproduce the issue here: https://github.com/dotnet/orleans/compare/main...davidvanchu:orleans:v810_fix_stream_skip

The MemoryStreamResumeTests will show a skipped message after waiting for the stream to become inactive and/or after deactivating the grain.

After removing the MoveNext()s added in https://github.com/dotnet/orleans/pull/7699/files, the tests pass. If you uncomment the //Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, grainInts); line and re-run the test, you can see that after removing the MoveNext()s we get some duplicates in the grainInts array, but that is better than skipping messages.

Before removing MoveNext()s: [image]

After removing MoveNext()s: [image]

fjoly-hilo-energie commented 1 month ago

Is there any update on this? The chance of skipping messages is the reason we're still not on the Streams feature.

davidvanchu commented 4 weeks ago

@ReubenBond Can we get an official fix in for this? We have been using a forked version of Orleans with this fix in place since we discovered the issue in late August. I am happy to create a PR for this if you would like; please confirm that the fix in my branch linked above makes sense from your perspective.

ReubenBond commented 4 weeks ago

cc @benjaminpetit

Costo commented 4 days ago

Hi, I think we are seeing the same issue with Event Hubs streams. We have grains representing locations, and every hour a message is published to each location (energy consumption) via an implicit stream. Yesterday we increased the grain collection age from 15 minutes (default) to 2 hours in order to avoid unnecessary traffic on grain storage account every hour. Now we are seeing millions of QueueCacheMissExceptions and the majority of the messages are lost (not received by grains).

oising commented 3 days ago

This is not the same issue. A queue cache miss means that your grains are out-living the lifetime of the cache (roughly speaking). This isn't intuitive, unfortunately. The way Orleans figures out whether you may have missed a message is that it tries to keep track of the last message you processed in the cache. It needs to see the last message when it receives a new message to know that there haven't been any intermediate messages (that you may have missed). It tracks the last message for as long as the stream is active for a given grain observer. Now that your grains are living for up to two hours, when the grain receives a new message on the hour, the message from an hour ago has long since been purged from the cache. So Orleans will now throw the queue cache miss exception, which actually means "I have no idea if I missed a message because the last message I processed is no longer visible [in the cache]." It's just a warning, but a very spammy one at that.

In short, the timespan in MetadataMinTimeInCache must always be larger than the grain collection age for the type of grain that is consuming the stream. So, if your grains are now living for up to two hours, you need to keep metadata for, let's say, 2h15m to be safe. Metadata is used for tracking n-1 message only and doesn't have a significant effect on cache memory usage. The other fields DataMaxAgeInCache and DataMinAgeInCache control the window for rewinding and have a significant effect on cache memory usage.

Before, when your grains only lived 15m, the stream cursor would be deallocated and the streaming runtime would "forget" about the last message received before the next one could arrive 45m later.
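The interplay between message interval, grain collection age and metadata retention described above can be captured in a small model. This is a rough sketch of the reasoning only, not Orleans code: `may_report_cache_miss` is a hypothetical helper, and the 30-minute metadata retention is an assumed value chosen for illustration, not a documented default.

```python
from datetime import timedelta


def may_report_cache_miss(
    message_interval: timedelta,
    grain_collection_age: timedelta,
    metadata_min_time_in_cache: timedelta,
) -> bool:
    """True when a queue cache miss is possible for the next message."""
    # The runtime only remembers the last processed message while the grain
    # (and therefore its stream cursor) stays alive between two messages.
    cursor_still_tracked = message_interval < grain_collection_age
    # The previous message's metadata is only guaranteed to stay in the cache
    # for the configured minimum time.
    previous_message_may_be_purged = message_interval >= metadata_min_time_in_cache
    return cursor_still_tracked and previous_message_may_be_purged


hourly = timedelta(hours=1)
assumed_metadata_retention = timedelta(minutes=30)  # assumption for illustration

# Grains collected after 15 minutes: the cursor is gone before the next message.
print(may_report_cache_miss(hourly, timedelta(minutes=15), assumed_metadata_retention))  # False

# Grains kept for 2 hours with short metadata retention: a miss is possible.
print(may_report_cache_miss(hourly, timedelta(hours=2), assumed_metadata_retention))  # True

# Metadata retention raised above the collection age (2h15m): no miss.
print(may_report_cache_miss(hourly, timedelta(hours=2), timedelta(hours=2, minutes=15)))  # False
```

Because the message interval can be anything up to the collection age while the cursor is still tracked, keeping the metadata retention above the collection age covers every interval in this model.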

This kind of problem is unique to the "rewindable cache" model for advanced streaming providers like Event Hubs. Orleans reads all partitions into silo-specific memory caches to allow individual grains to rewind the stream without blocking other grains, as would happen were it to rewind into the partition directly. There is one cache per agent, one agent per partition, and possibly multiple agents per silo.

Costo commented 3 days ago

Hi Oisin,

I know that QueueCacheMissExceptions used to be warnings and the messages were delivered anyway. But the new behavior is that the messages are dropped. Maybe changing the MetadataMinTimeInCache value will fix the problem, but Orleans should not drop messages in the first place.

oising commented 3 days ago

Oh, I missed that part. I don't recall any new behaviour that should drop messages. That's quite odd and definitely sounds like a bug somewhere. If it is this issue then yeah, you may be able to avoid triggering it