sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License

TransportConfiguration::setDeliveryDelay is not working #66

Open malarzm opened 5 years ago

malarzm commented 5 years ago

Expected result: deliveryDelay from TransportConfiguration is used over deliveryDelay specified in transport's options.

Actual result:

Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException: Missing "setDeliveryDelay" setter for "deliveryDelay" metadata key in "Double\Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage\P1"

Sorry for not providing a PR. The following test can be pasted directly into QueueInteropTransportTest:

    public function testDeliveryDetail()
    {
        $topicName = 'topic';
        $queueName = 'queue';
        $message = new \stdClass();
        $message->foo = 'bar';
        $envelope = (new Envelope($message))->with(new TransportConfiguration(array(
            'metadata' => array('routingKey' => 'foo.bar', 'deliveryDelay' => 1000),
        )));

        $psrMessageProphecy = $this->prophesize(DecoratedPsrMessage::class);
        $psrMessageProphecy->setRoutingKey('foo.bar')->shouldBeCalled();
        $psrMessage = $psrMessageProphecy->reveal();
        $topicProphecy = $this->prophesize(Topic::class);
        $topic = $topicProphecy->reveal();

        $producerProphecy = $this->prophesize(Producer::class);
        $producerProphecy->send($topic, $psrMessage)->shouldBeCalled();

        $contextProphecy = $this->prophesize(Context::class);
        $contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic);
        $contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal());
        $contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage);

        $contextManagerProphecy = $this->prophesize(ContextManager::class);
        $contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal());
        $contextManagerProphecy->ensureExists(array(
            'topic' => $topicName,
            'topicOptions' => array('name' => $topicName, 'foo' => 'bar'),
            'queue' => $queueName,
            'queueOptions' => array('name' => $queueName, 'bar' => 'foo'),
        ))->shouldBeCalled();

        $encoderProphecy = $this->prophesize(SerializerInterface::class);
        $encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo'));

        $transport = $this->getTransport(
            $encoderProphecy->reveal(),
            $contextManagerProphecy->reveal(),
            array(
                'topic' => array('name' => $topicName, 'foo' => 'bar'),
                'queue' => array('name' => $queueName, 'bar' => 'foo'),
            ),
            true
        );

        $transport->send($envelope);
    }

kor3k commented 5 years ago

When trying

$envelope = $envelope->with((new TransportConfiguration())->setDeliveryDelay(5000));

I got this:

Missing "setDeliveryDelay" setter for "deliveryDelay" metadata key in "Interop\Amqp\Impl\AmqpMessage" class

tkocjan commented 5 years ago

It does not seem like the setDeliveryDelay, setDelayStrategy and setTimeToLive methods should be in the TransportConfiguration class. They set metadata that QueueInteropTransport::setMessageMetadata then tries to apply by calling the corresponding setters on \Interop\Amqp\AmqpMessage or \Interop\Queue\Message, and those setters do not exist. The TransportConfiguration::setPriority method will also only work on AmqpMessage.
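To illustrate the failure mode, here is a minimal, self-contained sketch of a metadata-to-setter mapping like the one the exception message implies. It is not the adapter's actual source: FakeAmqpMessage, MissingSetterException and applyMetadata are hypothetical names made up for this example, and the real setMessageMetadata may differ in detail.

```php
<?php
// Hypothetical sketch only; names below are NOT part of the adapter.

final class MissingSetterException extends \LogicException
{
}

/** Stand-in for a message class that exposes only some setters. */
final class FakeAmqpMessage
{
    public $routingKey;
    public $priority;

    public function setRoutingKey(string $routingKey): void
    {
        $this->routingKey = $routingKey;
    }

    public function setPriority(int $priority): void
    {
        $this->priority = $priority;
    }
}

/**
 * Applies each metadata entry by calling "set" + ucfirst(key) on the message.
 * Throws when the message class has no such setter.
 */
function applyMetadata(object $message, array $metadata): void
{
    foreach ($metadata as $key => $value) {
        $setter = 'set' . ucfirst($key);
        if (!method_exists($message, $setter)) {
            throw new MissingSetterException(sprintf(
                'Missing "%s" setter for "%s" metadata key in "%s" class',
                $setter,
                $key,
                get_class($message)
            ));
        }
        $message->{$setter}($value);
    }
}

$message = new FakeAmqpMessage();

// Works: the message has setRoutingKey().
applyMetadata($message, ['routingKey' => 'foo.bar']);

// Fails: the message has no setDeliveryDelay(), so the key cannot be applied.
try {
    applyMetadata($message, ['deliveryDelay' => 5000]);
} catch (MissingSetterException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

Under this assumption, any metadata key without a matching setter on the concrete message class fails at send time, which matches the errors reported above.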

It seems like 'deliveryDelay', 'delayStrategy', 'priority' and 'timeToLive' should only be set as options on QueueInteropTransport, where all four are supported.

The proper way to only delay specific messages would be to add a \Symfony\Component\Messenger\Stamp\DelayStamp to the Envelope but, unfortunately, that is not working on QueueInteropTransport right now:

$envelope = $envelope->with(new DelayStamp(5000));

Should setDeliveryDelay, setDelayStrategy, setTimeToLive and setPriority be dropped from TransportConfiguration?