rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Pipelining of requests forbidden for RabbitMQ when calling `IBus.Publish` many times #76

Closed mindisk closed 3 years ago

mindisk commented 3 years ago

Hi, I have encountered this exception several times. It usually occurs when I call IBus.Publish many times from the same thread.

Exception while publishing event
System.NotSupportedException: Pipelining of requests forbidden
   at RabbitMQ.Client.Impl.RpcContinuationQueue.Enqueue(IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.Enqueue(IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.TransmitAndEnqueue(OutgoingCommand& cmd, IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ConfirmSelect(Boolean nowait)
   at RabbitMQ.Client.Impl.ModelBase.ConfirmSelect()
   at RabbitMQ.Client.Impl.AutorecoveringModel.ConfirmSelect()
   at Rebus.RabbitMq.RabbitMqTransport.DoSend(ConcurrentQueue`1 outgoingMessages, IModel model, Boolean isExpress)
   at Rebus.RabbitMq.RabbitMqTransport.SendOutgoingMessages(ITransactionContext context, ConcurrentQueue`1 outgoingMessages)
   at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Bus.RebusBus.InnerSend(IEnumerable`1 destinationAddresses, Message logicalMessage)
   at Rebus.Bus.RebusBus.InnerPublish(String topic, Object eventMessage, IDictionary`2 optionalHeaders)

Have you seen this exception before? Do you know what causes it and how to resolve it?

I have added a small delay between the publish calls and it seems to reduce the number of thrown exceptions.

mookid8000 commented 3 years ago

Can you show me the code that publishes many times?

mookid8000 commented 3 years ago

Hi @mindisk, did you figure out what the problem was?

mookid8000 commented 3 years ago

I suspect that this has got something to do with Tasks not being awaited or something like that.

For now, I'll close this issue – please don't hesitate to pick it up again, if you're still experiencing problems.

mindisk commented 3 years ago

Hi, sorry for being absent.

I have read that the error "Pipelining of requests forbidden" comes from the RabbitMQ client when multiple threads try to access the same IModel object at the same time, so it appears to be a concurrency issue (https://www.rabbitmq.com/dotnet-api-guide.html#concurrency-channel-sharing). I would assume that Rebus locks the IModel object. FYI, we have multiple parallel operations that can interact with Rebus at the same time, so we expect multiple threads to attempt to publish events simultaneously.

Anyway, the issue persists in our system, although not consistently. When I started using RebusTransactionScope to publish events, the issue disappeared for the time being, and I thought that was the solution. However, after some time it reappeared, so I guess it was just a coincidence.

The publishing code snippet:

public async Task PublishMessageAsync(MessageBase message)
{
    if (!IsConnected || _bus == null)
        return;

    using var transactionScope = new RebusTransactionScope();
    await _bus.Publish(message, _rebusHeaders);
    await transactionScope.CompleteAsync();
}

As a temporary solution, I will try to lock the publishing code, and hopefully that will fix the issue.

Do you think SyncBus is a good approach to avoid an explicit lock? Will it make sure that only one thread accesses IModel at a time?
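
For illustration, here is a minimal sketch of what locking the publishing code could look like. A C# lock statement cannot contain an await, so the sketch uses a SemaphoreSlim as an async-friendly gate instead. SerializedPublisher, Demo, and MaxConcurrencyAsync are hypothetical names, not part of Rebus, and the publish delegate stands in for a call such as _bus.Publish(message, _rebusHeaders). This only shows how calls can be serialized on the caller's side; it is not a confirmed fix for the exception.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rebus): lets only one publish run at a time.
public sealed class SerializedPublisher
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly Func<object, Task> _publish;

    // 'publish' is an assumed stand-in for the real publish call.
    public SerializedPublisher(Func<object, Task> publish) => _publish = publish;

    public async Task PublishAsync(object message)
    {
        await _gate.WaitAsync();        // wait until no other publish is in flight
        try
        {
            await _publish(message);
        }
        finally
        {
            _gate.Release();            // always release, even if publishing throws
        }
    }
}

public static class Demo
{
    // Runs 'callers' concurrent publishes and reports the highest number
    // of publish delegates that were ever executing at the same time.
    public static async Task<int> MaxConcurrencyAsync(int callers)
    {
        int current = 0, max = 0;

        var publisher = new SerializedPublisher(async _ =>
        {
            int now = Interlocked.Increment(ref current);
            if (now > max) max = now;   // safe here: the gate admits one caller at a time
            await Task.Delay(5);        // simulate the time spent talking to the broker
            Interlocked.Decrement(ref current);
        });

        await Task.WhenAll(
            Enumerable.Range(0, callers)
                      .Select(i => Task.Run(() => publisher.PublishAsync(i))));

        return max;
    }

    public static async Task Main()
    {
        Console.WriteLine(await MaxConcurrencyAsync(20)); // prints 1
    }
}
```

The trade-off is throughput: every publisher waits for the previous one to finish, so this is better suited as a stopgap than as a permanent design.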

Thank you for your time.

mookid8000 commented 3 years ago

Rebus' RabbitMQ transport enqueues outgoing messages in memory and does not use any IModel before the "transaction context" (which is what you have when you're in a Rebus handler, or when you're using RebusTransactionScope) is committed.

In your case, that happens when calling await transactionScope.CompleteAsync();

Some work has happened around these things though lately, so could you try and update to the latest prerelease and see how that works for you?

mindisk commented 3 years ago

It was also happening when using await _bus.Publish(message, _rebusHeaders); on its own, without the transaction scope. I have reduced the number of handlers that process and publish events, and that seems to mitigate the problem. I can confidently say that this occurs when multiple tasks/threads attempt to publish a message at the same time.

I will try the pre-release and see if the issue persists.

Thank you for your time.

mindisk commented 2 years ago

Hello,

After many months, I reached the point where I could try the pre-release version 7.3.2-b01. The pipelining issue disappeared, which is great! I understand that it is a pre-release, but I have seen unusually high memory usage in some services, and the logs are flooded with lines like these:

2021-10-13 12:09:15.0513 DEBUG Thread id: 16 RabbitMqTransport.Receive 0 Waiting for queue read 
2021-10-13 12:09:15.3005 DEBUG Thread id: 36 RabbitMqTransport.Receive 0 Reading from queue was cancelled 
2021-10-13 12:09:15.3041 DEBUG Thread id: 96 RabbitMqTransport.Receive 0 Reading from queue was cancelled 
2021-10-13 12:09:15.5580 DEBUG Thread id: 15 RabbitMqTransport.Receive 0 Waiting for queue read 
2021-10-13 12:09:15.5580 DEBUG Thread id: 15 RabbitMqTransport.Receive 0 Waiting for queue read 
2021-10-13 12:09:15.8040 DEBUG Thread id: 68 RabbitMqTransport.Receive 0 Reading from queue was cancelled 

These two things could be related, in that the NLog buffer is filled at a higher rate than it is flushed. They could also be completely unrelated.

I will see if I can find the root cause of this.

mindisk commented 2 years ago

Hi again,

It seems that the memory problem we face is not caused by the pre-release version of the package. That is great!

Do you have an estimate of when the pre-release version 7.3.2-b01 will be released as a stable version?

mookid8000 commented 2 years ago

When enough people have tried the beta and said it's good 🙂