rebus-org / Rebus.RabbitMq

🚌 RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Setting Up RabbitMQ as Timeout Manager #118

Closed SilentBlueD666 closed 3 months ago

SilentBlueD666 commented 4 months ago

Hi, me again 😅

So I don't know if I am being dumb and just not getting it, so let me explain what I have done, and maybe you can point me to where I am going wrong 😁

Note/context: I normally use the SQL timeout manager, which has been great for years, but I want to try RabbitMQ instead because our SQL cluster was down the other day. I use the pattern from "Automatic retries and error handling", so, as you have probably guessed, the messages could not be deferred and ended up in the error queue. My idea is that if RabbitMQ is down I won't receive the messages anyway, so I won't hit this problem (the outbox pattern is used for sending).

I set up RabbitMQ 3.13.0 (in Docker), then downloaded and installed the RabbitMQ Delayed Message plugin 3.12.0.

I set up Rebus with the following:


var rebusConfiguration = configuration.GetRebusConfiguration();
services
    .AddRebus(
        startAutomatically: false,
        key: RebusLifetimeManager.BusKey,
        configure: config =>
            config
                .UseConfiguration(rebusConfiguration)
                .Options(options =>
                {
                    if (hostingEnvironment.IsDevelopment())
                        options.LogPipeline();

                    options.EnableDiagnosticSources();
                    options.RetryStrategy(secondLevelRetriesEnabled: true);
                })
                .Transport(transport
                    => transport
                        .UseRabbitMq(rebusConfiguration.Server, rebusConfiguration.Queue)
                        .ClientConnectionName(EnvironmentNetCore.ApplicationName))
                .Routing(r => r.UseRoutes(rebusConfiguration))
                .Timeouts(timeout => timeout.UseDelayedMessageExchange("RebusDelayed"))
                .Serialization(s => s.UseNewtonsoftJson(JsonInteroperabilityMode.PureJson))
                .Logging(l => l.Serilog(Log.Logger)),
        onCreated: bus =>
        {
            bus.SubscribeToEvents(rebusConfiguration);
            return Task.CompletedTask;
        })
    .AutoRegisterHandlersFromAssembly(searchAssembly);

When I start the application, the following exchange is created in RabbitMQ: (image)

The first thing I noticed is that the exchange was created with @ prefixed to the configured name. Not sure if this is meant to happen?

The test code:

public sealed class StockReservationEventHander(
    IIdempotentRequestManager idempotentRequestManager,
    IBus bus,
    ILogger logger) :
    IHandleMessages<InventoryReserved>,
    IHandleMessages<IFailed<InventoryReserved>>
{
    private readonly IIdempotentRequestManager _idempotentRequestManager = idempotentRequestManager;
    private readonly IBus _bus = bus;
    private readonly ILogger _logger = logger.ForContext<StockReservationEventHander>();

    public async Task Handle(InventoryReserved message)
    {
        var logger = _logger.WithExecutionContext($"{nameof(Handle)}_{nameof(InventoryReserved)}");

        var outcome = await _idempotentRequestManager.TrackMessage();
        await outcome
            .Match(
                Success: async () =>
                {
                    var reservationId = ReservationId.Parse(message.ReservationId);

                    try
                    {
                        await Task.Delay(1000); // Testing delay...
                        throw new NotImplementedException(); // Testing Error handling...

                        // Other business logic goes here.
                    }
                    catch
                    {
                        await _idempotentRequestManager.MessageFailed();
                        throw;
                    }

                    logger
                        .Information(
                            "Message {MessageType} with id {MessageId} is being processed.",
                            typeof(InventoryReserved).FullName,
                            MessageContext.Current.GetIdempotentKey());
                },
                Failure: _
                    => logger
                        .Warning(
                            "Message {MessageType} with id '{MessageId}' has already been processed.",
                            typeof(InventoryReserved).FullName,
                            MessageContext.Current.GetIdempotentKey()));
    }

    public async Task Handle(IFailed<InventoryReserved> message)
    {
        var deferCount = message.GetDeferCount();
        var logger = _logger.WithExecutionContext($"{nameof(Handle)}_{nameof(IFailed<InventoryReserved>)}");

        if (deferCount % 5 == 0)
            logger
                .ForContext("ErrorMessage", message.ErrorDescription)
                .Error("Message {MessageType} with id '{MessageId}' has failed {DeferCount} times.", typeof(InventoryReserved).FullName, MessageContext.Current.GetIdempotentKey(), deferCount + 1);
        else
            logger
                .ForContext("ErrorMessage", message.ErrorDescription)
                .Warning("Message {MessageType} with id '{MessageId}' has failed {DeferCount} times.", typeof(InventoryReserved).FullName, MessageContext.Current.GetIdempotentKey(), deferCount + 1);

        await _bus.Advanced.TransportMessage.Defer(TimeSpan.FromMinutes(5));
    }
}

The point of this code is to keep looping the message forever until the problem is fixed (for reasons 😅).

However, I get the following error when the line await _bus.Advanced.TransportMessage.Defer(TimeSpan.FromMinutes(5)); runs:

[24 Feb 2024 13:27:45 ERR] An error occurred when attempting to complete the transaction context
Rebus.Exceptions.RebusApplicationException: Exchange 'RebusDelayed' does not exist
 ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no exchange 'RebusDelayed' in vhost 'coda'', classId=40, methodId=10
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete, Boolean internal, Boolean nowait, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclarePassive(String exchange)
   at RabbitMQ.Client.Impl.AutorecoveringModel.ExchangeDeclarePassive(String exchange)
   at Rebus.RabbitMq.RabbitMqTransport.CheckQueueExistence(FullyQualifiedRoutingKey routingKey, IModel model)
   --- End of inner exception stack trace ---
   at Rebus.RabbitMq.RabbitMqTransport.CheckQueueExistence(FullyQualifiedRoutingKey routingKey, IModel model)
   at Rebus.RabbitMq.RabbitMqTransport.<>c__DisplayClass61_0.<EnsureQueueExists>b__0(FullyQualifiedRoutingKey _)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Rebus.RabbitMq.RabbitMqTransport.EnsureQueueExists(FullyQualifiedRoutingKey routingKey, IModel model)
   at Rebus.RabbitMq.RabbitMqTransport.DoSend(IEnumerable`1 outgoingMessages, IModel model, Boolean isExpress)
   at Rebus.RabbitMq.RabbitMqTransport.SendOutgoingMessages(IEnumerable`1 outgoingMessages, ITransactionContext context)
   at Rebus.Transport.AbstractRebusTransport.<>c__DisplayClass3_1.<<Send>b__1>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ProcessMessage(TransactionContext context, TransportMessage transportMessage)

So I thought to myself that maybe the automatically created exchange was wrong. I deleted it, told Rebus not to create the exchange, and set it up manually without the prefix: (image)

Now I get the following error message:

[24 Feb 2024 14:20:44 WRN] Closed channel detected - consumer will be disposed
[24 Feb 2024 14:20:44 ERR] An error occurred when attempting to complete the transaction context
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=541, text='INTERNAL_ERROR', classId=0, methodId=0
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirms(TimeSpan timeout, Boolean& timedOut)
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.WaitForConfirmsOrDie()
   at RabbitMQ.Client.Impl.AutorecoveringModel.WaitForConfirmsOrDie()
   at Rebus.RabbitMq.RabbitMqTransport.DoSend(IEnumerable`1 outgoingMessages, IModel model, Boolean isExpress)
   at Rebus.RabbitMq.RabbitMqTransport.SendOutgoingMessages(IEnumerable`1 outgoingMessages, ITransactionContext context)
   at Rebus.Transport.AbstractRebusTransport.<>c__DisplayClass3_1.<<Send>b__1>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ProcessMessage(TransactionContext context, TransportMessage transportMessage)

In the Docker terminal I also noticed the following error:

2024-02-24 14:20:50 2024-02-24 14:20:50.356496+00:00 [error] <0.18899.0> ** Generic server <0.18899.0> terminating

And the exchange now has a binding: (image)

Not sure where to go from here 😅

LukeForder commented 3 months ago

I've experienced the first issue: when specifying the exchange name in UseDelayedMessageExchange, an @ is prepended to the declared exchange name (i.e. RebusDelay is declared as @RebusDelay).

When attempting to defer a message, the RabbitMQ node returns an error stating that the exchange RebusDelay does not exist.

Manually creating a (delayed-message) exchange named RebusDelay solves the issue.

I've had a look at the code in the RabbitMqDelayedMessageExchangeExtensions class.

When setting the ExternalTimeoutManagerAddressOrNull, an @ is prepended to the exchange name (resulting in the exchange being declared with the @ prefix).

Within the Send method, when deferring the message:

return _transport.Send($"{recipient}{_exchangeName}", message, context);

For a configured exchange RebusDelay (now @RebusDelay) and a queue named queue, the resulting routing key would be queue@RebusDelay. When the RabbitMQ transport tokenises that key, it yields the original exchange name (minus the @ prefix), which was never declared, hence the error.
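The mismatch can be reproduced without a broker. The snippet below is only a sketch of the behaviour described above: SplitAddress is a hypothetical stand-in for the transport's tokenisation (assumed here to split on the first @), not the actual Rebus.RabbitMq code, and the variable names are illustrative.

```csharp
using System;

// Hypothetical stand-in for the transport's address parsing (an assumption):
// everything before the first '@' is the routing key, the rest is the exchange.
static (string RoutingKey, string Exchange) SplitAddress(string address)
{
    var at = address.IndexOf('@');
    return at < 0
        ? (address, string.Empty)
        : (address.Substring(0, at), address.Substring(at + 1));
}

var configuredName = "RebusDelay";            // name passed to UseDelayedMessageExchange
var declaredExchange = "@" + configuredName;  // name the exchange ends up declared under
var recipient = "queue";                      // destination queue of the deferred message

// Same composition as the Send line quoted above.
var address = $"{recipient}{declaredExchange}";
var (routingKey, exchange) = SplitAddress(address);

Console.WriteLine($"address:   {address}");
Console.WriteLine($"declared:  {declaredExchange}");
Console.WriteLine($"looked up: {exchange}");
Console.WriteLine($"names match: {exchange == declaredExchange}");
```

Under these assumptions the name that gets looked up loses the @ while the declared name keeps it, which is consistent with the 404 NOT_FOUND in the first stack trace.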

With a few minor changes the tests pass. If I've understood it correctly, I could create a PR with the changes?

mookid8000 commented 3 months ago

Good catch! Thanks to your thorough description it was easy for me to fix the problem.

It's out in Rebus.RabbitMq 9.2.1, which is on NuGet.org now 🙂

SilentBlueD666 commented 3 months ago

Brill! My second problem did turn out to be a plugin config error on the RabbitMQ side. I'll give the new package a go.

Thanks.