UPDATE. I managed to broadcast messages to all routees across the cluster by creating an extra clustered group:
deployment {
  /echo {
    # router = round-robin-pool
    router = broadcast-pool
    # nr-of-instances = 4
    cluster {
      enabled = on
      max-total-nr-of-instances = 4
      max-nr-of-instances-per-node = 2
      allow-local-routees = on
      use-role = Upload
    }
  }
  /echo_group {
    # router = round-robin-group
    router = broadcast-group
    routees.paths = ["/user/echo"]
    cluster {
      enabled = on
      max-total-nr-of-instances = 2
      max-nr-of-instances-per-node = 1
      allow-local-routees = on
      use-role = Upload
    }
  }
}
let system1 = System.create "cluster-system" (configWithPort 5000)
let system2 = System.create "cluster-system" (configWithPort 5001)
let system3 = System.create "cluster-system" (configWithPort 5002)
let echoActor (mailbox: Actor<_>) (msg: string) =
    let nodeAddress = Cluster.Get(mailbox.System).SelfUniqueAddress
    match msg with
    | "ping" -> logInfof mailbox "Received ping on %A" nodeAddress
    | _ -> ()
    ignored ()
let pool1 =
    spawn
        system1
        "echo"
        { props (actorOf2 echoActor) with
            Router = Some(FromConfig.Instance :> RouterConfig) }
let pool2 =
    spawn
        system2
        "echo"
        { props (actorOf2 echoActor) with
            Router = Some(FromConfig.Instance :> RouterConfig) }
let pool3 =
    spawn
        system3
        "echo"
        { props (actorOf2 echoActor) with
            Router = Some(FromConfig.Instance :> RouterConfig) }
pool1 <! "ping" // Messages are only sent to pool1's routees, not all routees
pool2 <! "ping" // Messages are only sent to pool2's routees, not all routees
let r1: IActorRef<string> = typed <| system1.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "echo_group")
r1 <! "ping" // Messages are sent to all clustered pool routees!
let r2: IActorRef<string> = typed <| system2.ActorOf(Props.Empty.WithRouter(FromConfig.Instance), "echo_group")
r2 <! "ping" // Messages are sent to all clustered pool routees!
This kind of works, but is this the only way to broadcast messages across the whole cluster pool?
This should just work out of the box. We'll investigate.
🤔 @object have you tried wrapping in a Broadcast message to see what happens?
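For readers unfamiliar with the suggestion, here is a minimal local sketch of what wrapping a message in Broadcast does. It is not taken from the reporter's code: EchoActor, the Received queue, and the system name are hypothetical, and it uses a plain (non-clustered) round-robin pool so the effect is visible. With a broadcast-pool router the router already sends each message to every routee it knows about, so the wrapper would not be expected to change the outcome there.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;

// Hypothetical echo actor for this sketch; it records which routee saw each message.
public class EchoActor : ReceiveActor
{
    public EchoActor()
    {
        Receive<string>(msg => Program.Received.Enqueue($"{Self.Path.Name}:{msg}"));
    }
}

public static class Program
{
    // Hypothetical sink so deliveries can be counted after the run.
    public static readonly ConcurrentQueue<string> Received = new ConcurrentQueue<string>();

    public static async Task Main()
    {
        var system = ActorSystem.Create("broadcast-sketch");

        // A local round-robin pool with 3 routees (no cluster involved).
        var props = Props.Create(() => new EchoActor()).WithRouter(new RoundRobinPool(3));
        var pool = system.ActorOf(props, "echo");

        pool.Tell("ping");                 // routed to a single routee
        pool.Tell(new Broadcast("ping"));  // routed to every routee of this router

        // Crude wait for delivery; acceptable for a sketch, not for a real test.
        await Task.Delay(500);
        await system.Terminate();

        System.Console.WriteLine($"deliveries: {Received.Count}");
    }
}
```

Broadcast only reaches the routees the router actually holds, so if remote routees were never deployed, wrapping the message cannot reach them either.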
Also wondering whether, depending on repro specifics, cluster gossip may be a factor.
I tried to wrap messages in a Broadcast, same result. Messages are only sent to local routees.
This seemed to work just fine for me. This is my configuration:
akka
{
  actor
  {
    provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
    deployment
    {
      /echo
      {
        router = broadcast-pool
        # nr-of-instances = 6
        cluster
        {
          enabled = on
          max-nr-of-instances-per-node = 2
          max-total-nr-of-instances = 4
          allow-local-routees = on
          use-role = Upload
        }
      }
    }
  }
  remote
  {
    dot-netty.tcp
    {
      public-hostname = localhost
      hostname = localhost
      port = {{port}}
    }
  }
  cluster
  {
    roles = ["Upload"]
    seed-nodes = [ "akka.tcp://cluster-system@localhost:5000" ]
    min-nr-of-members = 3
  }
}
Here's the log output:
[DEBUG][04/24/2024 16:47:39.575Z][Thread 0017][LocalActorRefProvider(akka://cluster-system)] [akka://cluster-system/] Instantiating Remote Actor [akka.tcp://cluster-system@localhost:5001/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c3]
[DEBUG][04/24/2024 16:47:39.576Z][Thread 0017][LocalActorRefProvider(akka://cluster-system)] [akka://cluster-system/] Instantiating Remote Actor [akka.tcp://cluster-system@localhost:5002/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c4]
<SNIP>
[INFO][04/24/2024 16:47:40.568Z][Thread 0024][akka.tcp://cluster-system@localhost:5000/user/echo/c1] Received ping on UniqueAddress: (akka.tcp://cluster-system@localhost:5000, 736123121) from [akka://cluster-system/system/testActor3#1373540069]
[INFO][04/24/2024 16:47:40.568Z][Thread 0017][akka.tcp://cluster-system@localhost:5000/user/echo/c2] Received ping on UniqueAddress: (akka.tcp://cluster-system@localhost:5000, 736123121) from [akka://cluster-system/system/testActor3#1373540069]
[INFO][04/24/2024 16:47:40.568Z][Thread 0026][akka.tcp://cluster-system@localhost:5001/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c3] Received ping on UniqueAddress: (akka.tcp://cluster-system@localhost:5001, 1225519601) from [akka.tcp://cluster-system@localhost:5000/system/testActor3#1373540069]
[INFO][04/24/2024 16:47:40.569Z][Thread 0036][akka.tcp://cluster-system@localhost:5002/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c4] Received ping on UniqueAddress: (akka.tcp://cluster-system@localhost:5002, 1989527182) from [akka.tcp://cluster-system@localhost:5000/system/testActor3#1373540069]
Hmm, let me do more tests.
@Arkatufus can you please share the code you used to spawn routers and test client?
Here is the full unit test class I used. BugFix7167.txt
To match your issue, change "akka.actor.deployment./echo.cluster.max-total-nr-of-instances = 6" back to 4 and delete 2 lines of await ExpectMsgAsync("pong");
@Arkatufus I am trying to nail down what could be wrong with my tests, and I managed to reproduce the failing scenario using code you sent me. I created a gist with 2 tests: one is exact copy of what you sent me, the other one is slightly different:
https://gist.github.com/object/a3fa8db5acc42632a13059df170cf43d
The difference between these two tests is that in your test you create the first cluster node using TestKit Sys. Then everything goes fine and the pool is created of 6 routees in different nodes. The second test doesn't use TestKit Sys and all cluster nodes are created symmetrically. When I compare logging from those two tests there are some things special for the second test.
[SYS-1][DEBUG][04.25.2024 10:18:10.506Z][Thread 0012][LocalActorRefProvider(akka://cluster-system-2)] [akka://cluster-system-2/] Instantiating Remote Actor [akka.tcp://cluster-system-2@localhost:5011/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c3]
[SYS-1][DEBUG][04.25.2024 10:18:10.507Z][Thread 0012][LocalActorRefProvider(akka://cluster-system-2)] [akka://cluster-system-2/] Instantiating Remote Actor [akka.tcp://cluster-system-2@localhost:5012/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c4]
[SYS-1][DEBUG][04.25.2024 10:18:10.507Z][Thread 0012][LocalActorRefProvider(akka://cluster-system-2)] [akka://cluster-system-2/] Instantiating Remote Actor [akka.tcp://cluster-system-2@localhost:5011/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c5]
[SYS-1][DEBUG][04.25.2024 10:18:10.507Z][Thread 0012][LocalActorRefProvider(akka://cluster-system-2)] [akka://cluster-system-2/] Instantiating Remote Actor [akka.tcp://cluster-system-2@localhost:5012/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c6]
[SYS-1][INFO][04.25.2024 10:18:11.513Z][Thread 0032][akka.tcp://cluster-system-2@localhost:5010/user/echo/c1] Received ping on UniqueAddress: (akka.tcp://cluster-system-2@localhost:5010, 1887880676) from [akka://test/system/testActor1#1160119174]
[SYS-1][INFO][04.25.2024 10:18:11.513Z][Thread 0058][akka.tcp://cluster-system-2@localhost:5010/user/echo/c2] Received ping on UniqueAddress: (akka.tcp://cluster-system-2@localhost:5010, 1887880676) from [akka://test/system/testActor1#1160119174]
[SYS-2][INFO][04.25.2024 10:18:11.513Z][Thread 0033][akka.tcp://cluster-system-2@localhost:5011/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c3] Received ping on UniqueAddress: (akka.tcp://cluster-system-2@localhost:5011, 202979286) from [akka.tcp://cluster-system-2@localhost:5010/system/testActor1#1160119174]
[SYS-3][INFO][04.25.2024 10:18:11.513Z][Thread 0041][akka.tcp://cluster-system-2@localhost:5012/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c6] Received ping on UniqueAddress: (akka.tcp://cluster-system-2@localhost:5012, 251937620) from [akka.tcp://cluster-system-2@localhost:5010/system/testActor1#1160119174]
[SYS-2][INFO][04.25.2024 10:18:11.513Z][Thread 0058][akka.tcp://cluster-system-2@localhost:5011/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c5] Received ping on UniqueAddress: (akka.tcp://cluster-system-2@localhost:5011, 202979286) from [akka.tcp://cluster-system-2@localhost:5010/system/testActor1#1160119174]
[SYS-3][INFO][04.25.2024 10:18:11.513Z][Thread 0012][akka.tcp://cluster-system-2@localhost:5012/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c4] Received ping on UniqueAddress: (akka.tcp://cluster-system-2@localhost:5012, 251937620) from [akka.tcp://cluster-system-2@localhost:5010/system/testActor1#1160119174]
[SYS-1][DEBUG][04.25.2024 13:09:31.506Z][Thread 0040][LocalActorRefProvider(akka://cluster-system-2)] Resolve of path sequence [/system/testActor2#1645786115] failed
[SYS-1][DEBUG][04.25.2024 13:09:31.506Z][Thread 0042][LocalActorRefProvider(akka://cluster-system-2)] Resolve of path sequence [/system/testActor2#1645786115] failed
[SYS-1][DEBUG][04.25.2024 13:09:31.506Z][Thread 0040][LocalActorRefProvider(akka://cluster-system-2)] Resolve of path sequence [/system/testActor2#1645786115] failed
[SYS-1][DEBUG][04.25.2024 13:09:31.506Z][Thread 0042][LocalActorRefProvider(akka://cluster-system-2)] Resolve of path sequence [/system/testActor2#1645786115] failed
4. And "pong" to TestActor is sent only from the first node. The rest of the messages go to the dead-letter queue:
[SYS-2][INFO][04.25.2024 10:18:11.514Z][Thread 0059][akka://cluster-system-2/] Message [String] from [akka://cluster-system-2/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c5#193301339] to [akka://cluster-system-2/] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://cluster-system-2/] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: pong
[SYS-3][INFO][04.25.2024 10:18:11.514Z][Thread 0012][akka://cluster-system-2/] Message [String] from [akka://cluster-system-2/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c6#1324170935] to [akka://cluster-system-2/] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://cluster-system-2/] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: pong
[SYS-2][INFO][04.25.2024 10:18:11.514Z][Thread 0059][akka://cluster-system-2/] Message [String] from [akka://cluster-system-2/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c3#1045538363] to [akka://cluster-system-2/] was not delivered. [2] dead letters encountered. If this is not an expected behavior then [akka://cluster-system-2/] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: pong
[SYS-3][INFO][04.25.2024 10:18:11.514Z][Thread 0012][akka://cluster-system-2/] Message [String] from [akka://cluster-system-2/remote/akka.tcp/cluster-system-2@localhost:5010/user/echo/c4#1586511460] to [akka://cluster-system-2/] was not delivered. [2] dead letters encountered. If this is not an expected behavior
What could be different comparing to use of TestKit Sys vs. creating actor systems explicitly?
Thank you.
Ran some more tests, found other suspicious log entries.
I see that remote actors are correctly created in the cluster pool:
[DEBUG][04.25.2024 13:02:30.170Z][Thread 0055][LocalActorRefProvider(akka://cluster-system)] [akka://cluster-system/] Instantiating Remote Actor [akka.tcp://cluster-system@localhost:5001/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c3]
[DEBUG][04.25.2024 13:02:30.171Z][Thread 0055][LocalActorRefProvider(akka://cluster-system)] [akka://cluster-system/] Instantiating Remote Actor [akka.tcp://cluster-system@localhost:5002/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c4]
[DEBUG][04.25.2024 13:02:30.171Z][Thread 0055][LocalActorRefProvider(akka://cluster-system)] [akka://cluster-system/] Instantiating Remote Actor [akka.tcp://cluster-system@localhost:5001/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c5]
[DEBUG][04.25.2024 13:02:30.171Z][Thread 0055][LocalActorRefProvider(akka://cluster-system)] [akka://cluster-system/] Instantiating Remote Actor [akka.tcp://cluster-system@localhost:5002/remote/akka.tcp/cluster-system@localhost:5000/user/echo/c6]
But then this happens:
[ERROR][04.25.2024 13:02:30.635Z][Thread 0057][akka.tcp://cluster-system@localhost:5000/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fcluster-system%40localhost%3A5002-2/endpointWriter] AssociationError [akka.tcp://cluster-system@localhost:5000] <- akka.tcp://cluster-system@localhost:5002: Error [Failed to write message [Akka.Remote.DaemonMsgCreate] to the transport] []
[WARNING][04.25.2024 13:02:30.637Z][Thread 0057][akka.tcp://cluster-system@localhost:5000/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fcluster-system%40localhost%3A5002-2] Association with remote system akka.tcp://cluster-system@localhost:5002 has failed; address is now gated for 5000 ms. Reason is: [Akka.Remote.EndpointException: Failed to write message [Akka.Remote.DaemonMsgCreate] to the transport
---> Hyperion.ValueSerializers.UnsupportedTypeException: Ambiguous match found for 'System.Collections.Concurrent.BlockingCollection`1[System.Threading.Tasks.Task] Void Add(System.Threading.Tasks.Task)'.
at Hyperion.ValueSerializers.UnsupportedTypeSerializer.WriteManifest(Stream stream, SerializerSession session)
at lambda_method391(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method393(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method383(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)
at Hyperion.SerializerFactories.ArraySerializerFactory.WriteValues(Array array, Stream stream, Type elementType, ValueSerializer elementSerializer, SerializerSession session)
at Hyperion.SerializerFactories.ArraySerializerFactory.<>c__DisplayClass4_0.<BuildSerializer>b__1(Stream stream, Object arr, SerializerSession session)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method333(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method276(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method321(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)
at Hyperion.SerializerFactories.ArraySerializerFactory.WriteValues(Array array, Stream stream, Type elementType, ValueSerializer elementSerializer, SerializerSession session)
at Hyperion.SerializerFactories.ArraySerializerFactory.<>c__DisplayClass4_0.<BuildSerializer>b__1(Stream stream, Object arr, SerializerSession session)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method319(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObjectWithManifest(Stream stream, Object value, SerializerSession session)
at Hyperion.SerializerFactories.DelegateSerializerFactory.<>c__DisplayClass2_0.<BuildSerializer>b__1(Stream stream, Object value, SerializerSession session)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method276(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method276(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method238(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Extensions.StreamEx.WriteObject(Stream stream, Object value, Type valueType, ValueSerializer valueSerializer, Boolean preserveObjectReferences, SerializerSession session)
at lambda_method232(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method171(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method167(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method153(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method138(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method124(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method126(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method130(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at lambda_method90(Closure, Stream, Object, SerializerSession)
at Hyperion.ValueSerializers.ObjectSerializer.WriteValue(Stream stream, Object value, SerializerSession session)
at Hyperion.Serializer.Serialize(Object obj, Stream stream, SerializerSession session)
at Hyperion.Serializer.Serialize(Object obj, Stream stream)
at Akka.Serialization.HyperionSerializer.ToBinary(Object obj)
at Akka.Remote.Serialization.DaemonMsgCreateSerializer.Serialize(Object obj)
at Akka.Remote.Serialization.DaemonMsgCreateSerializer.PropsToProto(Props props)
at Akka.Remote.Serialization.DaemonMsgCreateSerializer.ToBinary(Object obj)
at Akka.Remote.MessageSerializer.Serialize(ExtendedActorSystem system, Address address, Object message)
at Akka.Remote.EndpointWriter.SerializeMessage(Object msg)
at Akka.Remote.EndpointWriter.WriteSend(Send send)
--- End of inner exception stack trace ---
at Akka.Remote.EndpointWriter.PublishAndThrow(Exception reason, LogLevel level, Boolean needToThrow)
at Akka.Remote.EndpointWriter.WriteSend(Send send)
at Akka.Remote.EndpointWriter.<Writing>b__27_0(Send s)
at lambda_method79(Closure, Object, Action`1, Action`1, Action`1)
at Akka.Tools.MatchHandler.PartialHandlerArgumentsCapture`4.Handle(T value)
at Akka.Actor.ReceiveActor.ExecutePartialMessageHandler(Object message, PartialAction`1 partialAction)
at Akka.Actor.ReceiveActor.OnReceive(Object message)
at Akka.Actor.UntypedActor.Receive(Object message)
at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)
at Akka.Actor.ActorCell.ReceiveMessage(Object message)
at Akka.Actor.ActorCell.Invoke(Envelope envelope)]
And after that only 2 of 6 routees (local ones) get pings.
@object
For your ClusterBroadcastPool2 test class, you're actually creating 4 ActorSystems: the TestKit internal ActorSystem, _system, _system2, and _system3. The internal ActorSystem name doesn't match the other 3 and it is not connected to the cluster.
When you declare public ClusterBroadcastPool2(ITestOutputHelper output) : base(SystemName, output), the first parameter is actually a string HOCON config, not the system name; if you need to specify the TestKit ActorSystem name, you need to use the base(string, string, ITestOutputHelper) constructor.
So the problem there is that you're passing an IActorRef reference into the remote pool actor Props that the cluster doesn't know how to connect to.
In order for the ClusterBroadcastPool2 test class to work, you'd have to change it to this:
public class ClusterBroadcastPool2
{
    private const string SystemName = "cluster-system-2";
    private const int SeedPort = 5010;
    private readonly ITestOutputHelper _out;
    private readonly Cluster _cluster;
    private readonly ActorSystem _system1;
    private readonly ActorSystem _system2;
    private readonly ActorSystem _system3;
    private readonly TestKit.Xunit2.TestKit _testKit1;
    private readonly TestKit.Xunit2.TestKit _testKit2;
    private readonly TestKit.Xunit2.TestKit _testKit3;

    public ClusterBroadcastPool2(ITestOutputHelper output)
    {
        _out = output;
        _system1 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort));
        _system2 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 1));
        _system3 = ActorSystem.Create(SystemName, PoolConfig.Get(SystemName, SeedPort, SeedPort + 2));
        _testKit1 = new TestKit.Xunit2.TestKit(_system1, _out);
        _testKit2 = new TestKit.Xunit2.TestKit(_system2, _out);
        _testKit3 = new TestKit.Xunit2.TestKit(_system3, _out);
        _cluster = Cluster.Get(_system1);
    }

    [Fact]
    public async Task BroadcastPoolTest_ClusterFromTestKit()
    {
        try
        {
            var tcs = new TaskCompletionSource<Done>();
            _cluster.RegisterOnMemberUp(() => { tcs.SetResult(Done.Instance); });
            await tcs.Task;
            var propsWithRouter = Props.Create(() => new EchoActor(_testKit1.TestActor)).WithRouter(FromConfig.Instance);
            var pool = _system1.ActorOf(propsWithRouter, "echo");
            // wait until cluster pool stabilizes
            await Task.Delay(1000);
            pool.Tell("ping");
            await _testKit1.ExpectMsgAsync("pong");
            await _testKit1.ExpectMsgAsync("pong");
            await _testKit1.ExpectMsgAsync("pong");
            await _testKit1.ExpectMsgAsync("pong");
            await _testKit1.ExpectMsgAsync("pong");
            await _testKit1.ExpectMsgAsync("pong");
            await _testKit1.ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
        }
        finally
        {
            await Task.WhenAll(
                _system1.Terminate(),
                _system2.Terminate(),
                _system3.Terminate());
        }
    }
}
_testKit2 and _testKit3 are there just to make sure that their logging output is routed to the xunit logger, because the InitializeLogger() method is a protected method. You can omit them if you don't care about their log outputs.
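As an aside on the constructor point made earlier in this comment, here is a hedged sketch of naming the TestKit's internal ActorSystem explicitly. It uses the (Config, string, ITestOutputHelper) overload of Akka.TestKit.Xunit2.TestKit rather than the string-based overload mentioned above, and NamedSystemSpec and the one-line HOCON are placeholders of mine, not the cluster config from this issue.

```csharp
using Akka.Configuration;
using Akka.TestKit.Xunit2;
using Xunit;
using Xunit.Abstractions;

// Hypothetical spec class; only demonstrates how the system name is passed.
public class NamedSystemSpec : TestKit
{
    private const string SystemName = "cluster-system-2";

    // Placeholder HOCON; a real cluster test would supply its remoting/cluster settings here.
    private static readonly Config Hocon =
        ConfigurationFactory.ParseString("akka.loglevel = INFO");

    // First argument is the configuration, second is the ActorSystem name.
    public NamedSystemSpec(ITestOutputHelper output)
        : base(Hocon, SystemName, output)
    {
    }

    [Fact]
    public void TestKit_system_uses_the_requested_name()
    {
        Assert.Equal(SystemName, Sys.Name);
    }
}
```

Passing the name this way makes Sys share the name of the other nodes, which is a precondition for it to join the same cluster; it still needs matching remoting and seed-node settings to actually do so.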
Of course, my bad. Then it works like a charm. Thanks a lot!
Scenario
We have a clustered pool router and want messages to be broadcast to all routees in the cluster.
Problem
Messages are only sent to the local router.
Code example
This example is in F# script but should be easy to understand
We configured a pool of 4 instances with a maximum of 2 instances per node, so pool1 and pool2 get 2 instances each but pool3 doesn't get any. Now how do I make a message reach all routees in the cluster? Sending a message to pool1 broadcasts it only to the 2 routees belonging to pool1. Is it possible to send messages across the whole cluster?