Open utterances-bot opened 3 years ago
Hi Sean, This is a well organized and informative article, I really got a lot out of it !
For those of us not deploying on Windows OS (without Topper) , can you explain how one might run the OnBoardingProcessor in a Generic Host (ie. Microsoft.Extensions.Hosting.IHostBuilder
) and implement the Backend as a BackgroundService
.
Hi @danielh1
Thanks for your kind feedback, that's much appreciated and I'm very glad the article was useful for you.
For your question, I have been wondering about something similar recently too. I have not used BackgroundService
yet, but I was thinking about seeing if I could get Rebus working in one. Let me try that and I may post another article about that. I was thinking of having Rebus in a BackgroundService
inside a Linux container. Is that the sort of thing you are thinking about?
Hi Sean,
I was indeed thinking about running Rebus in a BackgroundService
, perhaps even as a Daemon.
Someone posted a similar question on reddit ( Rebus forever listening Console App ), but I have yet to find an implementation for Rebus.
Hi @danielh1 - I've added a new branch that has Rebus running in an IHostedService
which I've based on the approach here.
It seems quite simple to achieve. But, be aware that I am understanding the IHostedService
as a singleton and so I've injected the configuration into my service and then set up the Rebus IBus
when the service is started. My understanding there may not be correct, so be aware of that!
From that point, it's pretty simple and the code is just lifted from the WebJob example in the blog post. I've changed the transport to use the file system, so you should be able to run it easily.
:rocket:
public class RebusHostedService : IHostedService
{
private readonly IConfigurationRoot _configuration;
private IBus _bus;
private ServiceProvider _provider;
public RebusHostedService(IConfigurationRoot configuration)
{
_configuration = configuration;
}
public Task StartAsync(CancellationToken cancellationToken)
{
var services = new ServiceCollection();
services.AddRebusAsSendAndReceive(_configuration);
_provider = services.BuildServiceProvider();
_provider.UseRebus(x => _bus = x);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _provider.DisposeAsync(); // This will dispose of the bus.
}
}
I'm wondering if @mookid8000 has any input to share on this scenario?
@seankearon I think it looks great 🙂 I think I would have preferred
_provider.UseRebus();
_bus = provider.GetRequiredService<IBus>();
instead of
_provider.UseRebus(x => _bus = x);
because then any subsequent use of _bus
within StartAsync
would not result in little warning squigglies, because R# does not know that the callback given to UseRebus
will be called immediately.... but that's a minor thing.
Great, thanks @mookid8000, much appreciated! :) (And, yes, I agree - the syntax you suggest is also clearer to read!)
Hi Sean,
The IHostedService
feature you added worked like a charm :star: . Thanks for that.
Despite my attempts to use the Filesystem Transport on Linux, I had to switch to an implementation using SqlServer Transport as the messages in the ../MainQueue
directory were not being consumed.
see: using-sql-server-transport
I suppose it could be due to file permissions on Linux.
I also had a thought about :
There are two versions of this post - one using C# and one using F#. Just for fun, why not try running the API from the C# project and the backend from the F# project? As long as you're using the same transport (at this stage that's file-system in c:\rebus-advent) then everything will work just fine. How lovely is that? :)
How would I go about sending the result of the OnboardingSaga
back to the controller ?
Perhaps changing await _bus.Send(new OnboardNewCustomer { Name = name, Email = email });
to
await _bus.SendRequest<SalesCallScheduled>(new OnboardNewCustomer { Name = name, Email = email });
But I'm not sure where to go from there as far as sending back the possible responses like:
to the controller.
Hi Dan
I'm glad to hear that you now Rebus working in an IHostedService
on Linux! I'm not sure why the file system transport isn't working either. As you say, possibly permissions. Remember that you can also use in-memory transport if you're testing or developing. I like to use the file system as it's easy to see what's in the messages.
For the second question, you probably don't want to send a result back to your API controller from the saga. Remembering that the saga may run for a long time and possibly even fail and need user intervention! If the API were waiting then it would significantly reduce the throughput of your API.
In my example, the API performs a "fire and forget" call to send the OnboardNewCustomer
command. As long as the controller completes without an exception, then the message will have been sent to the transport. Hence, it will be processed eventually even if the backend isn't running at the time! This is a wonderful thing and brings lots of reliability to your system.
To be honest, I am not sure I know about the _bus.SendRequest<...
feature in Rebus. Is that call from Rebus.Async? Generally, if you are wanting the activity to happen in the scope of the controller, then I'd implement it in the API and not hand off to the bus. If you're handing something off to Rebus, then you know that it's going to done and you can just get on with something else (i.e. handling more API requests in the case of our API). If you need to get a response and you are part of the bus, then you are probably a saga.
Does that make sense, or have I missed something?
Hi Sean,
Thanks for your quick response!
I certainly agree that having the Controller wait for the outcome of a "long-running" operation (Saga) would likely cause the request to timeout and so the synchronous-like aspect of _bus.SendRequest<...
is probably not the correct use case for http requests.
I suppose I should have asked my question a little differently given that the CustomerController(IBus bus)
would be initiated by a client from a WebApplication which would eventually respond with :
Considering the example of this usecase, the response at the completion of the saga can appear as some kind of "Alert" on the client.
I have taken a look at Rebus.SignalR
package since SignalR is the ideal backplane for web clients, but I haven't been able to understand where to make the integration points.
"MainQueue"
) or would I need to add a new queue name?public async Task<IActionResult> NewCustomer(...
endpoint and pass the client connectionID
as a parameter to the endpoint ?Perhaps I'm over engineering the solution and you know of a simpler/better method to respond to the webclient?
Ah, I see what you're getting at now! :) Sorry, that was my bad!!
It's not a scenario that I've actually built myself. In fact, I've not yet used SignalR, but I'm looking forward to having the opportunity as it looks super cool. That said, I think you can just call to your SignalR hub from inside the Rebus saga when you need to. The docs here. I could well be wrong though!
After having done a little more reading I realized that it may not be such a good idea to directly call the hub from from the saga (or anywhere from withing the OnboardingWorkerService
for that matter) since this would in a way be coupling the Saga to a particular client implementation (EntryPointAPI
).
This brings me back to my initial question regarding :
How would I go about sending the result of the OnboardingSaga back to the controller ?
In the EntryPointAPI
I created an AccountStatusHub
with a method which sends the end result of the saga to the client browser via SignalR:
The Hub has a SendMessage
method :
public async Task SendMessage(string message)
{
await Clients.User(userConnectionId).SendAsync("account_registration_result", message);
}
What I had tried to do unsuccessfully was to add a NotifyCustomerHandler
to the EntryPointAPI
and pass the message to the SendMessage
hub method.
Can you describe how to send a message from a Handler in OnboardingWorkerService
to the NotifyCustomerHandler
(or any handler for that matter) in the EntryPointAPI
?
Once again I'm very grateful for your help and advice :smiley:
Okay, so I'm assuming that in your scenario that the web project has some UI that you want to update as or after the saga progresses. It is possible to run a two-way Rebus IBus
in your website, but that has never felt right for me. The website should be as stateless as possible. So, I think that using SignalR to communicate the updates back to your website seems to be the natural approach.
So, if I needed to push updates back to the webpage then the way I would approach this problem would be something like this:
The correlation ID is just something to uniquely identify the original request. Use a GUID if there is nothing "natural" like an order ID at the point of sending.
By publishing status updates, the saga stays self-contained and free from dependencies. I'd use IBus.Publish
for the status updates as the saga is just notifying the world about events as they happen.
Having a separate handler that subscribes to the events allows you to centralise the external dependencies, SignalR in your case, into one area. You can choose to host that handler in a separate project and App Service or daemon as best suits your desire and scaling needs - that is, if it's going to process squillions of messages you may want to host it in own so that you can scale it appropriate to your needs.
You could also build it with the webpage polling and pulling the information back to the page. That would work well for the order page for my personal product (😢), but might not scale if you're selling warm cakes on a cold day! 😸 In that scenario, the handler that subscribes to the updates from the saga might update a database or a cache that the webpage calls to. If I was building this today, I'd probably take the polling approach first as I'm keen to find a reason to use HTMX!
The lovely thing about using a message-based architecture is that, if you build it with the push scenario and you need to change to the pull for some reason, then you can do that without changing a single line of code in the saga. Always a good thing!
Greetings Sean,
Thanks again for your advice an detailed answer !
WRT :
You can choose to host that handler in a separate project and App Service or daemon as best suits your desire and scaling needs -
In the EntryPointAPI project, I have attempted to add a NotifyCustomerHandler
for the NotifyServiceDesk
message, however the OnboardNewCustomer
message is not reaching the handler in the OnboardingSaga
.
I have tried all sorts of permutations of
.Map<NotifyServiceDesk>("Messages")
.Subscriptions(s => s.StoreInSqlServer(BackplaneConnectionString, "Subscriptions", isCentralized: true))
app.ApplicationServices.UseRebus(async bus =>
{
await bus.Subscribe<NotifyServiceDesk>();
});
which either result in different exceptions or messages do not get processed. :weary:
Could you kindly take a look at my commit (https://github.com/danielh1/rebus-onboardingcs/commit/1716013b2dec134bb0a0d5a73e76b4527af3f86c) and tell me what needs to be fixed for the NotifyCustomerHandler
to successfully process the NotifyServiceDesk
message ?
Hi @danielh1 - I that the IBus
instance in the controller will be transient and live only for the life of the controller. If you want Rebus to receive messages then you need to keep a bus running somewhere in your app. The line below is where that's done in the hosted service:
_provider.UseRebus(x => _bus = x);
Let me know if that makes sense.
Hi Sean,
I must be still missing something ( not only in the code, but may be in my comprehension :pensive: ).
Since the EntryPointAPI
is already a WebHost Application (Hosted service) I added the following :
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
//snip
app.ApplicationServices.UseRebus(async c =>
{
await c.Subscribe<NotifyServiceDesk>();
}
);
app.Run(async (context) =>
{
var bus = app.ApplicationServices.GetRequiredService<IBus>();
});
}
The following exception is thrown (from the EntryPointAPI)
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (Thread #29): Sending OnboardNewCustomer { Name = Dan, Email = inbox@mail.com.com } -> "Messages"
[WRN] Rebus.Retry.ErrorTracking.InMemErrorTracker (Thread #46): Unhandled exception 1 (FINAL) while handling message with ID "d18d3694-4b68-43d5-854e-9c01669b65ca"
Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID d18d3694-4b68-43d5-854e-9c01669b65ca and type OnboardingMessages.OnboardNewCustomer, OnboardingMessages could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)
Hi Daniel
For the error in your last comment, it looks like you're getting the error when sending the OnboardNewCustomer
, right? If so, I'd check to see whether you have routed the message correctly.
That's different from the problem you had before that, right?
just want to thank you for an awesome and very detailed article that is very well organized!
@vpetkovic - thank you, that's very kind and I'm really glad that you like the article! :)
@seankearon - Hey, I wanted to thank you for this amazing article on the matter. It is exactly the scenario I was looking for. This article is by far one of the tidiest, detailed and right on point articles that I've read on this matter or just in general. Thanks a lot, keep the good work up and have a nice day ;)
@Amirarsalan99m - thank you very much for your kind words. They are much appreciated!
Long-running business processes in C# with Rebus on Azure
How to model and build long-running business processes using C# and Rebus and host them on Azure App Service.
https://seankearon.me/posts/2020/12/rebus-sagas-csharp/