sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License

Need persistent messages with RabbitMQ #52

Closed: tkocjan closed this issue 5 years ago

tkocjan commented 5 years ago

I do not see a way to have persistent messages with RabbitMQ. This requires instantiating the following AMQPMessage:

new \PhpAmqpLib\Message\AMQPMessage($body, ['delivery_mode' => 2]);

I cannot see a way to make that happen with existing code.

If I were to implement this, adding another option to QueueInteropTransport seems like the way to do it. Usually, if you persist messages, you persist every message in a queue rather than selected ones.
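
For reference, AMQP 0-9-1 defines delivery mode 1 as non-persistent and 2 as persistent, and a persistent message only survives a broker restart if it is routed to a durable queue. The sketch below shows what a transport-wide setting could look like at the level of message properties. The constants and the `withDeliveryMode()` helper are hypothetical names defined here for illustration; they are not part of php-amqplib or of this adapter.

```php
<?php

// AMQP 0-9-1 delivery-mode values (2 is the value used in the AMQPMessage example above).
const DELIVERY_MODE_NON_PERSISTENT = 1;
const DELIVERY_MODE_PERSISTENT = 2;

/**
 * Hypothetical helper: merges a transport-wide delivery mode into the
 * properties array that would be handed to a message constructor.
 * A null mode returns the properties unchanged.
 */
function withDeliveryMode(array $properties, ?int $deliveryMode): array
{
    if (null === $deliveryMode) {
        return $properties;
    }

    $allowed = [DELIVERY_MODE_NON_PERSISTENT, DELIVERY_MODE_PERSISTENT];
    if (!in_array($deliveryMode, $allowed, true)) {
        throw new InvalidArgumentException(sprintf('Unsupported delivery mode %d.', $deliveryMode));
    }

    // Array union keeps the left-hand value, so the transport-wide mode
    // wins over any delivery_mode already present in $properties.
    return ['delivery_mode' => $deliveryMode] + $properties;
}
```

Applying the mode in one place like this matches the point above: persistence is normally decided per queue or transport, not per message.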

tkocjan commented 5 years ago

I have a fix that allows a deliveryMode parameter in the DSN. QueueInteropTransport::send() then passes that value to $interopMessage->setDeliveryMode() if the message is an instance of AmqpMessage. The code for QueueInteropTransport is below.

Rather than doing it this way, it might be better to add this functionality to AmqpProducer, as setPriority() does. That is a lot more work, though, and also requires changes to php-enqueue/enqueue-dev (\Enqueue\AmqpLib\AmqpProducer) and queue-interop/amqp-interop (\Interop\Amqp\AmqpProducer). I have not implemented that: I first wanted to find out which approach is more appropriate, and I do not know how difficult it will be to get changes into all three repos.
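
To illustrate how a deliveryMode query parameter in the DSN could end up as an integer option, here is a minimal sketch that uses only PHP built-ins. The `parseTransportDsn()` function and the example DSN are assumptions made for illustration; the adapter's actual transport factory may parse the DSN differently. The cast matters because query-string values arrive as strings, while the option is declared to allow only 'null' or 'int'.

```php
<?php

/**
 * Hypothetical DSN parser: splits "enqueue://<context>?<query>" into the
 * context name and an options array, casting digit-only strings to int
 * so that "deliveryMode=2" satisfies an int-only option type.
 */
function parseTransportDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if (false === $parts || !isset($parts['host'])) {
        throw new InvalidArgumentException(sprintf('Invalid DSN "%s".', $dsn));
    }

    $options = [];
    if (isset($parts['query'])) {
        // parse_str() expands bracket syntax such as queue[name]=... into nested arrays.
        parse_str($parts['query'], $options);
        array_walk_recursive($options, function (&$value) {
            if (is_string($value) && ctype_digit($value)) {
                $value = (int) $value;
            }
        });
    }

    return [$parts['host'], $options];
}

// Example DSN (assumed format) requesting persistent delivery.
[$contextName, $options] = parseTransportDsn(
    'enqueue://default?queue[name]=messages&topic[name]=messages&deliveryMode=2'
);
```

The resulting `$options` array is the kind of input the constructor in the listing below would pass through its OptionsResolver.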

Here is QueueInteropTransport with deliveryMode functionality:

<?php

/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Enqueue\MessengerAdapter;

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Enqueue\MessengerAdapter\Exception\RejectMessageException;
use Enqueue\MessengerAdapter\Exception\RequeueMessageException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Interop\Queue\Exception as InteropQueueException;
use Interop\Queue\Message;
use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException;
use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Symfony\Component\OptionsResolver\Options;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Interop\Amqp\AmqpMessage;

/**
 * Symfony Messenger transport.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 * @author Max Kotliar <kotlyar.maksim@gmail.com>
 */
class QueueInteropTransport implements TransportInterface
{
    private $serializer;
    private $contextManager;
    private $options;
    private $debug;
    private $shouldStop;

    public function __construct(
        SerializerInterface $serializer,
        ContextManager $contextManager,
        array $options = array(),
        $debug = false
    ) {
        $this->serializer = $serializer;
        $this->contextManager = $contextManager;
        $this->debug = $debug;

        $resolver = new OptionsResolver();
        $this->configureOptions($resolver);
        $this->options = $resolver->resolve($options);
    }

    /**
     * {@inheritdoc}
     */
    public function receive(callable $handler): void
    {
        $context = $this->contextManager->context();
        $destination = $this->getDestination(null);
        $queue = $context->createQueue($destination['queue']);
        $consumer = $context->createConsumer($queue);

        if ($this->debug) {
            $this->contextManager->ensureExists($destination);
        }

        while (!$this->shouldStop) {
            try {
                if (null === ($interopMessage = $consumer->receive($this->options['receiveTimeout'] ?? 30000))) {
                    $handler(null);
                    continue;
                }
            } catch (\Exception $e) {
                if ($this->contextManager->recoverException($e, $destination)) {
                    continue;
                }

                throw $e;
            }

            try {
                $handler($this->serializer->decode(array(
                    'body' => $interopMessage->getBody(),
                    'headers' => $interopMessage->getHeaders(),
                    'properties' => $interopMessage->getProperties(),
                )));

                $consumer->acknowledge($interopMessage);
            } catch (RejectMessageException $e) {
                $consumer->reject($interopMessage);
            } catch (RequeueMessageException $e) {
                $consumer->reject($interopMessage, true);
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function send(Envelope $envelope): Envelope
    {
        $context = $this->contextManager->context();
        $destination = $this->getDestination($envelope);
        $topic = $context->createTopic($destination['topic']);

        if ($this->debug) {
            $this->contextManager->ensureExists($destination);
        }

        $encodedMessage = $this->serializer->encode($envelope);

        $interopMessage = $context->createMessage(
            $encodedMessage['body'],
            $encodedMessage['properties'] ?? array(),
            $encodedMessage['headers'] ?? array()
        );

        $this->setMessageMetadata($interopMessage, $envelope);

        $producer = $context->createProducer();

        if (isset($this->options['deliveryDelay'])) {
            if ($producer instanceof DelayStrategyAware) {
                $producer->setDelayStrategy($this->options['delayStrategy']);
            }
            $producer->setDeliveryDelay($this->options['deliveryDelay']);
        }
        if (isset($this->options['priority'])) {
            $producer->setPriority($this->options['priority']);
        }
        if (isset($this->options['timeToLive'])) {
            $producer->setTimeToLive($this->options['timeToLive']);
        }

        // Delivery mode is AMQP-specific, so only apply it to AMQP messages.
        if ($interopMessage instanceof AmqpMessage && isset($this->options['deliveryMode'])) {
            $interopMessage->setDeliveryMode($this->options['deliveryMode']);
        }

        try {
            $producer->send($topic, $interopMessage);
        } catch (InteropQueueException $e) {
            if (!$this->contextManager->recoverException($e, $destination)) {
                throw new SendingMessageFailedException($e->getMessage(), null, $e);
            }

            // The context manager recovered the exception, we re-try.
            $envelope = $this->send($envelope);
        }

        return $envelope;
    }

    /**
     * {@inheritdoc}
     */
    public function stop(): void
    {
        $this->shouldStop = true;
    }

    public function configureOptions(OptionsResolver $resolver): void
    {
        $resolver->setDefaults(array(
            'receiveTimeout' => null,
            'deliveryDelay' => null,
            'delayStrategy' => RabbitMqDelayPluginDelayStrategy::class,
            'priority' => null,
            'timeToLive' => null,
            'deliveryMode' => null,
            'topic' => array('name' => 'messages'),
            'queue' => array('name' => 'messages'),
        ));

        $resolver->setAllowedTypes('receiveTimeout', array('null', 'int'));
        $resolver->setAllowedTypes('deliveryDelay', array('null', 'int'));
        $resolver->setAllowedTypes('priority', array('null', 'int'));
        $resolver->setAllowedTypes('timeToLive', array('null', 'int'));
        $resolver->setAllowedTypes('deliveryMode', array('null', 'int'));
        $resolver->setAllowedTypes('delayStrategy', array('null', 'string'));

        $resolver->setAllowedValues('delayStrategy', array(
            null,
            RabbitMqDelayPluginDelayStrategy::class,
            RabbitMqDlxDelayStrategy::class,
        ));

        $resolver->setNormalizer('delayStrategy', function (Options $options, $value) {
            return null !== $value ? new $value() : null;
        });
    }

    private function getDestination(?Envelope $envelope): array
    {
        $configuration = $envelope ? $envelope->last(TransportConfiguration::class) : null;
        $topic = null !== $configuration ? $configuration->getTopic() : null;

        return array(
            'topic' => $topic ?? $this->options['topic']['name'],
            'topicOptions' => $this->options['topic'],
            'queue' => $this->options['queue']['name'],
            'queueOptions' => $this->options['queue'],
        );
    }

    private function setMessageMetadata(Message $interopMessage, Envelope $envelope): void
    {
        $configuration = $envelope->last(TransportConfiguration::class);

        if (null === $configuration) {
            return;
        }

        $metadata = $configuration->getMetadata();
        $class = new \ReflectionClass($interopMessage);

        foreach ($metadata as $key => $value) {
            $setter = sprintf('set%s', ucfirst($key));
            if (!$class->hasMethod($setter)) {
                throw new MissingMessageMetadataSetterException($key, $setter, $class->getName());
            }
            $interopMessage->{$setter}($value);
        }
    }
}

Steveb-p commented 5 years ago

@tkocjan could you provide a Pull Request so we can see clearly what the changes are?

tkocjan commented 5 years ago

I cloned. Created branch. Made changes. Committed.

Tried to push, got error: Permission to php-enqueue/messenger-adapter.git denied to tkocjan.

I assume I need permission to push, correct?

Can I get permission?

tkocjan commented 5 years ago

Never mind my last comment. I cloned the repo and made the changes, and the PR is now in php-enqueue/messenger-adapter.