Closed jcono closed 3 years ago
I've made some modifications that I think might be less of a change to the original code.
I've removed the option which I agree wasn't great. The only real difference now from the original code is that the call to SendMessageToSubscribers will wait for all the subscriber handlers to finish and will throw an exception if one or more of them don't succeed.
So this handles the problem that one "bad" subscriber can stop message delivery to all the others while also running them in parallel.
The only thing I'm a bit unsure about is whether it's a problem to throw an exception now that was being swallowed before?
I think this is better now. Can you clean up the whitespace and other minor changes so we can keep this PR small and focused.
Sure thing. Changes are as small as I could make them now I think.
I have to apologise, but a bit of further testing showed that, for reasons I haven't fully got to the bottom of yet, the task being waited on when sending messages wasn't propagating exceptions correctly. It was something about the way the Task was being created and handled.
I've refactored that part of the code and it's now working as I expected.
...
It turns out that Task.Factory.StartNew has a different signature from Task.Run, which resulted in the async Action being passed in not being awaited in the returned task. By switching to Task.Run, which has an overload that accepts a Func<Task?>, the behaviour is as expected.
Anyway, the code is working as I expected now.
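For anyone following along, here is a minimal sketch of one way this symptom can arise. It is not the code from this PR: FailingHandlerAsync is a hypothetical stand-in for a subscriber handler, and the class and method names are invented for illustration. With an async lambda, Task.Factory.StartNew hands back a Task<Task> whose outer task completes as soon as the lambda reaches its first await, so waiting on it never sees the failure. Task.Run unwraps the inner task.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewVersusRun
{
    // Hypothetical stand-in for a subscriber handler that fails asynchronously.
    private static async Task FailingHandlerAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("subscriber failed");
    }

    // Returns true if waiting on the task surfaces the handler's exception.
    private static bool WaitSurfacesFailure(Task task)
    {
        try
        {
            task.Wait();
            return false;
        }
        catch (AggregateException)
        {
            return true;
        }
    }

    public static bool StartNewSurfacesFailure()
    {
        // The async lambda binds to Func<TResult> with TResult = Task, so this is
        // really a Task<Task>. The outer task finishes at the first await inside
        // the lambda, before the handler has thrown.
        Task outer = Task.Factory.StartNew(async () => await FailingHandlerAsync());
        return WaitSurfacesFailure(outer);
    }

    public static bool RunSurfacesFailure()
    {
        // Task.Run has a Func<Task> overload and returns a proxy for the inner
        // task, so the handler's exception reaches the waiter.
        Task task = Task.Run(async () => await FailingHandlerAsync());
        return WaitSurfacesFailure(task);
    }
}
```

Under these assumptions StartNewSurfacesFailure() returns false and RunSurfacesFailure() returns true. Calling Unwrap() on the StartNew result would be another way to get the Task.Run behaviour.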
This is looking good, but I'm concerned about using Task.WaitAll and it not being async. Seems like that is going to be a sync over async call which is dangerous. Can't we just do await Task.WhenAll? Also, I'm a little concerned about doing a Task.Run inside a loop. If there are a lot of subscribers it could cause an issue with thread starvation. That is what we were doing before so that is an existing issue. Could we do a Parallel.ForEach?
Also, I'd like for us to add a test that shows that multiple good subscribers and one bad one will work as expected and call all of the subscribers even though the one is going to blow up. Maybe register 2 good ones and then 1 bad one and then another 2 good ones and then verify that all of the good ones are called and that the error message (aggregateexception) is bubbled up like you are wanting it to be.
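A rough sketch of the scenario described above, assuming a simplified dispatcher rather than Foundatio's actual SendMessageToSubscribers. SendToAllAsync, RunScenarioAsync and the handler delegates are all hypothetical names made up for this example. It registers two good handlers, one bad one, then two more good ones, and uses await Task.WhenAll instead of Task.WaitAll. One detail worth knowing: awaiting the WhenAll task rethrows only the first failure, so the sketch rethrows the task's Exception property to surface the full AggregateException.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SubscriberFanOutSketch
{
    // Hypothetical simplified dispatcher; not the library's real method.
    public static async Task SendToAllAsync(
        IEnumerable<Func<string, Task>> handlers, string message)
    {
        // Start every handler before waiting on any of them, so one failure
        // cannot prevent the remaining handlers from being called.
        Task[] running = handlers.Select(h => Task.Run(() => h(message))).ToArray();
        Task all = Task.WhenAll(running);
        try
        {
            await all;
        }
        catch when (all.Exception != null)
        {
            // await rethrows only the first failure; all.Exception is the
            // AggregateException holding every handler failure.
            throw all.Exception;
        }
    }

    // Runs the "2 good, 1 bad, 2 good" scenario and reports what happened.
    public static async Task<(int goodCalls, int failures)> RunScenarioAsync()
    {
        int goodCalls = 0;
        Func<string, Task> good = _ =>
        {
            Interlocked.Increment(ref goodCalls);
            return Task.CompletedTask;
        };
        Func<string, Task> bad = _ => throw new InvalidOperationException("bad subscriber");

        var handlers = new List<Func<string, Task>> { good, good, bad, good, good };
        int failures = 0;
        try
        {
            await SendToAllAsync(handlers, "hello");
        }
        catch (AggregateException ex)
        {
            failures = ex.InnerExceptions.Count;
        }
        return (goodCalls, failures);
    }
}
```

In a real test the two returned values would be asserted: four good calls and one inner exception. The sketch keeps Task.Run per handler to mirror the existing behaviour, so it does not address the thread starvation concern.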
> concerned about using Task.WaitAll and it not being async. Seems like that is going to be a sync over async call which is dangerous. Can't we just do await Task.WhenAll?
I'd tend to agree, although that will change the entire SendMessageToSubscribers method to an async method which is a bigger change (one I had originally). I'm happy to do that but it will require changing a few more files (and I believe it is possibly a breaking change for any libraries as it's a protected method).
> Also, I'd like for us to add a test that shows that multiple good subscribers and one bad one will work as expected
I've added to the already existing test CanTolerateSubscriberFailureAsync() in the test harness which proves the subscribers still receive the message.
Confirming the exception contains the appropriate message is a little harder to put into the base functionality as it's really an implementation thing I think. For example, to pass that test I had to change the InMemoryMessageBus to swallow the exception in its publish implementation for it to work (and keep it simple).
Thoughts?
> > concerned about using Task.WaitAll and it not being async. Seems like that is going to be a sync over async call which is dangerous. Can't we just do await Task.WhenAll?
>
> I'd tend to agree, although that will change the entire SendMessageToSubscribers method to an async method which is a bigger change (one I had originally). I'm happy to do that but it will require changing a few more files (and I believe is a possibly breaking change for any libraries as it's a protected method)
Yeah, I think we need to make that change because doing sync over async could potentially result in deadlocks.
> > Also, I'd like for us to add a test that shows that multiple good subscribers and one bad one will work as expected
>
> I've added to the already existing test CanTolerateSubscriberFailureAsync() in the test harness which proves the subscribers still receive the message.
>
> Confirming the exception contains the appropriate message is a little harder to put into the base functionality as it's really an implementation thing I think. For example, to pass that test I had to change the InMemoryMessageBus to swallow the exception in its publish implementation for it to work (and keep it simple).
>
> Thoughts?
Ahh, I see it now. Yes, that is good. Thanks!
Ok all done I think. The build failed but I'm not sure it's anything to do with the code I've changed so hopefully it'll be ok this time.
Sorry, I'm out on vacation. I think the changes are good, but we just need to verify that we didn't break anything. I'll be out for another couple weeks. Maybe @niemyjski can take a look and see what he thinks.
When using the AzureServiceBus the best way to guarantee the delivery of messages is to use ReceiveMode.PeekLock on the subscription client. For details see FoundatioFx/Foundatio.AzureServiceBus#38.
In order for this to work correctly the handler of a message needs to be able to throw an exception up to the subscription client. These changes allow that to happen by optionally running the handlers sequentially rather than in parallel in a task as they are by default, so that if a handler throws an exception it can be handled by the underlying client.