asynkron / protoactor-dotnet

Proto Actor - Ultra fast distributed actors for Go, C# and Java/Kotlin
http://proto.actor
Apache License 2.0
1.67k stars 284 forks source link

Cluster pub-sub resubscription and event sourcing streaming #2112

Open AqlaSolutions opened 1 month ago

AqlaSolutions commented 1 month ago

I have 2 nodes, one subscribes and another publishes. If the publisher node restarts, will the first node receive the published messages after that? What are guarantees on this? Is it possible that some published messages from the new node will be skipped before resubscription?

How should we implement a snapshot+events subscription considering all above? We request a snapshot at the start of subscription but it's possible that after the restart we will need a new snapshot so we have to somehow identify that events are coming from a new node, right?

AqlaSolutions commented 1 month ago

I discovered that the subscription doesn't survive the publisher restart when I set RunAsClient = true at the subscriber side. Setting it to false fixes the resubscription. Is it expected behavior?

AqlaSolutions commented 1 month ago

I researched the source code, it looks like subscribers don't keep any information on what topic they are subscribed to and we have to add KV storage at publisher side. What is the reason for this design? Wouldn't it be easier for subscribers to keep their state themselves? Then TopicActor at start would need to broadcast that the topic is available and DeliveryActors would send resubscribe requests. Or they could just send currently subscribed topics to each newly connected node when a new connection is established. This way there is no need keep the state externally.

rogeralsing commented 1 month ago

See: https://github.com/asynkron/realtimemap-dotnet/blob/3c2609e26777007805281725a7696ed8338d48a0/Backend/ProtoActorExtensions.cs#L74

https://github.com/asynkron/realtimemap-dotnet/blob/3c2609e26777007805281725a7696ed8338d48a0/Backend/ProtoActorExtensions.cs#L61-L64

This is how you can have persistent subscriptions for the topic actor.

rogeralsing commented 1 month ago

But in addition to this. I´d really like to have the topic types separated. so that you can have more configuration per topic.

e.g.

something like:

clusterConfig.WitTopicKind("ChatRoom", Props.FromProducer(() => new TopicActor(kvStore)))

Then when you publish:

await _publisher.Publish("myChatRoom123", "ChatRoom", new ChatMessage ....)

That would allow you to keep those specific topics bound to specific nodes, and not as it is today, all nodes holds topic-actors.

rogeralsing commented 1 month ago

How should we implement a snapshot+events subscription considering all above? We request a snapshot at the start of subscription but it's possible that after the restart we will need a new snapshot so we have to somehow identify that events are coming from a new node, right?

First, I assume you are using virtual actors here? as they are location transparent within the cluster.

I´m thinking that no matter if the subscribing virtual actor is awake or not right now. If you publish to the pubsub, and that virtual actor is already subscribed. Then the message will either go directly to the already awake actor, or the actor will be woken up and then get the message.

In the actor OnStart handler, you could just load your snapshot and any persisted events after that specific snapshot.

Why would you have to identify from where the events are coming in? if they go to the topic actor, and the topic actor forwards to the subscribed actor. it will get it.

AqlaSolutions commented 1 month ago

You misunderstood what I mean by event sourcing here. Our subscriber actor doesn't store anything, it requests the snapshot of the current state from the publisher grain and then keeps this state up-to-date by receiving incremental Pub-Sub events and applying them onto the snapshot. If event publisher restarts, we need to detect that Pub-Sub events are now coming from another grain instance and re-request the snapshot. What I asked is whether we really need this re-requesting part or can we rely on that there will be no skipped events at the subscriber part when the publisher node restarts.

AqlaSolutions commented 1 month ago

@rogeralsing

See: https://github.com/asynkron/realtimemap-dotnet/blob/3c2609e26777007805281725a7696ed8338d48a0/Backend/ProtoActorExtensions.cs#L74

https://github.com/asynkron/realtimemap-dotnet/blob/3c2609e26777007805281725a7696ed8338d48a0/Backend/ProtoActorExtensions.cs#L61-L64

This is how you can have persistent subscriptions for the topic actor.

I implemented IKeyValueStore but TopicActor removes subscribers from the list after loading because they are using RunAsClient = true so that they are not present in the Consul member list. I can't set it to false for now because they have no access to the KeyValueStore so TopicActor should not spawn on them. Is there any solution for this?

AqlaSolutions commented 1 month ago

@rogeralsing , another problem here is that when I subscribe to a topic from an unnamed actor, it receives a numeric PID like $1. This PID gets stored into the key value storage. It's possible that when subscribers are loaded in TopicActor, the loaded PIDs may point now to another actor which didn't subscribe to the topic. Of course it can only happen if the subscriber also restarts during TopicActor downtime. Is there a mechanism to prevent this? May be actors should use random guid names instead of incremental numbers? Or better add unique node GUID to PID instead so that restarted node on the same host will be treated as a different node? Because now even named actors may receive PubSub events after their node restart if the publisher was down during such restart, I don't think that it's intended.