Closed ryanbuening closed 4 years ago
Hi! I personally haven't used SqlServer transport before, but here's what I found.
You can store subscriptions in SqlServer by using the isCentralized option. In that case the subscription is added to the storage immediately instead of being requested by sending a SubscribeRequest.
public void ConfigureServices(IServiceCollection services)
{
    services.AddSignalR().AddRebusBackplane<Hub>();
    services.AddRebus(configure => configure
        .Transport(x =>
        {
            x.UseSqlServer(Configuration["SignalRBackplaneConnectionString"], GenerateTransientQueueName("Rebus.SignalR"));
        })
        .Options(o => o.EnableSynchronousRequestReply())
        .Routing(r => r.TypeBased())
        .Subscriptions(s => s.StoreInSqlServer(Configuration["SignalRBackplaneConnectionString"], "Subscriptions", isCentralized: true)));
}
Keep in mind that the current SqlServer transport implementation in Rebus doesn't have an option to automatically delete queue tables, so you have to implement that yourself. It's rather simple: just drop the queue table when the bus is disposed, using BusLifetimeEvents (see https://github.com/rebus-org/Rebus/blob/060f0cd51affe4844d1167308971fe4cf8a294eb/Rebus.Tests/Events/TestBusLifetimeEvents.cs).
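A minimal sketch of that cleanup, for illustration only. It assumes the transient queue is stored as a table with the same name as the queue in the dbo schema, that the server supports DROP TABLE IF EXISTS (SQL Server 2016 or later), and that the Microsoft.Data.SqlClient package is referenced. TransientQueueCleanup, DropQueueTable and DropQueueTableOnDispose are hypothetical names, not part of Rebus; the only Rebus pieces used are OptionsConfigurer.Decorate and BusLifetimeEvents.

```csharp
using System;
using Microsoft.Data.SqlClient;
using Rebus.Bus;
using Rebus.Config;

// Hypothetical helper, not part of Rebus or Rebus.SignalR.
public static class TransientQueueCleanup
{
    // Drops the queue table. Assumption: the table lives in the dbo schema
    // and is named exactly like the queue.
    public static void DropQueueTable(string connectionString, string tableName)
    {
        // Double any closing bracket so the name stays inside [ ].
        var quotedName = "[" + tableName.Replace("]", "]]") + "]";

        using var connection = new SqlConnection(connectionString);
        connection.Open();

        using var command = connection.CreateCommand();
        command.CommandText = $"DROP TABLE IF EXISTS [dbo].{quotedName};";
        command.ExecuteNonQuery();
    }

    // Hooks the cleanup onto the bus lifetime events while configuring Rebus.
    public static void DropQueueTableOnDispose(
        this OptionsConfigurer options, string connectionString, string queueName)
    {
        options.Decorate(c =>
        {
            var events = c.Get<BusLifetimeEvents>();
            // BusDisposed fires after the bus has shut down, so nothing
            // should be reading from the table anymore.
            events.BusDisposed += () => DropQueueTable(connectionString, queueName);
            return events;
        });
    }
}
```

With the configuration above it would be called as .Options(o => o.DropQueueTableOnDispose(connectionString, queueName)), using the same queue name that was passed to UseSqlServer.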
I'm not sure why SubscribeRequest isn't processed by internal handlers automatically when isCentralized: false for SqlServer subscription storage. Maybe @mookid8000 could elaborate on this question.
My guess is that routing information is missing in that case.
For Rebus to be able to subscribe to a message in decentralized mode, an owner endpoint mapping must have been configured for the relevant event type:
Configure.With(...)
    .(...)
    .Routing(t => t.TypeBased().Map<SomeEvent>("some-queue"))
    .Start();
which will then cause Rebus to send its SubscribeRequest for SomeEvent to some-queue.
Missing route information would cause Rebus to throw an exception like "dont know where to send subscribe request for SomeEvent, because I don't know who owns that event type", so I guess something somewhere is swallowing that exception...?
I implemented an additional extension method MapSignalRCommands to support decentralized subscription storages:
public void ConfigureServices(IServiceCollection services)
{
    services.AddSignalR()
        .AddRebusBackplane<ChatHub>();
    var queueName = GenerateTransientQueueName("Rebus.SignalR");
    services.AddRebus(configure => configure
        .Transport(x => x.UseSqlServer(SignalRBackplaneConnectionString, queueName))
        .Options(o => o.EnableSynchronousRequestReply())
        .Routing(r => r.TypeBased()
            .MapSignalRCommands<ChatHub>(queueName))
        .Subscriptions(s => s.StoreInSqlServer(SignalRBackplaneConnectionString, "Subscriptions", false)));
}
That worked! BTW, I think the readme for the SQL sample is incorrect. The best overload for 'Transport' does not have a parameter named 'isCentralized':
services.AddRebus(configure => configure
    .Transport(x => x.UseSqlServer(SignalRBackplaneConnectionString, queueName, isCentralized: false))
I used the following and it appears to be working:
services.AddSignalR().AddRebusBackplane<Hub>();
var queueName = GenerateTransientQueueName("Rebus.SignalR");
services.AddRebus(configure => configure
    .Transport(x => x.UseSqlServer(Configuration["SignalRBackplaneConnectionString"], queueName))
    .Options(o => o.EnableSynchronousRequestReply())
    .Routing(r => r.TypeBased()
        .MapSignalRCommands<Hub>(queueName))
    .Subscriptions(s => s.StoreInSqlServer(Configuration["SignalRBackplaneConnectionString"], "Subscriptions", false)));
Thanks.
Thank you for finding a bug with decentralized subscription storages :). And, please, don't forget to implement automatic deletion for the transient queue or you'll end up with a lot of unused tables in a database. I can try to add AutoDelete functionality to Rebus.SqlServer transport if @mookid8000 approves that change.
P.S. isCentralized is a parameter of the StoreInSqlServer method, not of Transport.
Thanks.
I can try to add AutoDelete functionality to Rebus.SqlServer transport if @mookid8000 approves that change.
I've never considered something like that, but I guess it makes sense in this scenario.
It's important that the SignalR backplane remembers to unsubscribe before removing the queue, though. Also, I kinda fear that there will be a little bit of time where a publisher can publish an event and get the name of a non-existent queue, and then its publish operation will fail....... what are your thoughts on that?
@mookid8000
I think that currently there's no obvious way to wait for the unsubscribe request to complete in Rebus when a decentralized subscription storage is used. IBus.Unsubscribe returns immediately after sending an unsubscribe command. If there was an option to await the processing result using something like Rebus.Async, it would be possible.
I have an additional problem with unsubscribe. Initially, I wanted to make the configuration of Rebus.SignalR as simple as possible: just one line of code for each hub after AddSignalR:
services.AddSignalR()
    .AddRebusBackplane<ChatHub>();
My problem is that I can't subscribe to IBus lifetime events at this point, because BusLifetimeEvents is accessible neither through the DI container nor through the IBus interface. The only place I could do it is in the Rebus configuration code, but that would require the developer to call some other extension method in addition to AddRebusBackplane.
What do you think about exposing BusLifetimeEvents through the IBus interface, to make it possible to register a lifetime event subscriber somewhere other than the Rebus configuration point? Maybe you could point me to some alternative solution that I don't see now.
Hi @mookid8000!
Did you have the chance to think about my proposition to add BusLifetimeEvents to IBus contract? The problem is described in the previous message in this thread.
Did you have the chance to think about my proposition to add BusLifetimeEvents to IBus contract?
I'm very reluctant to add anything to IBus, because it's a big breaking change.
So I'm curious to see if we can solve your problem in another way. Maybe I'm a little bit slow here, but why was it that you wanted access to BusLifetimeEvents?
@mookid8000
I need to unsubscribe from SignalR events before stopping the bus, and I don't see any way to do it at this call point:
services.AddSignalR()
    .AddRebusBackplane<ChatHub>();
I don't have any access to anything besides IBus interface at this point. As I mentioned before - "The only place, I could do it - is at Rebus configuration code, but that would require the developer to call some other extension method in addition to AddRebusBackplane(); And the only reason for that complication is that Rebus doesn't register its interfaces in DI container, only IBus and IHandleMessages."
ah! I think you should be able to decorate BusLifetimeEvents, possibly "injecting" the list of events subscribed to... something like
var relevantEvents = new[] {typeof(Whatever)};
Configure.With(Using(new BuiltinHandlerActivator()))
    .(...)
    .Options(o => o.Decorate<BusLifetimeEvents>(c =>
    {
        var busLifetimeEvents = c.Get<BusLifetimeEvents>();
        busLifetimeEvents.BusDisposing += () =>
        {
            var bus = c.Get<IBus>().Advanced.SyncBus;
            foreach (var evt in relevantEvents)
            {
                bus.Unsubscribe(evt);
            }
        };
        return busLifetimeEvents;
    }))
    .Start();
@mookid8000
Yes, and that would require two points of Rebus.SignalR configuration:
services.AddSignalR()
    .AddRebusBackplane<ChatHub>();
Configure.With(....)
If only I could get BusLifetimeEvents from the DI container (services), it'd be possible to use only one point. That's what I wrote about.
P.S. MassTransit uses three configuration points for SignalR, but I hoped that we could do much better with Rebus.SignalR :) (https://masstransit-project.com/advanced/signalr/quickstart.html):
services.AddSignalR().AddMassTransitBackplane(); // This is the first important line
....
// Add this for each Hub you have
x.AddSignalRHubConsumers<ChatHub>();
....
// Register endpoint for each hub you have
cfg.AddSignalRHubEndpoints<ChatHub>(provider);
ok, maybe it's because I don't understand the full Rebus + SignalR configuration..... could you maybe show me a snippet containing these parts:
- Doing stuff to IServiceCollection
- Configuring Rebus
- Installing Rebus-based SignalR backplane
- Building IServiceProvider
?
@mookid8000
Here's all the code that configures Rebus backplane. I subscribe to the events at the point of DI configuration, but I can't unsubscribe here, because I don't have any access to BusLifetimeEvents through IBus or through DI container:
public static ISignalRServerBuilder AddRebusBackplane<THub>(this ISignalRServerBuilder signalRServerBuilder)
    where THub : Hub
{
    signalRServerBuilder.Services.AddTransient<IHandleMessages<AddToGroup<THub>>, AddToGroupHandler<THub>>();
    signalRServerBuilder.Services.AddTransient<IHandleMessages<RemoveFromGroup<THub>>, RemoveFromGroupHandler<THub>>();
    signalRServerBuilder.Services.AddTransient<IHandleMessages<All<THub>>, AllHandler<THub>>();
    signalRServerBuilder.Services.AddTransient<IHandleMessages<Connection<THub>>, ConnectionHandler<THub>>();
    signalRServerBuilder.Services.AddTransient<IHandleMessages<Group<THub>>, GroupHandler<THub>>();
    signalRServerBuilder.Services.AddTransient<IHandleMessages<User<THub>>, UserHandler<THub>>();
    signalRServerBuilder.Services.AddSingleton<HubLifetimeManager<THub>, RebusHubLifetimeManager<THub>>(sp =>
    {
        var bus = sp.GetService<IBus>();
        var hubProtocolResolver = sp.GetService<IHubProtocolResolver>();
        var logger = sp.GetService<ILogger<RebusHubLifetimeManager<THub>>>();
        bus.Subscribe<AddToGroup<THub>>();
        bus.Subscribe<RemoveFromGroup<THub>>();
        bus.Subscribe<All<THub>>();
        bus.Subscribe<Connection<THub>>();
        bus.Subscribe<Group<THub>>();
        bus.Subscribe<User<THub>>();
        var rebusHubLifetimeManager = new RebusHubLifetimeManager<THub>(bus, hubProtocolResolver, logger);
        return rebusHubLifetimeManager;
    });
    return signalRServerBuilder;
}
For clarity, here is the complete Startup from the sample application:
public class Startup
{
    private IConfiguration Configuration { get; }
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }
    private static string GenerateTransientQueueName(string inputQueueName)
    {
        return $"{inputQueueName}-{Environment.MachineName}-{Guid.NewGuid().ToString()}";
    }
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSignalR()
            .AddRebusBackplane<ChatHub>();
        var rabbitMqOptions = Configuration.GetSection(nameof(RabbitMqOptions)).Get<RabbitMqOptions>();
        var rabbitMqConnectionString =
            $"amqp://{rabbitMqOptions.User}:{rabbitMqOptions.Password}@{rabbitMqOptions.Host}:{rabbitMqOptions.Port.ToString()}";
        services.AddRebus(configure => configure
            .Transport(x =>
            {
                x.UseRabbitMq(rabbitMqConnectionString, GenerateTransientQueueName("Rebus.SignalR"))
                    .InputQueueOptions(o =>
                    {
                        o.SetAutoDelete(true);
                        o.SetDurable(false);
                    });
            })
            .Options(o => o.EnableSynchronousRequestReply())
            .Routing(r => r.TypeBased()));
    }
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        app.ApplicationServices.UseRebus();
        app.UseStaticFiles();
        app.UseRouting();
        app.UseAuthorization();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapHub<ChatHub>("/chatHub");
        });
    }
}
Would it be possible to simply make RebusHubLifetimeManager disposable and then unsubscribe there?
I already tried that and got an exception when calling bus.Unsubscribe in Dispose. I think the DI container (at least the built-in one in ASP.NET Core) doesn't guarantee the order in which objects are disposed. The bus seems to be disposed before RebusHubLifetimeManager.
I tried it with IDisposable and IAsyncDisposable with the same result. That's why I proposed that we need to get BusLifetimeEvents from the DI container, either through explicit registration or through a property of the IBus interface.
ok I get it now 🙄 as an experiment, I've released Rebus.ServiceProvider 5.0.5 where you can
var events = serviceProvider.GetRequiredService<BusLifetimeEvents>();
events.BusDisposing += () => // do stuff ;);
Let me know if that turns out to be useful 🙂
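For readers following along, here is a rough sketch of how that registration could be combined with the unsubscribe loop suggested earlier in the thread. It is not the code that was merged into Rebus.SignalR. BackplaneShutdown and UnsubscribeOnBusDisposing are hypothetical names, and the sketch assumes Rebus.ServiceProvider 5.0.5 or later, so that BusLifetimeEvents can be resolved from the container.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Rebus.Bus;

// Hypothetical helper, not part of Rebus or Rebus.SignalR.
public static class BackplaneShutdown
{
    // Registers a BusDisposing handler that unsubscribes from the given
    // event types while the bus can still send messages.
    public static void UnsubscribeOnBusDisposing(
        IServiceProvider serviceProvider, params Type[] eventTypes)
    {
        var bus = serviceProvider.GetRequiredService<IBus>();
        var events = serviceProvider.GetRequiredService<BusLifetimeEvents>();

        events.BusDisposing += () =>
        {
            // The event handler is synchronous, so use the synchronous bus API.
            var syncBus = bus.Advanced.SyncBus;
            foreach (var eventType in eventTypes)
            {
                syncBus.Unsubscribe(eventType);
            }
        };
    }
}
```

It could be called from inside the AddSingleton factory in AddRebusBackplane, right after the bus.Subscribe calls, passing the same message types (for example typeof(AddToGroup<THub>)). With a decentralized subscription storage the unsubscribe is still only a request sent to the owner, so this does not close the timing gap discussed above.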
Thanks a lot! :). That's exactly what was needed to solve the problem. I tested it with the sample application and merged the change into Rebus.SignalR. Could you update Nuget to Rebus.SignalR 0.0.5 please?
P.S. And I suppose we should have the same registration of BusLifetimeEvents for all DI containers supported by Rebus? Like a common contract, so no application will be broken if they prefer to use Autofac + Rebus.Autofac instead of the internal ServiceProvider.
Cool! It's out as Rebus.SignalR 0.0.5 now 🙂
And I suppose we should have the same registration of BusLifetimeEvents for all DI containers supported by Rebus
Yes, some time later we probably should..... the great thing about adding more stuff to the container is that it can stay under the radar for a while and prove its usefulness, and then later a little further down the road it can be promoted to a thing.
Rebus.SignalR is still showing 0.0.4 on nuget.
weird... my script must not have worked 100%, only 95%.... I've pushed it now 🙂 thanks for telling me!
I'm using SQL Server as my transport and the database and tables all appear to be created and functional on the server. I've copied my code below.
A user joins a chat with a unique group name and should automatically receive a message from the Hub: "{displayName} has joined the chat.". I'm seeing logs that the SendMessageToGroup was published, but I'm not seeing it in my client. Is there a subscription piece I'm missing? I'm new to Rebus so sorry if this is basic.
Startup.cs
Hub
Client
Client Log
Server Log