dotnet / orleans

Issues with streams during rolling deploy (Orleans.Streams.QueueCacheMissException) #8986

Open erikljung opened 4 months ago

erikljung commented 4 months ago

I've spent a couple of days fighting issues with stream events going missing during rolling deploys. I created a minimal repro to rule out any business logic issues, and I can still reproduce the error. It's possible that this configuration is sub-optimal or that I'm doing something wrong, so please advise.

Setup

  - Orleans version: 7.2.6
  - Stream pub sub configuration: StreamPubSubType.ImplicitOnly
  - Stream storage: MemoryGrainStorage for "PubSubStore" (if this is applicable with ImplicitOnly?)
  - GrainDirectory: Redis
  - DefaultCompatibilityStrategy: BackwardCompatible
  - DefaultVersionSelectorStrategy: LatestVersion
  - Using Postgres as grain storage/membership

Repro

  1. Start silo A
  2. Start sending traffic to the cluster, see producer and consumer below
  3. Consumer (V1) activates and starts receiving events
  4. Start silo B, which joins the cluster, now a total of 2 silos in the cluster
  5. StreamGrain (V1) deactivates on silo A and activates (V2) on silo B. This happens due to the version selector strategy, plus a call to another grain method (since grains with implicit stream subscriptions won't move automatically)
  6. Stop silo A after some time
  7. One (1) Orleans.Streams.QueueCacheMissException occurs on silo B on the consumer's end. This is the only error I get. No errors occur on the producer's end
  8. Everything seems to recover, but some events never reach the consumer (500 events out of 7000)

Questions

  1. Why is this happening, and can it be prevented?
  2. Is there any way to recover the events that the consumer grain never received?
  3. Is there another configuration that would be more suitable for rolling deploys?

Error

2024-05-08 12:30:23.571 [FTL] StreamGrain - Stream failed for grain "fd4e338d-36dd-4533-99c8-909c3a4ac187" Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638507681451236680, EventIndex=0], Low: [EventSequenceToken: SeqNum=638507681451236716, EventIndex=0], High: [EventSequenceToken: SeqNum=638507681451236729, EventIndex=0]
   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
   at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
   at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 303
--- End of stack trace from previous location ---
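
To make the token values in the exception message easier to read, here is a minimal sketch that compares them. The `Token` type is a hypothetical stand-in for the (SeqNum, EventIndex) pair printed in the log, not the Orleans `EventSequenceToken` class, and the ordering it uses (SeqNum first, then EventIndex) is an assumption. It only shows that the requested token sorts before the cache's Low bound; it does not explain why the cache starts where it does.

```csharp
using System;

// Hypothetical stand-in for the (SeqNum, EventIndex) pair shown in the log.
// Assumption: tokens are ordered by SeqNum first, then by EventIndex.
readonly record struct Token(long SeqNum, int EventIndex) : IComparable<Token>
{
    public int CompareTo(Token other)
    {
        int bySeq = SeqNum.CompareTo(other.SeqNum);
        return bySeq != 0 ? bySeq : EventIndex.CompareTo(other.EventIndex);
    }
}

static class Program
{
    static void Main()
    {
        // Values copied from the exception message above.
        var requested = new Token(638507681451236680, 0);
        var low = new Token(638507681451236716, 0);
        var high = new Token(638507681451236729, 0);

        // The requested token falls outside the [Low, High] window on the low side.
        bool beforeLow = requested.CompareTo(low) < 0;
        bool withinWindow = !beforeLow && requested.CompareTo(high) <= 0;

        Console.WriteLine($"Requested before Low: {beforeLow}");
        Console.WriteLine($"Requested within [Low, High]: {withinWindow}");
        Console.WriteLine($"SeqNum distance from Requested to Low: {low.SeqNum - requested.SeqNum}");
    }
}
```

In other words, the token the consumer resumes from (the persisted `LastStreamToken`) is older than the oldest item the cache on silo B reports.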

Producer

var stream =
  IClusterClient.GetStream<StreamEvent>(
    StreamId.Create(
        "Namespace.StreamEvent",
        request.GrainId ) );

await stream.OnNextAsync( new StreamEvent( request.Id ) );

Consumer

[ImplicitStreamSubscription( streamNamespace: STREAM_NS )]
public class StreamGrain : IGrainBase, IStreamGrain
{
    private const string STREAM_NS = "Namespace.StreamEvent";

    private IPersistentState<StreamState> Storage { get; }
    private ILogger<StreamGrain> Logger { get; }

    public IGrainContext GrainContext { get; }

    public StreamGrain(
        [PersistentState( nameof( StreamGrain ) )]
        IPersistentState<StreamState> storage,
        IGrainContext grainContext,
        ILogger<StreamGrain> logger )

    {
        Storage = storage;
        GrainContext = grainContext;
        Logger = logger;
    }

    public async Task OnActivateAsync( CancellationToken token )
    {
        LogGrainStatus( "Activating" );

        var streamId = StreamId.Create( STREAM_NS, this.GetPrimaryKey() );
        var stream = this.GetDefaultStreamProvider().GetStream<StreamEvent>( streamId );

        await stream.SubscribeAsync(
            onNextAsync: Register,
            onErrorAsync: OnError,
            token: Storage.State.LastStreamToken );
    }

    public Task OnDeactivateAsync( DeactivationReason reason, CancellationToken token )
    {
        LogGrainStatus( "Deactivating" );
        return Task.CompletedTask;
    }

    public async Task Register( StreamEvent input, StreamSequenceToken token )
    {
        Logger.LogInformation( "Storing event: {Id}; token {Token}", input.Id, token );

        Storage.State.LastStreamToken = token;
        Storage.State.ReceivedEvents.Add( input.Id );
        await Storage.WriteStateAsync();
    }

    private Task OnError( Exception ex )
    {
        Logger.LogCritical( ex, "Stream failed for grain {Id}", this.GetPrimaryKey() );
        return Task.CompletedTask;
    }

    private void LogGrainStatus( string state ) =>
        Logger.LogInformation(
            "{State} stream grain {Id}; Stream token: {Token}; Events: {Events}",
            state,
            this.GetPrimaryKey(),
            Storage.State.LastStreamToken,
            Storage.State.ReceivedEvents.Count );
}

TimoSchmechel commented 3 months ago

We are also getting this error during a rolling deploy. We don't have very high-frequency events, so I don't believe we are losing events due to this, but I haven't actually checked.

Would be great to hear some official advice here, as I will likely just disable the error logging from this callback or force a hard rollout when upgrading the cluster. Neither seems like a great solution tbh.