dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Orleans Streaming: in [SetCursor] method, if the last purged token does not exist, do not throw an exception, just start from the oldest message in cache #8863

Open gusuchengnan opened 9 months ago

gusuchengnan commented 9 months ago

https://github.com/dotnet/orleans/blob/374ab206dee3a93de38f1a10ce38b85c99f0031c/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs#L204

Should it be changed to the following?

```csharp
// Check if we missed an event since we last purged the cache
var isLastPurged = this.lastPurgedToken.TryGetValue(cursor.StreamId, out var entry);
if (!isLastPurged || sequenceToken.CompareTo(entry.Token) >= 0)
{
    // If the token is more recent than the last purged token, then we didn't lose anything. Start from the oldest message in cache
    cursor.State = CursorStates.Set;
    cursor.CurrentBlock = oldestBlock;
    cursor.Index = oldestBlock.Value.OldestMessageIndex;
    cursor.SequenceToken = oldestBlock.Value.GetOldestSequenceToken(cacheDataAdapter);
    return;
}
else
{
    throw new QueueCacheMissException(sequenceToken,
        messageBlocks.Last.Value.GetOldestSequenceToken(cacheDataAdapter),
        messageBlocks.First.Value.GetNewestSequenceToken(cacheDataAdapter));
}
```
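Assuming the linked code only starts from the oldest cached message when a `lastPurgedToken` entry exists (as the issue title implies), the proposal changes the outcome only for a stream with no entry. The following is a hypothetical simplification, not Orleans code: sequence tokens are plain `long` values, `lastPurged` stands in for `lastPurgedToken`, and `CurrentAllowsStart` / `ProposedAllowsStart` are invented names. It evaluates both conditions for three cases.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for lastPurgedToken: stream id -> token of the last message
// purged from the cache for that stream (tokens simplified to longs).
var lastPurged = new Dictionary<string, long> { ["stream-A"] = 41 };

// Assumed current rule: start from the oldest cached message only if a purge record
// exists AND the requested token is not older than it; otherwise report a cache miss.
bool CurrentAllowsStart(string streamId, long requested) =>
    lastPurged.TryGetValue(streamId, out var last) && requested >= last;

// Rule proposed in this issue: a missing purge record is treated as "nothing was lost".
bool ProposedAllowsStart(string streamId, long requested) =>
    !lastPurged.TryGetValue(streamId, out var last) || requested >= last;

foreach (var (stream, token) in new[] { ("stream-A", 42L), ("stream-A", 40L), ("stream-B", 42L) })
{
    Console.WriteLine(
        $"{stream} @ {token}: current={CurrentAllowsStart(stream, token)}, proposed={ProposedAllowsStart(stream, token)}");
}
```

Both rules agree for `stream-A` (allowed at 42, a miss at 40). Only `stream-B`, which has no purge record, gets a different answer: a miss under the current rule, a normal start under the proposed one.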
benjaminpetit commented 8 months ago

We want to throw QueueCacheMissException if we cannot guarantee that we didn't miss any event. QueueCacheMissException should be handled by the application code, and should be read as "we may have missed some events for this stream".

The lastPurgedToken dictionary might not contain all the purged tokens (only those for streams recently purged by this streaming agent).

gusuchengnan commented 7 months ago

During testing I found that when a stream is idle (no messages) for a while (more than 20 minutes), the next message sent through the stream triggers a QueueCacheMissException; the message is not delivered and is lost. I suspect this is related to how PurgeMetadata() works:

```csharp
private void PurgeMetadata()
{
    var now = DateTime.UtcNow;

    // Get all keys older than this.purgeMetadataInterval
    foreach (var kvp in this.lastPurgedToken)
    {
        if (kvp.Value.TimeStamp + this.purgeMetadataInterval < now)
        {
            lastPurgedToken.Remove(kvp.Key);
        }
    }
}
```

PurgeMetadata() periodically removes lastPurgedToken entries that are older than purgeMetadataInterval. As a result, when a new message arrives after the stream has been idle longer than that interval, the last purged token for the stream can no longer be found.
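A minimal, self-contained simulation of that timing follows. It is a sketch, not Orleans code: tokens are `long` values, `lastPurged` stands in for `lastPurgedToken`, the 20-minute `purgeMetadataInterval` is an assumed value chosen to match the idle time observed in testing, and `RecordPurge` / `CurrentAllowsStart` are hypothetical helpers. `PurgeMetadata` follows the loop quoted above, and `CurrentAllowsStart` assumes the existing `SetCursor` rule (a purge record must exist and the requested token must not be older than it).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for lastPurgedToken: stream id -> (token of the last purged
// message, UTC time of that purge). Tokens are simplified to longs.
var lastPurged = new Dictionary<string, (long Token, DateTime TimeStamp)>();
// Assumed value for illustration only; the real interval is whatever the cache was built with.
var purgeMetadataInterval = TimeSpan.FromMinutes(20);

// Hypothetical helper: the cache purged messages for streamId up to `token` at `now`.
void RecordPurge(string streamId, long token, DateTime now) =>
    lastPurged[streamId] = (token, now);

// Same rule as PurgeMetadata(): forget purge records older than purgeMetadataInterval.
// ToList() snapshots the entries so the sketch also runs where removing during
// enumeration invalidates the enumerator (.NET Framework, .NET Core before 3.0).
void PurgeMetadata(DateTime now)
{
    foreach (var kvp in lastPurged.ToList())
    {
        if (kvp.Value.TimeStamp + purgeMetadataInterval < now)
        {
            lastPurged.Remove(kvp.Key);
        }
    }
}

// Assumed current SetCursor rule for a token older than the oldest cached message:
// true = start from the oldest cached message, false = QueueCacheMissException.
bool CurrentAllowsStart(string streamId, long requested) =>
    lastPurged.TryGetValue(streamId, out var entry) && requested >= entry.Token;

var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
RecordPurge("stream-A", 41, t0);                        // stream-A's messages up to token 41 are purged
Console.WriteLine(CurrentAllowsStart("stream-A", 42)); // True: the purge record is still there

PurgeMetadata(t0 + TimeSpan.FromMinutes(25));          // stream-A stays idle past the interval
Console.WriteLine(CurrentAllowsStart("stream-A", 42)); // False: record expired, so a cache miss is reported
```

Under the rule proposed in this issue, the second check would succeed instead. That is the trade-off raised in the earlier reply: once the record has expired, the cache can no longer tell whether messages for this stream were purged before the consumer saw them.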