hongliyu2002 / Orleans.EventStore

MIT License
28 stars 5 forks source link

QueueCacheMissException in sample application after server restart #4

Open nesc58 opened 1 year ago

nesc58 commented 1 year ago

Hi, when I use the sample application I ran into some errors.

When I start the server application, I can use the client and join a chat channel with version 0. I type some messages, Leave the channel and joining again. Everything works. But when the server application restarts I receive some errors.

The error is

Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceTokenV2: SeqNum=0, EventIndex=0], Low: EventStoreSequenceTokenV2(Position: 15, SequenceNumber: 15, EventIndex: 0), High: EventStoreSequenceTokenV2(Position: 19, SequenceNumber: 19, EventIndex: 0)
   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
   at Orleans.Providers.Streams.EventStore.EventStoreQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in \Orleans.EventStore\src\Orleans.Streaming.EventStore\Providers\Streams\EventStoreQueueCache.cs:line 139
   at Orleans.Providers.Streams.EventStore.EventStoreQueueAdapterReceiver.Cursor..ctor(IEventStoreQueueCache cache, StreamId streamId, StreamSequenceToken token) in \Orleans.EventStore\src\Orleans.Streaming.EventStore\Providers\Streams\EventStoreQueueAdapterReceiver.cs:line 290
   at Orleans.Providers.Streams.EventStore.EventStoreQueueAdapterReceiver.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in \Orleans.EventStore\src\Orleans.Streaming.EventStore\Providers\Streams\EventStoreQueueAdapterReceiver.cs:line 242
   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305
--- End of stack trace from previous location ---

What must be changed in the samples application to prevent this kind of errors?

nesc58 commented 1 year ago

When I change the implementation of EventStorePersistentSubscriptionReceiver from

catch (RpcException ex) when (ex.StatusCode is StatusCode.AlreadyExists)
{
    // await _subscriptionClient.DeleteToStreamAsync(_settings.QueueName, _settings.ConsumerGroup, null, _settings.Options.Credentials).ConfigureAwait(false);
    // await _subscriptionClient.CreateToStreamAsync(_settings.QueueName, _settings.ConsumerGroup, _settings.ReceiverOptions.SubscriptionSettings, null, _settings.Options.Credentials).ConfigureAwait(false);
    await _subscriptionClient.UpdateToStreamAsync(_settings.QueueName, _settings.ConsumerGroup, _settings.ReceiverOptions.SubscriptionSettings, null, _settings.Options.Credentials).ConfigureAwait(false);
}

to

catch (RpcException ex) when (ex.StatusCode is StatusCode.AlreadyExists)
{
    await _subscriptionClient.DeleteToStreamAsync(_settings.QueueName, _settings.ConsumerGroup, null, _settings.Options.Credentials).ConfigureAwait(false);
    await _subscriptionClient.CreateToStreamAsync(_settings.QueueName, _settings.ConsumerGroup, _settings.ReceiverOptions.SubscriptionSettings, null, _settings.Options.Credentials).ConfigureAwait(false);
    //await _subscriptionClient.UpdateToStreamAsync(_settings.QueueName, _settings.ConsumerGroup, _settings.ReceiverOptions.SubscriptionSettings, null, _settings.Options.Credentials).ConfigureAwait(false);
}

Everything seems to be worked correct. Deleting the stream sseems a little bit hard. Is there a way to to update the checkpoints which used internally by the persistent subscription to receive the correct events?