symfony / symfony

The Symfony PHP framework
https://symfony.com
MIT License

[Messenger] RejectRedeliveredMessageMiddleware causes duplicate messages in failure transport #57733

Open vkollin opened 3 months ago

vkollin commented 3 months ago

Symfony version(s) affected

6.4

Description

Setup: a RabbitMQ transport with max_retries: 0, and a failure_transport backed by Doctrine.

We discovered some strange behavior in our async transport running on RabbitMQ. If an async message fails, it is not retried but is sent to the failure transport, as expected.

But in some cases the message is redelivered by RabbitMQ, and RejectRedeliveredMessageMiddleware throws a RejectRedeliveredMessageException.

class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
        if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
            throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

The Worker rejects the message as expected and dispatches a WorkerMessageFailedEvent, which eventually triggers SendFailedMessageToFailureTransportListener.

// from Worker::ack()

if (null !== $e) {
    if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
        // redelivered messages are rejected first so that continuous failures in an event listener or while
        // publishing for retry does not cause infinite redelivery loops
        $receiver->reject($envelope);
    }

    // ...
}

$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
$this->eventDispatcher?->dispatch($handledEvent);
$envelope = $handledEvent->getEnvelope();

Inside SendFailedMessageToFailureTransportListener there is nothing that checks if the RejectRedeliveredMessageException was thrown.

if ($event->willRetry()) {
    return;
}

if (!$this->failureSenders->has($event->getReceiverName())) {
    return;
}

$failureSender = $this->failureSenders->get($event->getReceiverName());

$envelope = $event->getEnvelope();

// avoid re-sending to the failed sender
if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
    return;
}

I'm not quite sure whether this is the expected behavior, because it currently duplicates messages: both the originally failed message and its redelivered copy are sent to the failure transport.

How to reproduce

A message handler throws an exception. Sometimes the message is unexpectedly redelivered and is then handled by RejectRedeliveredMessageMiddleware.

#[AsMessageHandler(fromTransport: 'rabbit_mq')]
class MyMessageHandler
{
    public function __invoke(CustomMessage $message): void
    {
        throw new \RuntimeException();
    }
}

Possible Solution

SendFailedMessageToFailureTransportListener could check whether the WorkerMessageFailedEvent carries a RejectRedeliveredMessageException and do nothing in that case (unless the current behavior is the expected one).
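
A minimal sketch of such a check, not the actual Symfony implementation. The helper name throwableChainContains is hypothetical, and walking the "previous" chain is a defensive assumption in case the exception arrives wrapped. The helper has no Symfony dependency, so it can be tried in isolation.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns true if $throwable, or any throwable in its
 * "previous" chain, is an instance of $class.
 */
function throwableChainContains(\Throwable $throwable, string $class): bool
{
    // Walk from the outermost throwable down through getPrevious().
    for ($current = $throwable; null !== $current; $current = $current->getPrevious()) {
        if ($current instanceof $class) {
            return true;
        }
    }

    return false;
}

// Assumed use inside the listener, before the envelope is sent to the
// failure transport (sketch only, requires symfony/messenger):
//
// if (throwableChainContains($event->getThrowable(), RejectRedeliveredMessageException::class)) {
//     return;
// }
```

Whether skipping the failure transport is correct here depends on whether the first delivery already reached it, which is the open question of this issue.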

Additional Context

No response

vkollin commented 3 months ago

There are also these related issues/PRs

@nikophil So maybe the solution would be to make it configurable whether RejectRedeliveredMessageException is unrecoverable or not, because the expected behavior may depend on the environment/architecture of the application.