dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License
10.14k stars 2.04k forks source link

Missing messages, not delivered, MemoryStream #8399

Open jamescarter-le opened 1 year ago

jamescarter-le commented 1 year ago

I am using MemoryStream, and whilst I know it is not a Guaranteed Delivery mechanism, I have a low frequency of messages (maybe 1 per 5 minutes currently) and would not expect to loose a lot (any) of messages when operating inside a VPC, over TCP.

This is an explicit subscription.

I notice that I can sometimes get this Error log, it is usually one stream that seems to die, but it is not reliable, sometimes it does come back. I get no exceptions calling OnNextBatch on the producer. The grain still responds to messages from other subscribed streams with the same namespace, the Grain is not Blocked.

Message: An error occurred on the stream

Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638174475994405873, EventIndex=0], Low: [EventSequenceToken: SeqNum=638174475994405875, EventIndex=0], High: [EventSequenceToken: SeqNum=638174475994405875, EventIndex=0]
   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
   ........
   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 305

The grain subscribed is a long running grain, (it has it's own scoped Websocket to a client API), so it resubscribed on cluster start, or if a silo is restarted due to rolling upgrade.

Any idea's where to start debugging this? The message is a simple POCO struct, and contains strings and value types, with 1 enum. Decorated with GenerateSerializer and the Id(n) attributes.

OnErrorAsync on the IAsyncObserver is never invoked, and I'm struggling to think what could be causing this. I am logging the messages inside the producer, so I know the exist, and I am logging the /fact/ a message was received on the consuming grain as a string, rather than try and access any properties just in case, but for some streams nothing appears.

I log the fact the Grain calls ResumeAsync in it's activation.

jamescarter-le commented 1 year ago

It looks like I was mistaken about OnErrorAsync not being invoked, it is indeed where I am catching the above exception and logging it.

I found this ticket: https://github.com/dotnet/orleans/issues/850 which appears to go over this, which seems to mention it has been resolved, but I'm not sure how to handle this in my own scenario, without loosing messages.

I am not providing a SequenceToken on resume (as I am using a MemoryStream anyway).

Unsure if this is a bug in the MemoryStream, or if it is state that has gone bad somehow in my Grains? The SeqNum mentioned above 638174475994405873 it excessively large if this is a sequential id of the last message on the Stream, this number should be in the region of 2000 or so if it starts from Zero on a new stream.

jamescarter-le commented 1 year ago

On the Producer the Stream I am publishing too is cached once it is first referenced. Maybe this is an issue when I perform a rolling upgrade and the Queue moves Silo?

if (_stream is null)
{
    var streamId = StreamId.Create(StreamNamespace, _state.State.Identifier);
    _stream = this.GetStreamProvider(StreamProviderName).GetStream<MyEvent>(streamId);
}

Edit: Removed this cached Stream and error still occurs when Silo is rolling-upgraded.

Also, on the Consumer I have implemented IAsyncObserver<MyEvent>, is this approach appropriate when I am subscribed to multiple Stream Ids rather than providing delegates to ResumeAsync? Lastly, should I be calling ResumeAsync when IAsyncObserver.OnErrorAsync is invoked to avoid loosing this event?

jamescarter-le commented 1 year ago

Does anyone have any information on this issue? What should I be doing with my Consumers if this error occurs? Whenever I do a Cluster upgrade, it is likely this error will occur and I lose messages on streams:

image

In this example, I lost all messages on these failing streams for about an hour.

jamescarter-le commented 1 year ago

I believe the silo is being shutdown correctly, as I see my Consumers logging they are being deactivated before the silo count reduces in the Dashboard. But I get these messages coming up, just wondering if something is wrong with cluster health and that could be why this issue is happening?

[Warning] Orleans.Grain: Failed to register grain [Activation: S10.0.2.16:11111:41886690/<GRAINID>#Placement=RandomPlacement State=Invalid] in grain directory
[Information] Orleans.Messaging: Trying to forward Request [S10.0.2.16:11111:41886690 sys.client/44f96651316a4235b196861b937ad370]->[S10.0.2.16:11111:41886690 <GRAINID>] GrainType.SubmitPerpData() #2030598 from [GrainAddress GrainId <GRAINID>, ActivationId: @d268d5afb2c2412386451534556041a3, SiloAddress: S10.0.2.16:11111:41886690] to [GrainAddress GrainId <GRAINID>, ActivationId: @22ed7f20597d4fb6bf9d45839e9c3381, SiloAddress: S10.0.3.210:11111:41886870] after Failed to register activation in grain directory.. Attempt 0

I get many hundreds of these errors, my understanding was I should be able to replace the silos without the clients/cluster having these kind of issues?

Maybe it's something I'm doing wrong in my configuration? I am adding UseDynamoDBClustering on both Silo and Client hosts.

I have just done a rolling upgrade to see if one of my Streams comes back, one of my two Consumers (I only have 2 for this testing) was no longer receiving OnNextAsync from it's explicit stream. It was not blocked. Subscribing to this same Stream from a second Consumer, (without restarting the cluster) was able to receive these messages. What is interesting, is that I have an API call to DeactivateOnIdle a Consumer, I did this to the non-responsive (to Stream) Consumer, and so it would have done ResumeAsync but it still did not respond to OnNextAsync.

This is why I just did a rolling upgrade. Can you confirm this approach is acceptable?

Before Refresh Silos: before_update_cluster

During Refresh Silos: 6_silos_total

After Refresh Silos: fully_replaced

Is there some way I should be handing Stream OnErrorAsync that I'm just not seeing? I have done a lot of work with Streams with very early versions of Orleans, and did not see these kinds of issues then: https://github.com/jamescarter-le/OrleansEventStoreProvider A pull request of mine into Orleans: https://github.com/dotnet/orleans/pull/2459

I'm not sure how long I should be expecting these kinds of messages for? As the silo never actually had any downtime, but I get these going for almost 45 mins now. When the client wants to send a message to the cluster (about 1/sec across ~10 grains), it uses IGrainFactory each time, and does not store it's reference to the grain.

image

jamescarter-le commented 1 year ago

OK after the Silo restarted, I was able to repo it straight away. My Consumers did ResumeAsync, my Producers started to emit messages, and nothing turns up on the Consumers.

I'm not passing a SequenceToken on the Producers, as it is a memory stream and Optional on the interface, could this be the issue? Because the silo is not actually shutdown, and the streams live on, could the sequence be totally out? And why I got ~200k QueueCacheMissExceptions originally?

--

Looks like SequenceToken is not passed down anyway.

I had to completely shutdown the cluster (bring all active silos down to zero) and reboot it again, will see if this starts up messages on the bad streams+consumer again.

jamescarter-le commented 1 year ago

Just done another rolling upgrade as part of development, this time it generated ~560K QueueCacheMissException in 30 minutes.

I do not know if the streams are still alive or not.

Any thoughts from the team would be greatly appreciate.

image

The Silos processes are not just terminated, they are given time to terminate and output the correct Silo stopped logs:

image

** It's as if when the silo is restarted, the streams which were interrupted are trying to replay their way back up the memory stream from when the first message on the stream was published.

This takes so long (and generates so many QueueCacheMissException) that any real message are lost during this time.

Could this be what is happening here? Would this be subscriptions on the Client, subscriptions in the Silo or both? **

Okay - QueueCacheMissExceptions have finally stopped after 30 mins, but many streams are broken and are not delivering messages to clients.

There are no error messages about this, there is a single warning from one stream, but none from another stream which when I send data into the Producer grain, nothing is output to the Consumer. I can see the Producer logging the data received.

This occurred shortly after the Silos were upgraded:
[Warning] Orleans.Streams.PubSubRendezvousGrain: Producer PubSubPublisherState:StreamId=<StreamNamespace&Type>/538E78E33A3B0363FC37E393EB334103,Producer=sys.svc.stream.agent/10.0.3.250:11111@42038429+EventType_5_eventtype-1-0x20000001. on stream <StreamNamespace&Type>/538E78E33A3B0363FC37E393EB334103 is no longer active - permanently removing producer.

The MemoryStream grains are still up and running, 8 as is the default.

image

When I completely reset the cluster (kill all Silos, then bring them back up) the system works perfectly, I last ran it about 5 days with no issues or breakdown in streams. It only occurs when the silos are rolling upgrade (I suppose it would also fail if I loose an instance in AWS for example)

jamescarter-le commented 1 year ago

Is there a specific Type and LogLevel I should enable to help me debug this? Looks like PersistentStreamPullingAgent is not emitting no errors or warnings.

Anything else I should be looking for? To me it feels like something is Blocked or stuck in some unrecoverable state but there is nothing output in the logs to say what.

benjaminpetit commented 1 year ago

Sorry I haven't had the time to look at your issue yet.

Are you 100% sure that your silos are shutting down gracefully on rolling upgrade? Do you see warning/errors about silos failing to respond to probe messages?

jamescarter-le commented 1 year ago

Thanks @benjaminpetit I'm just trying to gather as much information as I can.

I think maybe I have a rough location, indicated by the fact that the application emits these errors for ~30 mins (which is the same as the default Queue purge time?).

DefaultDataMaxAgeInCache = TimeSpan.FromMinutes(30);

https://github.com/dotnet/orleans/blob/01475f531c27c1c1fa7bc6f20827ffc8a50e5636/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs#L201

Here is the data from the actual QueueCacheMissExceptions:

Req:  [EventSequenceToken: SeqNum=638186329955314832, EventIndex=0],
Low:  [EventSequenceToken: SeqNum=638186308533223378, EventIndex=0],
High: [EventSequenceToken: SeqNum=638186329955314832, EventIndex=0]

It looks like the Requested message is greater than what it believes is the Oldest Message, but the lastPurgedToken has not yet been set? In this case it throws QueueCacheMissException, doesn't change the Cursor, and it will just continue this loop?

I do not get any errors from silos about probe messages, and am logging the Silo shutdown messages in my logs.

image

jamescarter-le commented 1 year ago

I tried another mechanism for rolling update of bringing down 1 silo, then replacing it when it was shutdown. Instead of creating 3 new silos, and dropping the older 3 when the new 3 joined, as I thought if ALL the old silos shutdown at the same time it could create this issue.

No joy, after it had finished recycling all silos. The streams ended up unresponsive again.

Infact, it generated so many QueueCacheMissException at a rate that my AWS Logger buffer ran out of space.

benjaminpetit commented 1 year ago

When using MemoryStream, the SequenceNumber starts from the DateTime.UtcNow.Ticks value when the MemoryStreamQueueGrain was first activated, so it's expected that's not 0 or a low number. SequenceNumber is then incremented for each message enqueued.

It is somewhat expected to see QueueCacheMissException when doing a rolling upgrade when using MemoryStream: the stream queue (with this provider only) are stored in grains, that are deactivated from silos shutting down.

When this grain queue is reactivated somewhere else, the SequenceNumber starts again from DateTime.UtcNow.Ticks.

LastPurgedToken is not helping in your case either, because it's also a silo local value, that is lost when a silo shuts down.

I do not get any errors from silos about probe messages, and am logging the Silo shutdown messages in my logs.

But the log line

[Warning] Orleans.Streams.PubSubRendezvousGrain: Producer PubSubPublisherState:StreamId=<StreamNamespace&Type>/538E78E33A3B0363FC37E393EB334103,Producer=sys.svc.stream.agent/10.0.3.250:11111@42038429+EventType_5_eventtype-1-0x20000001. on stream <StreamNamespace&Type>/538E78E33A3B0363FC37E393EB334103 is no longer active - permanently removing producer.

really looks like a silo did not had enough time to shut down properly.

Do you see the new silos starting new pulling agents?

If you could send me the full silos log it could be very helpful.

jamescarter-le commented 1 year ago

@benjaminpetit

Do you have a secure way for me to send the log files to you directly?

image

image

luckyycode commented 1 year ago

Anything new about this issue?