rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus
Other
63 stars 44 forks source link

[question] publishing new messages during a message handling #51

Closed yan-oreshchenkov closed 5 years ago

yan-oreshchenkov commented 5 years ago

I have a message handler that actually produce ~1000 new messages (call await bus.Publish() 1k times). I expected that the new messages come into exchange once I publish them. But it's not true. I noticed, that the 1k messages start to come into exchange only after the handler is completed.

I suspect it's expected behavior and related with MessageContext.TransactionContext, but not sure. The business task is sending notifications. So even if handler fails someway in the middle of its job, it's ok. We don't need to have transaction handling in this case.

Wouldn't you be so kind to clarify why it works as it works and advice how to allow the new messages become available in the queue one by one, while handler is working. The only thing that comes into my mind now is to call something like Task.Run(async ()=>await bus.Publish()); but I'm not sure if this is right way.

Thank you!

mookid8000 commented 5 years ago

Check out this answer on StackOverflow – it explains everything 😄

To sum it up: Yes, Rebus will delay the actual sending of all outgoing messages sent from within a message handler. This is intentional, because this is what generally results in the fewest surprises.

You are absolutely right that it's related to the current transaction context, and you can actually just remove it, if you want to "escape" it – just be sure to restore it again using a proper try/finally construct:

var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.Current = null;
try
{
    // this will happen immediately
    await bus.Publish(whee);
}
finally
{
    AmbientTransactionContext.Current = transactionContext;
}

And please, for the sake of readability, wrap it in something that implements IDisposable, so your code can look neat like this:

using(new RebusTransactionContextDismantler())
{
    // this will happen immediately :)
    await bus.Publish(whee);
}