rebus-org / Rebus.RabbitMq

RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Consumer reads from bus while handling message #54

annoia closed this issue 5 years ago

annoia commented 5 years ago

We encountered a deadlock while using Rebus, the steps are as follows:

  1. Send request GetA.
  2. In reaction to GetAResponse, send another request: GetB. => Responses on one consumer are now not being handled.
  3. Await the multiple responses to the GetB request sent in step 2 (thousands of entities are returned, so they are split into several packages). => It appears that the consumer that is waiting for GetAResponse to be handled has picked up a response to GetB. Now the Task we are awaiting (GetB) will never finish, because that consumer cannot read any more responses from its queue until the handler that awaits GetB has finished.

Might it be an idea to have consumers stop reading from the bus while a message from their queue is being handled?

mookid8000 commented 5 years ago

Can you tell me some more about what you're trying to achieve?

From what you're writing, I gather that you're using RabbitMQ, but I don't quite understand what caused the deadlock. There's no way in Rebus to (at least intentionally 🙂) block any work from being done by waiting for a specific message...

annoia commented 5 years ago

The flow is something like:

void SomeMethod()
{
    GetCompanies();
}

...

// This is called from Rebus
private async Task OnCompanies(List companies)
{
    IEnumerable peopleIds = companies.SelectMany(c => c.People);
    await GetPeople(peopleIds);
}

Looking at RabbitMqTransport.cs I see:

public async Task Receive(ITransactionContext context, CancellationToken cancellationToken)
{
    ...
    if (!_consumers.TryDequeue(out var consumer)) ...
    context.OnDisposed(() => _consumers.Enqueue(consumer));
    ...
}

This gives me the impression that Rebus will not read from this consumer again until the current task has finished. So if the response to the GetPeople request ends up in the consumer that has been removed from the _consumers queue, it deadlocks.
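To make the quoted pattern concrete, here is a minimal C# sketch of such a consumer pool. It is a simplified model under stated assumptions, not the actual Rebus.RabbitMq code: `Consumer`, `ConsumerPool`, `TryRent` and `Lease` are hypothetical names. A consumer rented for the duration of a message's handling is simply absent from the pool until its lease is disposed, which corresponds to the `context.OnDisposed(() => _consumers.Enqueue(consumer))` line.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a RabbitMQ consumer/channel (not a Rebus type).
public sealed class Consumer
{
    public Consumer(int id) => Id = id;
    public int Id { get; }
}

// Hypothetical, simplified model of the TryDequeue / OnDisposed(Enqueue) pattern.
public sealed class ConsumerPool
{
    private readonly ConcurrentQueue<Consumer> _consumers = new ConcurrentQueue<Consumer>();

    public ConsumerPool(int count)
    {
        for (var i = 0; i < count; i++) _consumers.Enqueue(new Consumer(i));
    }

    // Number of consumers currently free to receive.
    public int Available => _consumers.Count;

    // Takes a consumer out of the pool for the whole time a message is handled.
    public bool TryRent(out Lease lease)
    {
        if (_consumers.TryDequeue(out var consumer))
        {
            lease = new Lease(this, consumer);
            return true;
        }
        lease = null;
        return false;
    }

    public sealed class Lease : IDisposable
    {
        private readonly ConsumerPool _pool;
        private bool _disposed;

        internal Lease(ConsumerPool pool, Consumer consumer)
        {
            _pool = pool;
            Consumer = consumer;
        }

        public Consumer Consumer { get; }

        // Returns the consumer to the pool (in the quoted code: when the transaction context is disposed).
        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            _pool._consumers.Enqueue(Consumer);
        }
    }
}
```

While a lease is held, `Available` stays one lower and nothing else can read from that consumer, so any messages it has already prefetched wait until the handler finishes.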

mookid8000 commented 5 years ago

Are you using Rebus.Async?

annoia commented 5 years ago

No, we are creating the Task ourselves after sending the 2nd request.

mookid8000 commented 5 years ago

Ok... so when you say

(...) So if the response to the GetPeople request ends up in the consumer that has been removed from the _consumers queue, then it deadlocks

it implies to me that the current code is blocked, waiting for a reply to come in via RabbitMQ... how does it wait for the RabbitMQ message if it's not using Rebus.Async?

annoia commented 5 years ago

We create a Task that we await. If we're fetching 5000 people, we get 10 responses with 500 people in each. When the last response has been received, we set the Task's result, completing it.
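A minimal C# sketch of the pattern described above, assuming the number of pages is known up front. `PagedReplyAggregator` and `AddPage` are hypothetical names and not part of Rebus; the Task is completed through a `TaskCompletionSource` once the last page has been added.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rebus): collects paged replies and completes
// a Task once the expected number of pages has arrived.
public sealed class PagedReplyAggregator<T>
{
    private readonly object _lock = new object();
    private readonly List<T> _items = new List<T>();
    private readonly int _expectedPages;
    private int _receivedPages;

    // RunContinuationsAsynchronously keeps awaiting code from running inline
    // on the thread that delivers the last page (and inside the lock below).
    private readonly TaskCompletionSource<IReadOnlyList<T>> _tcs =
        new TaskCompletionSource<IReadOnlyList<T>>(TaskCreationOptions.RunContinuationsAsynchronously);

    public PagedReplyAggregator(int expectedPages)
    {
        if (expectedPages <= 0) throw new ArgumentOutOfRangeException(nameof(expectedPages));
        _expectedPages = expectedPages;
    }

    // The Task that the requesting code awaits.
    public Task<IReadOnlyList<T>> Completion => _tcs.Task;

    // Called once per reply page, e.g. from a reply handler.
    public void AddPage(IEnumerable<T> page)
    {
        lock (_lock)
        {
            if (_tcs.Task.IsCompleted) return; // ignore pages arriving after completion
            _items.AddRange(page);
            _receivedPages++;
            if (_receivedPages == _expectedPages)
                _tcs.TrySetResult(_items.ToArray());
        }
    }
}
```

With 10 pages of 500 items, `Completion` is still incomplete after nine pages and completes with 5000 items after the tenth. If the handler that sent the request awaits `Completion`, the message that handler is processing stays in flight until every page has been handled elsewhere.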

Because of how things are coded, we can choose not to await the GetPeople response from within the Rebus handler; if we do this, all is fine. If we DO await it, we're missing one or several responses, depending on how many we receive (my guess is that the consumer currently handling the GetCompanies response holds these messages, but is waiting for the GetCompanies handler to run to completion). So if Rebus does NOT wait for the response to be completely handled, everything works as expected (although we then miss some of the nicer features, like not reading everything from RabbitMQ faster than we can handle it).

mookid8000 commented 5 years ago

Ah, then I understand your problem.

The reason Rebus "cycles" through its RabbitMQ subscribers is that this eventually results in a pool of subscribers, where each message receive operation gets a dedicated subscriber for the entire duration of the handling of that message.

It turned out that this design greatly accelerated message processing, and I believe it is sound. It does, however, have the side effect of introducing a potential deadlock: as you've correctly observed, the RabbitMQ subscribers internally prefetch messages, so a blocking wait for a message that happens to have been prefetched by the current subscriber will effectively deadlock the wait operation.
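The mechanism can be reproduced in a small deterministic model, sketched here in C# under stated assumptions. The names are hypothetical; replies are delivered round-robin; a consumer may hold at most `prefetch` unacknowledged messages (RabbitMQ's "0 = unlimited" is not modelled); and the message being handled by consumer 0 is not acknowledged until its handler returns. This illustrates the mechanism and is not Rebus or RabbitMQ code.

```csharp
using System.Collections.Generic;

// Hypothetical, deterministic model of the prefetch deadlock (not Rebus/RabbitMQ code).
public static class PrefetchDeadlockModel
{
    // Returns true if all replies get handled while consumer 0's handler is blocked waiting for them.
    public static bool WaitCompletes(int consumerCount, int prefetch, int replyCount)
    {
        var buffers = new Queue<int>[consumerCount];
        var unacked = new int[consumerCount];
        for (var i = 0; i < consumerCount; i++) buffers[i] = new Queue<int>();

        // Consumer 0 is busy: the message it is handling is unacked and occupies one prefetch slot.
        unacked[0] = 1;

        var broker = new Queue<int>();
        for (var r = 0; r < replyCount; r++) broker.Enqueue(r);

        var handled = 0;
        while (true)
        {
            var progress = false;

            // Broker: deliver round-robin to each consumer that still has prefetch capacity.
            for (var i = 0; i < consumerCount && broker.Count > 0; i++)
            {
                if (unacked[i] < prefetch)
                {
                    buffers[i].Enqueue(broker.Dequeue());
                    unacked[i]++;
                    progress = true;
                }
            }

            // Every consumer except the busy one handles and acks one buffered reply per pass.
            for (var i = 1; i < consumerCount; i++)
            {
                if (buffers[i].Count > 0)
                {
                    buffers[i].Dequeue();
                    unacked[i]--;
                    handled++;
                    progress = true;
                }
            }

            if (handled == replyCount) return true; // the blocked handler could now continue
            if (!progress) return false;           // remaining replies are stuck in consumer 0's buffer
        }
    }
}
```

In this model, `WaitCompletes(2, 10, 10)` returns `false` because half of the replies land in the busy consumer's buffer, whereas `WaitCompletes(2, 1, 10)` returns `true`: with a prefetch count of 1 the in-flight message already fills the busy consumer's only slot, so every reply goes to a consumer that is free to handle it. With a single consumer, the wait never completes regardless of prefetch.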

Generally, I advise people to stay away from using asynchronous messaging to implement synchronous request/response protocols; it's usually much better to use a synchronous request/response protocol for that 😉 (like HTTP). But I also understand that sometimes you can have good reasons not to follow general advice.

So, to solve your problem right now, you might be lucky enough to get away with simply disabling prefetch (i.e. setting it to 1) like this:

Configure.With(...)
    .Transport(t => t.UseRabbitMq(...).Prefetch(1))
    .(...)
    .Start();

For the future, I suggest you consider either

or redesign this part of the system to avoid request/reply altogether.