Closed manveldavid closed 8 months ago
WampSharp does not automatically resubscribe to topics upon reconnection. If you want that behavior, you should use a WampChannelReconnector and subscribe in the reconnector event handler.
As for an OnError handler for reflection based subscriber, it does not exist because there are is no semantic for this in WAMP.
Elad
I am using WampChannelReconnector and subscribing to tops in the reconnector event, however after resubscribing all rpc methods work but subscriptions stop working Here is the reconnector code:
public static async Task ConnectTo(int num)
{
var channel = new DefaultWampChannelFactory().CreateJsonChannel(serverAddress, realmName + num.ToString());
channels.Add(channel);
Func<Task> connect = async () =>
{
await channel.Open();
Console.WriteLine($"Connected to {serverAddress} with realm name: {realmName}");
var realm = channel.RealmProxy;
channel.RealmProxy.Monitor.ConnectionBroken += (sender, args) => { Console.WriteLine(args.Details); };
rpcServices.Add(realm.Services.GetCalleeProxy<IRpcService>());
realm.Services.GetSubject<string>("topic").Subscribe((s) => Console.WriteLine(s), onError: ex => Console.WriteLine(ex.Message));
realm.Services.GetSubject<string>("topicDisconnect").Subscribe((s) => Console.WriteLine("Host disconnect"));
realm.Services.GetSubject<string>("topicConnect").Subscribe(async (s) => Console.WriteLine("Host connect"));
};
WampChannelReconnector reconnector = new WampChannelReconnector(channel, connect);
reconnector.Start();
}
Full console apps:
Service:
static async Task Main(string[] args)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Service\n\n");
Console.WriteLine("Realm Num:");
if(int.TryParse(Console.ReadLine(), out var num))
{
var uri = $"ws://127.0.0.1:{8080 + num}/ws";
var host = new DefaultWampHost(uri);
var realmName = "service";
var hostRealm = host.RealmContainer.GetRealmByName(realmName);
host.Open();
await hostRealm.Services.RegisterCallee(new RpcService());
Console.WriteLine($"Host started on {uri} with realm name: {realmName}");
var topic = hostRealm.Services.GetSubject<string>("topic");
while (true)
{
await Task.Delay(1000);
topic.OnNext(realmName);
}
}
Console.ReadLine();
}
Router:
public static string RouterAddress = "ws://127.0.0.1:8080/ws";
public static string realmName = "router"; static async Task Main(string[] args) { Console.ForegroundColor = ConsoleColor.Yellow; Console.WriteLine("Router\n\n"); var serverAddress = RouterAddress; var host = new DefaultWampHost(serverAddress); var chanelFactory = new DefaultWampChannelFactory(); var hostRealm = host.RealmContainer.GetRealmByName(realmName); host.Open();
while (true)
{
Console.WriteLine($"Realm Num:");
try
{
await Task.Yield();
if (int.TryParse(Console.ReadLine(), out var num))
{
ConnectTo(host, num);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
public static async Task ConnectTo(DefaultWampHost host, int num)
{
try
{
var uri = $"ws://127.0.0.1:{8080 + num}/ws";
var newRealmName = realmName + num.ToString();
var serviceRealmName = "service";
var chanel = new DefaultWampChannelFactory()
.CreateJsonChannel(uri, serviceRealmName);
Func
- Client:
``` c#
public static string serverAddress = Router.Program.RouterAddress;
public static string realmName = "router";
public static List<IRpcService> rpcServices = new();
public static List<IWampChannel> channels = new();
static async Task Main(string[] args)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine("Client\n\n");
var message = "";
Console.WriteLine("Realm Num:");
while (true)
{
await Task.Yield();
if (int.TryParse(Console.ReadLine(), out int num))
{
ConnectTo(num);
while (true)
{
await Task.Yield();
message = Console.ReadLine() ?? string.Empty;
if (!string.IsNullOrEmpty(message))
{
foreach (var service in rpcServices)
{
try
{
service?.PrintMessage(message);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
else
{
break;
}
}
}
}
}
public static async Task ConnectTo(int num)
{
var channel = new DefaultWampChannelFactory().CreateJsonChannel(serverAddress, realmName + num.ToString());
channels.Add(channel);
Func<Task> connect = async () =>
{
await channel.Open();
Console.WriteLine($"Connected to {serverAddress} with realm name: {realmName}");
var realm = channel.RealmProxy;
channel.RealmProxy.Monitor.ConnectionBroken += (sender, args) => { Console.WriteLine(args.Details); };
rpcServices.Add(realm.Services.GetCalleeProxy<IRpcService>());
realm.Services.GetSubject<string>("topic").Subscribe((s) => Console.WriteLine(s), onError: ex => Console.WriteLine(ex.Message));
realm.Services.GetSubject<string>("topicDisconnect").Subscribe((s) => Console.WriteLine("Host disconnect"));
realm.Services.GetSubject<string>("topicConnect").Subscribe(async (s) => Console.WriteLine("Host connect"));
};
WampChannelReconnector reconnector = new WampChannelReconnector(channel, connect);
reconnector.Start();
}
Even the reconnector doesn’t help, subscriptions don’t work after reconnecting the Service
I debugged your code and the problem is that your topics are raising errors that you are not catching, because you did not supply an OnError handler to all of them. Replacing your lines with these lines makes the code work:
realm.Services.GetSubject<string>("topic").Subscribe((s) => Console.WriteLine(s), onError: ex => Console.WriteLine(ex.Message));
realm.Services.GetSubject<string>("topicDisconnect").Subscribe((s) => Console.WriteLine("Host disconnect"), onError: ex => Console.WriteLine(ex.Message));
realm.Services.GetSubject<string>("topicConnect").Subscribe(async (s) => Console.WriteLine("Host connect"), onError: ex => Console.WriteLine(ex.Message));
Your code does not compile as is and I had to modify it to make it work. That takes from my time and does not allow me to help you as efficiently as I could. There is a reason for the bug report template and I ask you to respect it and follow it.
Your code example also suggests that you did not understand how WAMP is structured and what the difference between a router and a peer is when you asked your question. I suggest you read the documents on WampSharp.net and also on wamp-proto to get a better understanding.
Finally, this project has been around for more than 10 years and is used in production by many users. I would appreciate if you would treat this project as such.
Thanks a lot! I apologize for wasting your time But replacing the lines did not solve the problem Why then if I made the wrong wamp design, rpc works as I intended, but there is no subscription, where can I find examples of good wamp designs?
Here is an example based on the getting started examples on WampSharp.net. I modified the examples to use the reconnector. Note that the Router does not contain any logic code (this is the recommendation). To test it, run all program and then shut down the router. Then launch the router again and you will see that the publisher and the subscriber resume communication.
I understood my problem.
Yes, I don’t have a router, I have a node, and the subscriptions didn’t work because the "Routers" reconnector had the line await host.RealmContainer.GetRealmByName(newRealmName).Services.RegisterCallee(rpcService);
.
It didn’t throw exceptions in the reconnector's context but it broke the logic and execution of the program silently.
I just registered the callee once before initializing the reconnector and everything worked.
Thank you so much again!
I have this line:
realm.Services.GetSubject<string>("topic").Subscribe((s) => Console.WriteLine(s), onError: ex => Console.WriteLine(ex.Message));
It works, but after reconnecting to the host, subscriptions stop working. How can I rewrite this line in Reflection-based style withonError
handling? I want to try usingrealm.Services.RegisterSubscriber(mySub);
after reconnecting, I think it will work.