Azure / amqpnetlite

AMQP 1.0 .NET Library

Question: how to handle redelivery #588

Closed JBastiaan closed 4 weeks ago

JBastiaan commented 3 months ago

Hello,

I'm using amqpnetlite to connect to an ActiveMQ Classic broker. I'm struggling to implement redelivery and moving messages to the DLQ, specifically when an exception occurs while processing in OnMessage. From what I can see in the API, I can choose to Reject, Release, or Modify a message. I've tried Reject: the message is then marked as Dequeued on ActiveMQ. I've also tried Release: I seem to get unlimited redeliveries, and the redelivery count in ActiveMQ only increases when I quit the app. The same goes for Modify: I get unlimited redeliveries, and the delivery count keeps increasing on the consumer side but not on the broker side.

Expected behavior: the redelivery count is increased on the ActiveMQ broker side, so the message is automatically moved to the configured DLQ.

What am I missing here?

Edit: I also tried publishing the message to the DLQ myself using a separate ISenderLink, but I get an exception saying: "Message was not received by this link"

My consumer code:



```csharp
internal class QueueConsumer<T> : IHostedService
    where T : class
{
    private readonly ILogger<QueueConsumer<T>> _logger;
    private readonly IServiceScopeFactory _serviceScopeFactory;

    private readonly Lazy<Task<IReceiverLink>> _receiver;

    internal QueueConsumer(
        ILogger<QueueConsumer<T>> logger,
        IConnectionAdapter connectionAdapter,
        IServiceScopeFactory serviceScopeFactory,
        string queueName)
    {
        _logger = logger;
        _serviceScopeFactory = serviceScopeFactory;
        _receiver = connectionAdapter.CreateQueueReceiver(
            $"Read.{queueName}",
            $"queue://{queueName}");
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var receiver = await _receiver.Value;
        // Link credit of 1: one unsettled message at a time.
        receiver.Start(1, OnMessage);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var receiver = await _receiver.Value;
        await receiver.CloseAsync();
    }

    private async void OnMessage(IReceiverLink receiver, Message message)
    {
        try
        {
            using var body = new MemoryStream((message.Body as byte[])!);
            var deserializer = new XmlSerializer(typeof(T));
            var msg = deserializer.Deserialize(body) as T ??
                      throw new SerializationException(
                          $"Failed to deserialize message of type {typeof(T).FullName}");

            // Dispose the scope once the handlers have run.
            await using var scope = _serviceScopeFactory.CreateAsyncScope();
            var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
            await mediator.Publish(new ActiveMqMessage<T>
            {
                MessageProperties = message.Properties,
                Message = msg
            });

            receiver.Accept(message);
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Failed to process message of type {Type}", typeof(T).FullName);

            try
            {
                // Also tried Reject and Release here.
                // Also tried the following (gives "Message was not received by this link"):
                //var dlqSender = await _dlqSender.Value;
                //await dlqSender.SendAsync(message);
                //receiver.Accept(message);

                receiver.Modify(
                    message,
                    deliveryFailed: true,
                    undeliverableHere: false,
                    new Fields());
            }
            catch (Exception ex)
            {
                // If we end up here, we probably failed to settle the message, and
                // we can't receive more messages because our credit is exhausted.
                _logger.LogCritical(ex,
                    "Failed to send modification of failed message to peer of type {Type}",
                    typeof(T).FullName);
            }
        }
    }
}
```

xinchen10 commented 3 months ago

Try undeliverableHere: true in the Modify call. It worked for me. The message was moved to the ActiveMQ.DLQ queue.

Some ActiveMQ broker configuration settings can also affect redelivery and dead-lettering behavior. For details, see https://activemq.apache.org/components/classic/documentation/message-redelivery-and-dlq-handling

JBastiaan commented 3 months ago

Hey, thanks for your response. I figured out why the message wasn't moved to the DLQ: I was not setting the Durable header to true when publishing my test messages. Now, when I reject a message or modify it with undeliverableHere set, it gets moved to the DLQ.

However, I don't entirely understand yet why receiver.Release(message) and receiver.Modify(message) don't seem to get synced to the broker. When I release, I just get infinite retries with the same delivery count. If I modify as in the snippet I posted, I do get increasing delivery counts, but they do not seem to get synced to the broker, and I basically get infinite retries as well.

Is it expected that we check the delivery count on the consumer side and then reject the message or mark it as undeliverable here so that it moves to the DLQ? I would have expected the broker to move the message to the DLQ after the consumer marked it as deliveryFailed a certain number of times.

Note: I'm using the out-of-the-box ActiveMQ configuration. The only thing I've modified is this:

                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="25mb" queuePrefetch="1" reduceMemoryFootprint="true">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                        <!-- ICC standards: Message Redelivery and DLQ Handling
                            http://activemq.apache.org/message-redelivery-and-dlq-handling.html
                        -->
                        <deadLetterStrategy>
                            <!--
                            Use the suffix '.DLQ' for the destination name, and make
                            the DLQ a queue rather than a topic
                            -->
                            <individualDeadLetterStrategy topicSuffix=".DLQ" topicPrefix="" queueSuffix=".DLQ" queuePrefix="" useQueueForQueueMessages="true"/>
                        </deadLetterStrategy>
                    </policyEntry>
                </policyEntries>

//Johan

xinchen10 commented 3 months ago

The released and modified outcomes are explained in the specification (sections 3.4.4 and 3.4.5): http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf

It is expected that you get the same message repeatedly for released and modified with undeliverableHere = false.

Dead-lettering based on delivery-count is broker-specific. It might be better to ask the ActiveMQ community how that is handled.

By default, ActiveMQ only considers persistent messages for dead-lettering. In my testing I added the following to the broker config:
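
If the broker does not dead-letter on delivery-count by itself, one option raised in the question is to check the count on the consumer side. The following is only a minimal sketch of that idea, not a confirmed recommendation for ActiveMQ: the `RetryPolicy` class, the `ShouldDeadLetter` helper, and the `maxRedeliveries` parameter are hypothetical names, and it assumes the delivery count seen by the consumer increases after each `Modify` with `deliveryFailed: true`, as reported above.

```csharp
using Amqp;
using Amqp.Types;

// Hypothetical helper, not part of amqpnetlite.
internal static class RetryPolicy
{
    // deliveryCount is the number of earlier failed deliveries reported in
    // the message header. Give up once that many redeliveries have happened.
    public static bool ShouldDeadLetter(uint deliveryCount, uint maxRedeliveries)
        => deliveryCount >= maxRedeliveries;

    // Settles a message whose processing failed.
    public static void SettleFailed(IReceiverLink receiver, Message message, uint maxRedeliveries)
    {
        // The header section is optional, so treat a missing header as "first delivery".
        uint deliveryCount = message.Header?.DeliveryCount ?? 0u;
        bool giveUp = ShouldDeadLetter(deliveryCount, maxRedeliveries);

        // undeliverableHere: false asks for another attempt on this link;
        // undeliverableHere: true tells the broker not to send it here again
        // (in this thread that moved the message to the DLQ).
        receiver.Modify(
            message,
            deliveryFailed: true,
            undeliverableHere: giveUp,
            new Fields());
    }
}
```

In the consumer's catch block this would replace the direct `Modify` call, for example `RetryPolicy.SettleFailed(receiver, message, 3)`. Whether the message then lands in a DLQ still depends on the broker configuration.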

                <policyEntry queue=">">
                  <deadLetterStrategy>
                    <sharedDeadLetterStrategy processNonPersistent="true"/>
                  </deadLetterStrategy>
                </policyEntry>

I think setting the message header has the same effect.
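
For the header route, here is a minimal sketch of publishing a message with the Durable header set, which is what resolved the missing DLQ move earlier in this thread. The `DurableMessages` class and `Create` method are hypothetical names used only for illustration.

```csharp
using Amqp;
using Amqp.Framing;

// Hypothetical helper, not part of amqpnetlite.
internal static class DurableMessages
{
    // Builds a message marked durable, so that a broker which only
    // dead-letters persistent messages will consider it.
    public static Message Create(object body)
    {
        return new Message(body)
        {
            Header = new Header { Durable = true }
        };
    }
}
```

A sender would then call something like `await sender.SendAsync(DurableMessages.Create(payload))`, where `sender` and `payload` are whatever the publishing code already uses.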