rebus-org / Rebus

🚌 Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

How can I better implement the RegisterSubscriber task in a Rebus transport without deadlocking? #1135

Closed: glazkovalex closed this issue 4 months ago

glazkovalex commented 5 months ago

Hello, @mookid8000! In the Rebus.Kafka transport, when implementing the RegisterSubscriber method of the ISubscriptionStorage interface, I create a Task and return it as the result of the method. When the response arrives from Apache Kafka and I receive an internal event from the Confluent.Kafka library, I complete this Task. Everything seems simple, but if I "await" the bus.Subscribe() call, or try to subscribe to my events in Rebus's "OnCreated" callback in any way, I get a deadlock. The only way I can subscribe is outside of "OnCreated" and synchronously: bus.Subscribe().Wait();

I suspect that Rebus somehow tightly controls its threads, so that while the task that handles the internal event is executing asynchronously, the event in my transport cannot be processed and my task can never be completed.

I tried many different options, but I couldn't "crack" the thread control implemented in Rebus. Do you know why my task in RegisterSubscriber deadlocks, and how I should handle callbacks from the transport library's events inside a Rebus transport implementation to avoid a deadlock?

mookid8000 commented 5 months ago

Without having investigated further, this sounds more like a deadlock that can arise from unintentionally "capturing" the Kafka worker thread. Usually, when working with TaskCompletionSource, you need to dispatch your interactions with it on the thread pool, e.g. by going

Task.Run(() => taskCompletionSource.SetResult(...));

so the calling thread will not be the one actually setting the result.

Don't know if you already did that? If not, maybe you could try it?
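A minimal sketch of the suggested fix, assuming a transport keeps one TaskCompletionSource per topic and learns that a subscription is confirmed from a callback on its own worker thread. `PendingSubscriptions`, `Register`, `OnTopicAssigned`, `Demo`, and the topic name are hypothetical; they are not part of Rebus or Confluent.Kafka, and the Kafka worker is simulated with a plain `Thread`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not a Rebus or Confluent.Kafka type: one
// TaskCompletionSource per topic, completed when the transport confirms it.
public class PendingSubscriptions
{
    readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> _pending = new();

    // GetOrAdd means it does not matter whether Register or OnTopicAssigned runs first.
    TaskCompletionSource<bool> Get(string topic) =>
        _pending.GetOrAdd(topic, _ => new TaskCompletionSource<bool>());

    // What a RegisterSubscriber-style method would hand back to Rebus.
    public Task Register(string topic) => Get(topic).Task;

    // Invoked on the (simulated) Kafka worker thread.
    public void OnTopicAssigned(string topic)
    {
        var tcs = Get(topic);
        // Complete on the thread pool, so continuations of tcs.Task do not
        // run inline on the worker thread and stall its loop.
        Task.Run(() => tcs.TrySetResult(true));
    }
}

public static class Demo
{
    // Returns true if the subscription task completed within five seconds.
    public static async Task<bool> RunAsync()
    {
        var pending = new PendingSubscriptions();
        Task subscribed = pending.Register("my-event-topic");

        // Stand-in for the Kafka consumer thread confirming the subscription later.
        var worker = new Thread(() =>
        {
            Thread.Sleep(100);
            pending.OnTopicAssigned("my-event-topic");
        });
        worker.Start();

        Task finished = await Task.WhenAny(subscribed, Task.Delay(TimeSpan.FromSeconds(5)));
        return finished == subscribed;
    }
}
```

The important line is the `Task.Run(...)` in `OnTopicAssigned`: the worker thread only queues the completion and goes back to its loop, so whatever code awaits the subscription resumes on a thread-pool thread instead. Creating the source with `new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously)` is an alternative way to keep continuations off the completing thread.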

glazkovalex commented 5 months ago

Thank you very much, @mookid8000! Executing TaskCompletionSource.SetResult(...) on a separate thread helped! Now a subsequent asynchronous subscription is no longer blocked:

IBus bus = host.Services.GetRequiredService<IBus>();
await bus.Subscribe<MyEvent>();

😀

However, the second half of the problem remains. Any attempt to subscribe to events in the bus's "OnCreated" callback still causes a deadlock, with both synchronous and asynchronous subscriptions:

onCreated: async bus => await bus.Subscribe<MyEvent>() - deadlock.
onCreated: bus => bus.Subscribe<MyEvent>() - deadlock.

onCreated: (bus) =>
{
    bus.Subscribe<MyEvent>().Wait();
    return Task.CompletedTask; 
}

This deadlocks too. Apparently this is because the "OnCreated" callback is triggered "once the bus is operational, but before it has been started (i.e. begun receiving messages)", whereas Kafka completes my first subscription only after message receiving has started. I'll think about how to get around it...

If you have any ideas about how to work around this, please let me know.
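The ordering problem described above can be modelled in a few lines. This is a hypothetical toy (`FakeBus`, `OrderingDemo`, and `StartsWithin` are invented for illustration and do not reflect Rebus's actual startup code), assuming the host awaits the onCreated hook before it begins receiving, and that the transport confirms a subscription only once receiving has started.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical model of the ordering, NOT Rebus's implementation:
// the onCreated hook is awaited first, and only then does receiving start.
public class FakeBus
{
    readonly TaskCompletionSource<bool> _receivingStarted =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Assumption: the transport confirms a subscription only after receiving starts.
    public Task Subscribe() => _receivingStarted.Task;

    public async Task StartAsync(Func<FakeBus, Task> onCreated)
    {
        await onCreated(this);                // hook runs before receiving starts
        _receivingStarted.TrySetResult(true); // "begin receiving messages"
    }
}

public static class OrderingDemo
{
    // Returns true if startup finished within the given timeout.
    public static async Task<bool> StartsWithin(Func<FakeBus, Task> onCreated, TimeSpan timeout)
    {
        var bus = new FakeBus();
        Task start = bus.StartAsync(onCreated);
        return await Task.WhenAny(start, Task.Delay(timeout)) == start;
    }
}
```

In this model, `StartsWithin(bus => bus.Subscribe(), ...)` never finishes, because the hook waits for receiving to start while receiving waits for the hook to return. Starting the subscription without awaiting it (`_ = bus.Subscribe(); return Task.CompletedTask;`) lets startup proceed, but any failure of that subscription then goes unobserved, so it is not necessarily a good fix for a real transport.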

glazkovalex commented 4 months ago

The second part of the problem, about "OnCreated", seems to relate more to issue #1134, so this issue can be closed.