Open AqlaSolutions opened 7 months ago
It looks like TopicActor starts at another non-publisher node so it has to deal with serialization. How can we force it to start at the local node for specific topics?
Hi, currently there is only one type of topic actor, and it is cluster-wide, so to speak: it can run on any cluster node. This is why you are seeing this behavior.
That being said, Pub-Sub is still experimental, and we could definitely expand the API to allow for more customization here. For example, the topic-actor kind is currently auto-registered on all cluster nodes. We could make it possible to register several topic kinds with different configurations, each on different nodes.
That change would be pretty easy to make. It's mostly about allowing a non-hardcoded kind for the topic actor.
Open to suggestions here.
Under the covers, the TopicActor is registered like this:
    _clusterKinds.Add(TopicActor.Kind, new ClusterKind(TopicActor.Kind, Props.FromProducer(() => new TopicActor(store))).Build(this));
So to work around the serialization issue, I simply register additional TopicActors under different kinds. This way I can choose where a certain kind of TopicActor can start, and send it only messages that it's able to deserialize. Then I ended up making some extensions that take a custom topicActorKind so I can target the right one.
public static class SubscriptionExtensions
{
    // Subscribes a receive function to `topic`, routing the request to the given topic actor kind.
    public static async Task<IAsyncDisposable> SubscribeAsync(this Cluster cluster, string topicActorKind, string topic, Receive receive)
    {
        // Spawn a local actor that handles the published messages.
        var subId = cluster.System.Root.Spawn(Props.FromFunc(receive));
        await cluster.RequestAsync<SubscribeResponse>(topic, topicActorKind, new SubscribeRequest
        {
            Subscriber = new SubscriberIdentity
            {
                Pid = subId
            }
        }, default);
        return new Subscription(cluster, topicActorKind, topic, subId);
    }

    // Disposing the subscription sends the matching unsubscribe request to the same kind.
    public class Subscription(Cluster cluster, string topicActorKind, string topic, PID subId) : IAsyncDisposable
    {
        public async ValueTask DisposeAsync()
        {
            await cluster.RequestAsync<UnsubscribeResponse>(topic, topicActorKind, new UnsubscribeRequest
            {
                Subscriber = new SubscriberIdentity
                {
                    Pid = subId
                }
            }, default);
        }
    }

    // TopicActorKindPublisher is my own publisher (not shown here) that targets the given kind.
    public static BatchingProducer CreatePublisher(this IContext context, string topicActorKind, string topic, ILogger logger)
    {
        return new BatchingProducer(new TopicActorKindPublisher(context.Cluster(), topicActorKind), topic, new BatchingProducerConfig
        {
            OnPublishingError = async (retries, exception, batch) =>
            {
                // Give up on the batch after more than 6 retries; otherwise retry right away.
                if (retries > 6)
                {
                    logger.LogError(exception, "Failed to publish batch, giving up.");
                    return PublishingErrorDecision.FailBatchAndContinue;
                }
                logger.LogWarning(exception, "Failed to publish batch, will retry.");
                return PublishingErrorDecision.RetryBatchImmediately;
            }
        });
    }
}
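For completeness, the extensions above assume the extra kinds have already been registered on the nodes that should host them. A minimal sketch of that registration, mirroring the built-in one quoted earlier, could look like this. The `WithTopicActorKind` helper name is made up for illustration, and I'm assuming `IKeyValueStore<Subscribers>` lives in `Proto.Utils`; adjust the usings if your version differs.

```csharp
using Proto;
using Proto.Cluster;
using Proto.Cluster.PubSub;
using Proto.Utils; // assumed namespace of IKeyValueStore<T>

public static class TopicActorKindRegistration
{
    // Hypothetical helper: registers a TopicActor under a custom kind.
    // Call it only on the nodes that should be allowed to host that kind,
    // i.e. nodes that can deserialize the messages published through it.
    public static ClusterConfig WithTopicActorKind(
        this ClusterConfig config,
        string topicActorKind,
        IKeyValueStore<Subscribers> store) =>
        config.WithClusterKind(
            topicActorKind,
            Props.FromProducer(() => new TopicActor(store)));
}
```

Since placement only considers nodes that registered the kind, leaving this call out on the other nodes is what keeps the topic actor (and the payload deserialization) away from them.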
Would be awesome to build this in at some point, but there is another solution to the serialization problem (although having multiple topic actor kinds is still good for isolation), which is to not deserialize the payload at all. In theory, the TopicActor and the PubSubMemberDeliveryActor should not have to deserialize the contents being published! I'm not sure of the best way to go about this, since the messages that facilitate publishing still need to be deserialized, of course, just not the actual payload. It would definitely be a performance bump, and it would mean the publisher and subscriber are the only ones that need to know how to serialize the contents.
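To make the idea concrete, here is a rough, library-independent sketch of an opaque payload envelope. Nothing in it is Proto.Actor API: `OpaqueEnvelope`, `OrderPlaced`, and the `Wrap`/`Forward`/`Unwrap` helpers are all hypothetical names, and JSON stands in for whatever serializer is actually used. The point is only that the intermediary reads the routing fields and passes the payload bytes through untouched.

```csharp
using System;
using System.Text.Json;

// Hypothetical envelope: routing metadata is readable by every hop,
// while the payload stays as raw bytes until it reaches a subscriber.
public sealed record OpaqueEnvelope(string Topic, string TypeName, byte[] Payload);

// Example message type known only to the publisher and the subscriber.
public sealed record OrderPlaced(string OrderId, int Quantity);

public static class OpaquePayloadDemo
{
    // Publisher side: serializes the message once and wraps the bytes.
    public static OpaqueEnvelope Wrap<T>(string topic, T message) =>
        new(topic, typeof(T).FullName ?? typeof(T).Name,
            JsonSerializer.SerializeToUtf8Bytes(message));

    // Intermediary (stand-in for a topic or delivery actor):
    // forwards the envelope as-is and never looks inside Payload.
    public static OpaqueEnvelope Forward(OpaqueEnvelope envelope) => envelope;

    // Subscriber side: the only other place that needs to know the type.
    public static T Unwrap<T>(OpaqueEnvelope envelope) =>
        JsonSerializer.Deserialize<T>(envelope.Payload)
        ?? throw new InvalidOperationException("Payload deserialized to null.");

    public static void Main()
    {
        var sent = new OrderPlaced("A-1", 3);
        var envelope = Forward(Wrap("orders", sent));
        var received = Unwrap<OrderPlaced>(envelope);
        Console.WriteLine(received == sent); // records compare by value
    }
}
```

With something like this, a node hosting the topic actor would only need the envelope type, not every message type that flows through the topic.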
We have a few nodes, and some of them were using RunAsClient, but now I disabled it and things went wrong. One of the nodes (which originally didn't use RunAsClient) leverages pub-sub for internal communication between its actors. Somehow those internal messages arrive at another node and cause a deserialization exception (because that node doesn't know anything about those internal types). This happens even though that node doesn't subscribe to the topic where the message is published (it subscribes to a different one). It didn't happen before the RunAsClient change. Is this a bug or expected behavior?