dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License
10.15k stars 2.04k forks source link

QueueCacheMissException: Item not found in cache. #8306

Open wassim-k opened 1 year ago

wassim-k commented 1 year ago

We've encountered QueueCacheMissException in production, where it happens infrequently and at seemingly random times. I've managed to replicate it in the test code below by taking the default stream configuration and dividing each time span by 300, so that the original ratios are maintained.

/// <summary>
/// Reproduction for the QueueCacheMissException described in this issue: publishes the
/// values 6 through 11 to the test stream, pausing for as many seconds as the value
/// itself before each publish.
/// </summary>
[Fact]
public async Task Test()
{
    using var fixture = new ClusterFixture();
    var services = fixture.SiloHost.Services;
    var clusterClient = services.GetRequiredService<IClusterClient>();
    var log = services.GetRequiredService<ILoggerFactory>().CreateLogger("Logger");
    var testStream = clusterClient
        .GetStreamProvider("SimpleMemoryStreamProvider")
        .GetStream<int>("TestStream", Guid.Empty);

    // Every pause (6s..11s) is at least as long as the 6-second limits set in ClusterFixture.
    for (var value = 6; value < 12; value++)
    {
        log.LogInformation("Sending {Value}...", value);
        await Task.Delay(TimeSpan.FromSeconds(value));
        await testStream.OnNextAsync(value);
    }
}

/// <summary>
/// Grain carrying an implicit subscription to the "TestStream" namespace. On activation it
/// attaches handlers that log each received value and each stream error.
/// </summary>
[ImplicitStreamSubscription("TestStream")]
public class TestGrain : IGrainBase, IGrainWithGuidKey
{
    private readonly ILogger<TestGrain> _logger;

    public TestGrain(ILogger<TestGrain> logger, IGrainContext grainContext)
    {
        _logger = logger;
        GrainContext = grainContext;
    }

    /// <summary>The grain context supplied to the constructor.</summary>
    public IGrainContext GrainContext { get; }

    /// <summary>
    /// Resolves the stream keyed by this grain's primary key and subscribes the logging handlers to it.
    /// </summary>
    /// <param name="token">Activation cancellation token; not used by this method.</param>
    public async Task OnActivateAsync(CancellationToken token)
    {
        var provider = this.GetStreamProvider("SimpleMemoryStreamProvider");
        var stream = provider.GetStream<int>("TestStream", this.GetPrimaryKey());

        // The second callback argument is the stream sequence token; it is named distinctly
        // so it does not shadow the cancellation token parameter of this method.
        await stream.SubscribeAsync(
            (item, sequenceToken) => LogReceived(item),
            error => LogStreamError(error));
    }

    // Logs a delivered stream value and completes synchronously.
    private Task LogReceived(int item)
    {
        _logger.LogInformation("Received {Value}", item);
        return Task.CompletedTask;
    }

    // Logs an error reported by the stream and completes synchronously.
    private Task LogStreamError(Exception error)
    {
        _logger.LogError(error, "A stream error has occurred.");
        return Task.CompletedTask;
    }
}

/// <summary>
/// Test fixture that builds and deploys an in-process test cluster configured with memory
/// streams, and exposes the primary silo's host. Disposing it stops and disposes the cluster.
/// </summary>
public class ClusterFixture : IDisposable
{
    public ClusterFixture()
    {
        // NOTE(review): the argument 1 is presumably the initial silo count - confirm against TestClusterBuilder.
        var clusterBuilder = new TestClusterBuilder(1);
        clusterBuilder.AddSiloBuilderConfigurator<TestSiloConfig>();

        Cluster = clusterBuilder.Build();
        Cluster.Deploy();

        var primarySilo = (InProcessSiloHandle)Cluster.Primary;
        SiloHost = primarySilo.SiloHost;
    }

    /// <summary>The deployed test cluster.</summary>
    public TestCluster Cluster { get; }

    /// <summary>Host of the cluster's primary silo.</summary>
    public IHost SiloHost { get; }

    /// <summary>Stops all silos, disposes the cluster and suppresses finalization of this fixture.</summary>
    public void Dispose()
    {
        Cluster.StopAllSilos();
        Cluster.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Silo configuration for the reproduction: debug logging, memory grain storage and a
    /// memory stream provider with shortened cache-eviction and inactivity periods.
    /// </summary>
    private class TestSiloConfig : ISiloConfigurator
    {
        // Name shared by the eviction options and the memory stream provider registration.
        private const string StreamProviderName = "SimpleMemoryStreamProvider";

        public void Configure(ISiloBuilder hostBuilder)
        {
            hostBuilder
                .ConfigureLogging(logging => logging.AddDebug())
                .ConfigureServices(services =>
                {
                    services.Configure<StreamCacheEvictionOptions>(StreamProviderName, ApplyEvictionLimits);
                });

            hostBuilder
                .AddMemoryGrainStorageAsDefault()
                .AddMemoryGrainStorage(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)
                .AddMemoryStreams<DefaultMemoryMessageBodySerializer>(StreamProviderName, streams =>
                {
                    streams.ConfigurePullingAgent(optionsBuilder => optionsBuilder.Configure(pulling =>
                    {
                        pulling.StreamInactivityPeriod = TimeSpan.FromSeconds(6);
                    }));

                    streams.ConfigureStreamPubSub(Orleans.Streams.StreamPubSubType.ImplicitOnly);
                });
        }

        // Sets the cache eviction window: 6 seconds maximum age, 1 second minimum time in cache.
        private static void ApplyEvictionLimits(StreamCacheEvictionOptions eviction)
        {
            eviction.DataMaxAgeInCache = TimeSpan.FromSeconds(6);
            eviction.DataMinTimeInCache = TimeSpan.FromSeconds(1);
        }
    }
}
23:46:08:475    Logger: Information: Sending 6...
23:46:14:483    Logger: Information: Sending 7...
23:46:14:734    TestGrain: Information: Received 6
23:46:21:495    Logger: Information: Sending 8...
23:46:29:492    Logger: Information: Sending 9...
23:46:29:492    TestGrain: Error: A stream error has occurred.
23:46:29:492    
23:46:29:492    Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638111979680907519, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0]
23:46:29:492       at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:29:492       at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:29:492       at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:29:492       at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:29:492       at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:29:492    TestGrain: Information: Received 8
23:46:38:491    Logger: Information: Sending 10...
23:46:43:738    The thread 0x3ec8 has exited with code 0 (0x0).
23:46:48:498    Logger: Information: Sending 11...
23:46:48:498    TestGrain: Error: A stream error has occurred.
23:46:48:498    
23:46:48:498    Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638111979680907521, EventIndex=0], Low: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0], High: [EventSequenceToken: SeqNum=638111979680907523, EventIndex=0]
23:46:48:498       at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
23:46:48:498       at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
23:46:48:498       at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
23:46:48:498       at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
23:46:48:498       at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
23:46:48:498    TestGrain: Information: Received 10
flash2048 commented 1 year ago

Hello @adityamandaleeka, These bugs were planned to be fixed in the 7.1.1 version, but the currently released version is 7.2.1. Is it planned to fix these in the next versions?

Bohdandn commented 10 months ago

This issue results in missing stream messages. It seems to be reproducible if your stream has gaps between messages that are longer than StreamInactivityPeriod, which is 30 min by default.

It seems that after a stream is deactivated due to inactivity, the next message does not reactivate it: that message moves the cursor but is then purged from the cache (see DataMaxAgeInCache). The message after that does reactivate the stream, but delivery fails because the earlier message is no longer in the cache.

Tried workaround: setting StreamInactivityPeriod to multiple days (to be sure that all streams are always active; that's OK in our case). It helps with the test code above but doesn't work in our real app.