rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Possibly something wrong with sending messages outside transaction context #1076

Closed AYss closed 1 year ago

AYss commented 1 year ago

Hi.

One of our services sometimes crashes with the following stack trace:

[Error] NACKing 94 because of exception Rebus.Exceptions.RebusApplicationException: Could not 'GetOrAdd' item with key 'outgoing-messages' as type System.Collections.Concurrent.ConcurrentQueue`1[Rebus.Transport.AbstractRebusTransport+OutgoingMessage]
 ---> System.InvalidOperationException: Cannot add OnCommitted action on a completed transaction context.
   at Rebus.Transport.TransactionContext.ThrowCompletedException(String actionName)
   at Rebus.Transport.TransactionContext.OnCommitted(Func`2 commitAction)
   at Rebus.Transport.AbstractRebusTransport.<>c__DisplayClass2_0.<Send>b__0()
   at Rebus.Transport.TransactionContextExtensions.<>c__DisplayClass2_0`1.<GetOrAdd>b__0(String id)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Rebus.Transport.TransactionContextExtensions.GetOrAdd[TItem](ITransactionContext context, String key, Func`1 newItemFactory)
   --- End of inner exception stack trace ---
   at Rebus.Transport.TransactionContextExtensions.GetOrAdd[TItem](ITransactionContext context, String key, Func`1 newItemFactory)
   at Rebus.Transport.AbstractRebusTransport.Send(String destinationAddress, TransportMessage message, ITransactionContext context)
   at Rebus.Pipeline.Send.SendOutgoingMessageStep.<>c__DisplayClass4_0.<<Send>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Pipeline.Send.SendOutgoingMessageStep.Send(List`1 destinationAddressesList, TransportMessage transportMessage, ITransactionContext currentTransactionContext)
   at Rebus.Pipeline.Send.SendOutgoingMessageStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Diagnostics.Outgoing.OutgoingDiagnosticsStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Pipeline.Send.ValidateOutgoingMessageStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Pipeline.Send.SerializeOutgoingMessageStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Retry.Simple.VerifyCannotSendFailedMessageWrapperStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Pipeline.Send.AutoHeadersOutgoingStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Authorization.AuthorizationStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Pipeline.Send.FlowCorrelationIdStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Pipeline.Send.AssignDefaultHeadersStep.Process(OutgoingStepContext context, Func`1 next)
   at Rebus.Bus.RebusBus.SendUsingTransactionContext(IEnumerable`1 destinationAddresses, Message logicalMessage, ITransactionContext transactionContext)
   at Rebus.Bus.RebusBus.InnerSend(IEnumerable`1 destinationAddresses, Message logicalMessage)
   at Rebus.Bus.RebusBus.InnerPublish(String topic, Object eventMessage, IDictionary`2 optionalHeaders)
   at Rebus.EventPublisher.Publish[TEvent](TEvent event, IDictionary`2 optionalHeaders)
   at BrokerProxy.BrokerCommunication.BrokerListenerService.ProcessMessage(BasicDeliverEventArgs ea) in C:\BuildAgent\work\100d7f35bdc82dd3\src\BrokerProxy\BrokerCommunication\BrokerListenerService.cs:line 60
   at BrokerProxy.BrokerCommunication.BrokerRabbitMq.<>c__DisplayClass7_0.<<StartListener>b__0>d.MoveNext() in C:\BuildAgent\work\100d7f35bdc82dd3\src\BrokerProxy\BrokerCommunication\BrokerRabbitMq.cs:line 42

EventPublisher is just a thin wrapper around IBus and its Publish method (it only sets the message id so we can return it after publishing; no magic there).

The BrokerProxy is just an integration service with a background service called BrokerCommunication that passes messages from a queue into our realm. When a message arrives on the integration queue, it is mapped and Bus.Publish is called. Nothing else. There is no transaction context, since this happens in a regular RabbitMQ handler, so the transaction context is created by the RebusBus.InnerSend method:

        else
        {
            using var context = new TransactionContextWithOwningBus(this);
            await SendUsingTransactionContext(destinationAddresses, logicalMessage, context);
            await context.Complete();
        }

Sometimes Publish crashes with the above exception. By "sometimes" I mean once every few days, while processing a few thousand messages per hour. It is quite deadly, because when it happens the message is NACKed, immediately consumed again, and crashes again. In our test environment this reaches about 2k crashes per minute, but in a client's test environment (many more CPUs and much more RAM) it generated 1.7 million errors within 3 minutes.

It seems that sometimes the code that adds the message-sending delegates (which are executed on context completion) runs after the context.Complete() call, for no obvious reason. What else can I say? I've been trying to figure this out for a couple of weeks now and still have had no luck. I have no repro code. I do know it happens much more often on extremely fast environments. On our test server (just 8 cores) it happened twice.

Could you please help me figure out how to proceed with this?

mookid8000 commented 1 year ago

This is a pretty common error when a call to bus.Send or bus.Publish is not awaited, i.e. if it's called like this (never do this!):

// 👇 this one is missing an "await"
bus.Publish(someEvent); //< 💀

Could you maybe go through your SendUsingTransactionContext implementation and all transitive calls and verify that you await bus.Publish(yourStuff)?

If you have something (e.g. an injected dependency far down in the call hierarchy) that does not return Task/ValueTask and thus cannot await things, you can still use the bus, you just need to grab ISyncBus and use its methods instead, e.g. like

// 👍
bus.Advanced.SyncBus.Publish(yourEvent);
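
To illustrate why a missing await can produce this kind of failure, here is a minimal, self-contained sketch. It does not use Rebus at all: ToyTransactionContext, SendAsync and MissingAwaitDemo are hypothetical names made up for this example, and the toy only imitates the rule "nothing may enlist after completion". The real Rebus types will differ in detail.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy model only: NOT Rebus' TransactionContext, just the same
// "no enlisting after completion" rule.
public sealed class ToyTransactionContext
{
    readonly object _gate = new object();
    readonly List<Action> _onCommitted = new List<Action>();
    bool _completed;

    public void OnCommitted(Action action)
    {
        lock (_gate)
        {
            if (_completed)
                throw new InvalidOperationException(
                    "Cannot add OnCommitted action on a completed transaction context.");
            _onCommitted.Add(action);
        }
    }

    public void Complete()
    {
        lock (_gate)
        {
            _completed = true;
            foreach (var action in _onCommitted) action();
        }
    }
}

public static class MissingAwaitDemo
{
    // Hypothetical send: does some async work first, then enlists in the context.
    static async Task SendAsync(ToyTransactionContext context, List<string> sent)
    {
        await Task.Delay(50);
        context.OnCommitted(() => sent.Add("message"));
    }

    // Returns true when the late enlistment was rejected.
    public static async Task<bool> FailsWithoutAwait()
    {
        var context = new ToyTransactionContext();
        var sent = new List<string>();
        var pending = SendAsync(context, sent); // the "await" is missing here
        context.Complete();                     // completes before the send has enlisted
        try { await pending; return false; }    // awaited only to observe the outcome
        catch (InvalidOperationException) { return true; }
    }

    // Returns true when the message was delivered on completion.
    public static async Task<bool> WorksWithAwait()
    {
        var context = new ToyTransactionContext();
        var sent = new List<string>();
        await SendAsync(context, sent);         // enlists before completion
        context.Complete();
        return sent.Count == 1;
    }

    public static async Task Main()
    {
        var failed = await FailsWithoutAwait();
        var worked = await WorksWithAwait();
        Console.WriteLine($"Rejected without await: {failed}");
        Console.WriteLine($"Delivered with await: {worked}");
    }
}
```

In the un-awaited variant the context is completed while the send is still waiting, so the enlistment arrives too late. This is the same ordering problem as in the reported stack trace, only made deterministic by the artificial delay.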

AYss commented 1 year ago

I've checked everything several times. I even bypassed our EventPublisher and called ISyncBus.Publish directly. It still happens. I have only one suspect left: AsyncEventingBasicConsumer, which is responsible for the whole execution context. I have just switched to the regular EventingBasicConsumer and eliminated tasks completely from the job. I'll let you know in a few days whether that solved the issue.

mookid8000 commented 1 year ago

Ok! Also be sure that other async things are awaited. For example, if you rely on callbacks from other libraries and there's an event in there somewhere with an async void signature, or something similar, then it's bound to mess with Rebus' ambient transaction context.
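
As a rough illustration of the async void hazard (a sketch without Rebus or RabbitMQ; AsyncVoidDemo and its members are hypothetical names for this example): an async lambda assigned to an EventHandler becomes async void, so whoever raises the event gets control back at the handler's first incomplete await and cannot wait for the rest.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns what the handler observed when it resumed:
    // had the caller already "completed" by then?
    public static async Task<bool> HandlerResumesAfterCallerCompleted()
    {
        var observed = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var callerCompleted = false;

        // An async lambda assigned to EventHandler compiles to async void:
        // the invoker gets no Task back and cannot await it.
        EventHandler handler = async (sender, args) =>
        {
            await Task.Delay(100);               // stands in for awaited work before a bus call
            observed.SetResult(callerCompleted); // stands in for the late bus operation
        };

        handler(new object(), EventArgs.Empty);  // returns at the handler's first incomplete await
        callerCompleted = true;                  // stands in for completing the transaction scope

        return await observed.Task;              // awaited only so the demo can report the result
    }

    public static async Task Main()
    {
        var late = await HandlerResumesAfterCallerCompleted();
        Console.WriteLine($"Handler resumed after caller completed: {late}");
    }
}
```

If the handler's remaining work were a bus operation and the caller's "completion" were a transaction scope being completed, the operation would try to enlist too late.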

By the way, just to be entirely clear about this: The error you are getting is one that you will get if something tries to enlist in Rebus' ambient transaction context AFTER it has been committed/aborted.

An easy way to reproduce the error message is like this:

using (var scope = new RebusTransactionScope())
{
    // complete the tx scope
    await scope.CompleteAsync();

    // the ambient tx context AmbientTransactionContext.Current is still available
    // here, so this 👇 will result in an error!
    await bus.SendLocal("HEJ");
}

which results in

Rebus.Exceptions.RebusApplicationException : Could not 'GetOrAdd' item with key 'outgoing-messages' as type System.Collections.Concurrent.ConcurrentQueue`1[Rebus.Transport.AbstractRebusTransport+OutgoingMessage]
  ----> System.InvalidOperationException : Cannot add OnCommitted action on a completed transaction context.
   at Rebus.Transport.TransactionContextExtensions.GetOrAdd[TItem](ITransactionContext context, String key, Func`1 newItemFactory)
   at Rebus.Transport.AbstractRebusTransport.Send(String destinationAddress, TransportMessage message, ITransactionContext context)
   at Rebus.Pipeline.Send.SendOutgoingMessageStep.<>c__DisplayClass4_0.<<Send>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Pipeline.Send.SendOutgoingMessageStep.Send(List`1 destinationAddressesList, TransportMessage transportMessage, ITransactionContext currentTransactionContext)
(...)

In case it turns out that this is NOT because of a non-awaited Task and a resulting race condition, but because a bus operation is wrongfully enlisted in the ambient transaction context, you can make sure that never happens by temporarily suppressing the ambient context with Rebus' RebusTransactionScopeSuppressor, like this:

using (var scope = new RebusTransactionScope())
{
    // complete the tx scope
    await scope.CompleteAsync();

    // the ambient tx context AmbientTransactionContext.Current will be NULLed
    // temporarily here, so this 👇 will NEVER result in an error!
    using (new RebusTransactionScopeSuppressor())
    {
        await activator.Bus.SendLocal("HEJ");
    }
}

AYss commented 1 year ago

Hi again.

I am 100% sure we do not have any loose tasks flying around because of some async void methods. At least on our side. I changed a few things, changed approach, ran a lot of tests and here are the conclusions:

I have no idea what is going on inside the RabbitMQ libraries, so I'll skip analyzing that and go straight to the conclusion: eventing consumers are not a safe choice for this job, at least on high-end, clustered environments.

mookid8000 commented 1 year ago

Hi @AYss , I can imagine one other scenario that would result in what you're experiencing: If the RabbitMQ consumer thread is somehow spawned when there's an active Rebus ambient transaction context, e.g. something like this:

// start scope
using var scope = new RebusTransactionScope();

// start consumer
var consumer = new AsyncEventingBasicConsumer(model);
model.BasicConsume("some-queue", autoAck: false, consumer: consumer);

// finish scope
await scope.CompleteAsync();

Depending on how RabbitMQ's threading model is implemented, there's a chance this would capture the execution context (including Rebus' ambient transaction) and cause it to live forever on the RabbitMQ consumer thread.

Since the ambient transaction is completed, it will live in a state incapable of enlisting new bus operations, thus leading to the error you're seeing.

Could it be something like this?
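
The capture mechanism itself can be shown with plain .NET, without RabbitMQ or Rebus. The sketch below assumes the ambient transaction is held in an AsyncLocal-style slot that flows with ExecutionContext (an assumption about Rebus' internals that this thread does not confirm). CapturedContextDemo and the integer "transaction id" are hypothetical stand-ins.

```csharp
using System;
using System.Threading;

public static class CapturedContextDemo
{
    // Stand-in for an ambient transaction slot; 0 means "no ambient transaction".
    static readonly AsyncLocal<int> Ambient = new AsyncLocal<int>();

    // Returns the ambient value the worker thread sees AFTER the
    // starting code has already cleared its own ambient value.
    public static int ObserveOnWorker()
    {
        var seenByWorker = -1;
        using var proceed = new ManualResetEventSlim(false);

        Ambient.Value = 42;             // "scope started"

        // Thread.Start captures the current ExecutionContext,
        // including the value stored in Ambient at this moment.
        var worker = new Thread(() =>
        {
            proceed.Wait();             // run only after the starter has cleared its value
            seenByWorker = Ambient.Value;
        });
        worker.Start();

        Ambient.Value = 0;              // "scope finished" on the starting flow
        proceed.Set();
        worker.Join();
        return seenByWorker;
    }

    public static void Main()
    {
        Console.WriteLine($"Worker still sees ambient id: {ObserveOnWorker()}");
    }
}
```

The worker keeps the value that was current when it was started, even though the starting code has since cleared it. A long-lived consumer thread started inside a scope could hold on to a completed transaction in the same way.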

In any case, you should probably solve the problem by being explicit about which ambient Rebus transaction you'd like your bus operations to go in. You can do this by avoiding the scope altogether:

using (new RebusTransactionScopeSuppressor())
{
   // .. do bus stuff without ambient transaction in here
}

or you can create a new scope, which will give you batching and other nifties:

using (var scope = new RebusTransactionScope())
{
   // .. do bus stuff in your own ambient transaction in here

   // remember to complete it
   await scope.CompleteAsync();
}

mookid8000 commented 1 year ago

Hi @AYss , if I understood you correctly, you solved your problem by using RebusTransactionScopeSuppressor, right?

I assume this is a weird corner case of some kind, since I haven't had any reports from anyone else resembling it. Therefore, I am closing the issue.

Please let me know if there's something still troubling you with this. 🙂

AYss commented 1 year ago

Hi @mookid8000 . Yep, I used RebusTransactionScopeSuppressor as a workaround. The target solution will be to get rid of the eventing consumer and use polling, or simply to use Rebus to read the integration queue with a custom deserializer that provides all the necessary headers. Everything is OK for now. I am going to get back to this to understand why eventing consumers cause such problems. Since we are currently in the middle of going live with the system, I'm not sure when that might be... I'll come back with answers 😉