rebus-org / Rebus.RabbitMq

:bus: RabbitMQ transport for Rebus
https://mookid.dk/category/rebus

Poison handling with RabbitMQ Quorum queues #114

Open zlepper opened 6 months ago

zlepper commented 6 months ago

Greetings :D

I created an issue for this a long time ago in the main Rebus repo, https://github.com/rebus-org/Rebus/issues/1044, where we agreed that there is no way Rebus itself can solve this.

However, now that quorum queues are well supported in RabbitMQ, there is actually something we can do. According to the docs (https://www.rabbitmq.com/quorum-queues.html#poison-message-handling), it is possible to check the x-delivery-count header. I have hacked together some code to do it, but I'm not sure whether it's something that should be integrated into Rebus.RabbitMq directly:

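// Namespaces as of Rebus 8 / Rebus.ServiceProvider; adjust if your package versions differ.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Exceptions;
using Rebus.Handlers;
using Rebus.Messages;
using Rebus.Pipeline;
using Rebus.Retry.Simple;
using Rebus.Transport;

await using var services = new ServiceCollection()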
    .AddRebus(r =>
    {
        return r.Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost:5672", "deliver-test").InputQueueOptions(
            o => { o.AddArgument("x-queue-type", "quorum"); })).Options(o =>
        {
            o.Decorate<IPipeline>(c =>
            {
                var pipeline = c.Get<IPipeline>();

                var injector = new PipelineStepInjector(pipeline);

                injector.OnReceive(new TooManyDeliveryAttemptsRejector(), PipelineRelativePosition.After,
                    typeof(DefaultRetryStep));

                return injector;
            });

            o.Decorate<ITransport>(c =>
            {
                var transport = c.Get<ITransport>();

                return new ClearDeliveryHeaderTransport(transport);
            });

            o.LogPipeline();
        });
    })
    .AddRebusHandler<MyHandler>()
    .BuildServiceProvider(true);

foreach (var hostedService in services.GetServices<IHostedService>())
{
    await hostedService.StartAsync(default);
}

var bus = services.GetRequiredService<IBus>();

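// Uncomment to send the poison message once; after that, each restart just re-receives it:
// await bus.SendLocal("a message");

// Keep the process alive (100,000,000 ms is roughly 28 hours) so the handler can consume messages.
await Task.Delay(100000000);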

class MyHandler : IHandleMessages<string>
{
    public Task Handle(string message)
    {
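        // Deliberately allocates far more stack than a thread has, causing a stack overflow that .NET
        // cannot catch: the whole process dies, so Rebus never gets to run its normal error handling.
        Span<byte> lotsOfData = stackalloc byte[100_000_000];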
        Console.WriteLine($"Received message: {message}, {lotsOfData.Length}");
        return Task.CompletedTask;
    }
}

[StepDocumentation("""
                   Rejects messages that have been delivered too many times.

                   If Rebus has not handled this using the normal error strategies it might be because the message is killing
                   the process preventing Rebus from handling it.
                   """)]
class TooManyDeliveryAttemptsRejector : IIncomingStep
{
    private const int MaxDeliveryAttempts = 5;

    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        var message = context.Load<TransportMessage>();

        if (message == null)
        {
            await next();
            return;
        }

        var headers = message.Headers;

        if (headers.TryGetValue(MoreRabbitMqHeader.DeliveryCount, out var deliveryCount) &&
            int.TryParse(deliveryCount, out var count))
        {
            if (count > MaxDeliveryAttempts)
            {
                throw new MaxDeliverAttemptsExceededException(
                    $"Maximum of {MaxDeliveryAttempts} delivery attempts exceeded (x-delivery-count is {count}).");
            }
        }

        await next();
    }
}

class ClearDeliveryHeaderTransport : ITransport
{
    private readonly ITransport _transportImplementation;

    public ClearDeliveryHeaderTransport(ITransport transportImplementation)
    {
        _transportImplementation = transportImplementation;
    }

    public void CreateQueue(string address)
    {
        _transportImplementation.CreateQueue(address);
    }

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
    {
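        // Strip RabbitMQ's delivery count before forwarding (e.g. to the error queue); otherwise a message
        // moved back to the input queue still carries the old count and is rejected immediately.
        message.Headers.Remove(MoreRabbitMqHeader.DeliveryCount);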

        return _transportImplementation.Send(destinationAddress, message, context);
    }

    public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
    {
        return _transportImplementation.Receive(context, cancellationToken);
    }

    public string Address => _transportImplementation.Address;
}

public static class MoreRabbitMqHeader
{
    /// <summary>
    /// How many times rabbitmq has attempted to deliver this message.
    /// This is only supported when working with Quorum queues.
    /// </summary>
    public const string DeliveryCount = "x-delivery-count";
}

public class MaxDeliverAttemptsExceededException : Exception, IFailFastException
{
    public MaxDeliverAttemptsExceededException(string? message) : base(message)
    {
    }
}

This has to be tested in a separate process rather than in a unit test, because the entire process has to go down (leaving the message unacknowledged) for RabbitMQ to increment the delivery count.
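To make the incrementing concrete, here is a small simulation of the crash-and-redeliver loop. It assumes the quorum-queue behavior described in the RabbitMQ docs linked above: the first delivery carries no `x-delivery-count` header, and each redelivery after the consumer dies without acknowledging carries the number of failed attempts so far (1, 2, 3, ...). It reports on which delivery a `count > maxDeliveryAttempts` check, as in `TooManyDeliveryAttemptsRejector`, first fires. `DeliverySimulation` is a hypothetical name used only for illustration.

```csharp
using System;

// Hypothetical helper for illustration; not part of Rebus or Rebus.RabbitMq.
public static class DeliverySimulation
{
    // Simulates a message whose handler kills the process on every attempt.
    // Assumption: no x-delivery-count on the first delivery, then 1, 2, 3, ...
    // on each redelivery. Returns the 1-based delivery on which a
    // "count > maxDeliveryAttempts" check first rejects the message.
    public static int DeliveryOnWhichRejected(int maxDeliveryAttempts)
    {
        int? deliveryCountHeader = null; // header absent on the first delivery

        for (var delivery = 1; ; delivery++)
        {
            if (deliveryCountHeader is int count && count > maxDeliveryAttempts)
            {
                return delivery; // rejected before the handler runs
            }

            // The handler crashed the process: the broker requeues the unacked
            // message and bumps its failed-attempt count.
            deliveryCountHeader = (deliveryCountHeader ?? 0) + 1;
        }
    }
}
```

Under those assumptions, with `MaxDeliveryAttempts = 5` the process goes down six times and the message is rejected on its seventh delivery; comparing with `>=` instead would cap it at five crashes.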

I couldn't find a way to mutate headers when handling error messages other than wrapping the transport itself and having it clear the header. Outgoing pipeline decorations seem to be ignored when processing error messages.
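A variation on the same idea, sketched without any Rebus dependency: instead of removing `x-delivery-count` from the outgoing message's own dictionary, build a copy of the headers without it, so nothing else holding the original message sees it change. The `DeliveryCountHeaders` helper below is hypothetical; inside a transport decorator like `ClearDeliveryHeaderTransport`, its result could be wrapped in a new `TransportMessage(headers, message.Body)` before calling the inner transport's `Send`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of Rebus or Rebus.RabbitMq.
public static class DeliveryCountHeaders
{
    // The header RabbitMQ quorum queues put on redelivered messages.
    public const string NativeDeliveryCount = "x-delivery-count";

    // Returns a copy of the headers without x-delivery-count and leaves the
    // original dictionary untouched. A message forwarded with these headers
    // (e.g. to the error queue) carries no stale count if it is later moved
    // back to the input queue.
    public static Dictionary<string, string> WithoutDeliveryCount(IReadOnlyDictionary<string, string> headers)
    {
        var copy = new Dictionary<string, string>(headers.Count);

        foreach (var (key, value) in headers)
        {
            if (!string.Equals(key, NativeDeliveryCount, StringComparison.Ordinal))
            {
                copy[key] = value;
            }
        }

        return copy;
    }
}
```

The trade-off is one extra dictionary allocation per send, in exchange for not having a decorator silently change headers on an object it doesn't own.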

It should probably also read the max attempts from the retry strategy, but I tried to keep it as simple as possible.
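On reading the limit from configuration: a minimal sketch of the check in `TooManyDeliveryAttemptsRejector` with the maximum passed in through a constructor instead of the hard-coded `MaxDeliveryAttempts` constant. Where the value comes from (for example, the retry strategy's settings) is deliberately left out, and `DeliveryCountPolicy` is a hypothetical name. Keeping the decision in a pure function also means it can be unit tested, even though the crash that drives the count cannot.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type for illustration; not part of Rebus or Rebus.RabbitMq.
public sealed class DeliveryCountPolicy
{
    private readonly int _maxDeliveryAttempts;

    // The limit is injected (e.g. from retry configuration) rather than a const.
    public DeliveryCountPolicy(int maxDeliveryAttempts)
    {
        if (maxDeliveryAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDeliveryAttempts), "Must be at least 1.");
        }

        _maxDeliveryAttempts = maxDeliveryAttempts;
    }

    // Same rule as the pipeline step: reject only when x-delivery-count is
    // present, parses as an integer, and exceeds the limit.
    public bool ShouldReject(IReadOnlyDictionary<string, string> headers) =>
        headers.TryGetValue("x-delivery-count", out var raw)
        && int.TryParse(raw, out var count)
        && count > _maxDeliveryAttempts;
}
```

With a limit of 5, a count of 5 still passes and 6 is rejected, matching the `count > MaxDeliveryAttempts` comparison in the original step; a missing or non-numeric header never triggers a rejection.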

mookid8000 commented 6 months ago

Hi there, sorry for not replying sooner 😅 It is definitely an interesting approach, but I'm thinking that maybe it could be "upgraded" a bit and formalized as a new, optional rbs2-delivery-count header, which the transport could provide when possible.

I've published Rebus 8.2.0-alpha01 to NuGet.org just now, in which I've added the header as Headers.DeliveryCount, along with the necessary logic in DefaultRetryStep to act on it.

Only thing left (I think) is to make transports that support it set the header when they build the TransportMessage object. Unfortunately I don't have time to implement it in Rebus.RabbitMq right now, so feel free to do it 😉 or wait for me to come around again (which I'll probably do some time this week).
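For the transport side, a rough sketch of copying the broker's native count into the new header while the `TransportMessage` is being built. It assumes the native headers arrive as `IDictionary<string, object>` (the shape of `IBasicProperties.Headers` in the RabbitMQ .NET client), with the count as either a boxed integer or UTF-8 bytes, and it writes the header name proposed above, `rbs2-delivery-count`, as a literal rather than referencing `Headers.DeliveryCount`. `NativeDeliveryCountMapper` is a hypothetical name; this is not the Rebus.RabbitMq implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

#nullable enable

// Hypothetical helper for illustration; not the Rebus.RabbitMq implementation.
public static class NativeDeliveryCountMapper
{
    private const string NativeHeader = "x-delivery-count";

    // Assumed to match the optional rbs2-delivery-count header proposed for Rebus.
    private const string RebusHeader = "rbs2-delivery-count";

    // Copies the broker's delivery count, if present, into the Rebus headers.
    // When the native header is absent (e.g. on a first delivery, or on a
    // classic queue), nothing is added.
    public static void Map(IDictionary<string, object>? nativeHeaders, IDictionary<string, string> rebusHeaders)
    {
        if (nativeHeaders == null || !nativeHeaders.TryGetValue(NativeHeader, out var raw) || raw == null)
        {
            return;
        }

        string? value = raw switch
        {
            byte[] bytes => Encoding.UTF8.GetString(bytes),                              // string-typed AMQP value
            IConvertible convertible => convertible.ToString(CultureInfo.InvariantCulture), // numeric value
            _ => null,                                                                   // unknown shape: skip
        };

        if (value != null)
        {
            rebusHeaders[RebusHeader] = value;
        }
    }
}
```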

zlepper commented 6 months ago

No problem, Christmas vacation is important!

I mostly put it here to get the ideas down, and to act as a POC. I was very sure the exact implementation was going to differ since you know the code much better than me :D

mookid8000 commented 6 months ago

OK, I just had 10 minutes to try 🙂 I've added a test here: https://github.com/rebus-org/Rebus.RabbitMq/blob/master/Rebus.RabbitMq.Tests/RabbitMqNativeDeliveryCount.cs

The implementation just provides the value from the x-delivery-count header when it is present.

I haven't had a quorum queue to test on yet, though... feel free to take it for a spin if you like.

It's out as Rebus.RabbitMq 9.1.0-alpha01 on NuGet.org now.

zlepper commented 6 months ago

Just took a quick look at the implementation. It probably needs to clear the header when passing the message to the error queue. Otherwise, simply retrying by using the RabbitMQ management interface to move messages from the error queue back to the work queue will cause the messages to be considered failed immediately; at least, that was the result of my testing originally :)

But this is going to be lovely: far fewer dead processes when BAD things happen :D :D