zenstruck / messenger-test

Assertions and helpers for testing your symfony/messenger queues.

Message Dispatching/Processing in Handlers #74

Closed rodnaph closed 6 months ago

rodnaph commented 7 months ago

When processing messages on the test transport, available/new messages are processed immediately. I'm running into an issue where I have a Doctrine postFlush listener which dispatches a message to the bus. Because the resulting message is processed right away, UnitOfWork is re-entered, which results in an exception. Doctrine does not expect this: "Making changes to entities and calling EntityManager::flush() from within event handlers dispatched by EntityManager::flush() itself is strongly discouraged, and might be deprecated and eventually prevented in the future."
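For illustration, a listener of the kind described might look roughly like the sketch below. The class names NotifyAfterFlushListener and EntitiesFlushed are hypothetical, and registering the listener for the postFlush event is omitted. Only MessageBusInterface::dispatch() and Doctrine's PostFlushEventArgs are existing APIs here.

```php
<?php

use Doctrine\ORM\Event\PostFlushEventArgs;
use Symfony\Component\Messenger\MessageBusInterface;

// Hypothetical message class, not taken from the original report.
final class EntitiesFlushed
{
}

// Hypothetical listener; registering it for the postFlush event is omitted.
final class NotifyAfterFlushListener
{
    public function __construct(private MessageBusInterface $bus)
    {
    }

    public function postFlush(PostFlushEventArgs $args): void
    {
        // Runs while EntityManager::flush() is still on the call stack, so any
        // middleware that flushes during this dispatch re-enters the UnitOfWork.
        $this->bus->dispatch(new EntitiesFlushed());
    }
}
```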

Here is a sample callstack...

/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/ORMInvalidArgumentException.php:102
/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php:3619
/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php:412
/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php:403
/srv/app/vendor/symfony/doctrine-bridge/Messenger/DoctrineTransactionMiddleware.php:32
/srv/app/vendor/symfony/doctrine-bridge/Messenger/AbstractDoctrineMiddleware.php:45
/srv/app/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php:34
/srv/app/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php:68
/srv/app/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php:41
/srv/app/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php:37
/srv/app/vendor/symfony/messenger/MessageBus.php:70
/srv/app/vendor/symfony/messenger/TraceableMessageBus.php:38
/srv/app/vendor/zenstruck/messenger-test/src/Bus/TestBus.php:38
/srv/app/src/App/MyListener.php:65 ### postFlush listener called here <-----------------------------------
/srv/app/vendor/symfony/doctrine-bridge/ContainerAwareEventManager.php:63
/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php:3580
/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php:500
/srv/app/vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php:403
/srv/app/vendor/symfony/doctrine-bridge/Messenger/DoctrineTransactionMiddleware.php:32
/srv/app/vendor/symfony/doctrine-bridge/Messenger/AbstractDoctrineMiddleware.php:45
/srv/app/vendor/symfony/doctrine-bridge/Messenger/DoctrineCloseConnectionMiddleware.php:31
/srv/app/vendor/symfony/doctrine-bridge/Messenger/AbstractDoctrineMiddleware.php:45
/srv/app/vendor/symfony/doctrine-bridge/Messenger/DoctrinePingConnectionMiddleware.php:34
/srv/app/vendor/symfony/doctrine-bridge/Messenger/AbstractDoctrineMiddleware.php:45
/srv/app/vendor/symfony/messenger/MessageBus.php:70
/srv/app/vendor/symfony/messenger/TraceableMessageBus.php:38
/srv/app/vendor/zenstruck/messenger-test/src/Bus/TestBus.php:38
/srv/app/tests/functional/MyTest.php:369

I tried calling process(1), thinking it would stop the message dispatched by the listener from being processed, but the StopWorkerOnMessageLimitListener doesn't seem to have been called to increase the message count, as the first message is still processing.

I don't really want to disable my listener, as this is a functional test and I want to assert/exercise its functionality. I'm not sure what the expected behaviour is here, or whether this is an actual issue; I'm just wondering if you have any advice around it.

Any advice appreciated, thanks.

nikophil commented 6 months ago

Hello,

By default, test transports "intercept" messages and do not dispatch them until process() is called. This can be disabled in the config: https://github.com/zenstruck/messenger-test/?tab=readme-ov-file#unblock-mode

Do you perhaps have the special config ?intercept=false added to your test transport?
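For reference, the option mentioned above is part of the transport DSN. The sketch below shows where it would appear, written as a Symfony PHP config file. The transport name async and the file location are assumptions, not details from the reporter's project.

```php
<?php
// config/packages/test/messenger.php (assumed location)

use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework): void {
    $framework->messenger()
        ->transport('async') // assumed transport name
        // 'test://' alone keeps the default intercepting behaviour;
        // '?intercept=false' switches the transport to unblock mode.
        ->dsn('test://?intercept=false');
};
```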

rodnaph commented 6 months ago

> by default, test transports "intercept" messages, and do not dispatch them until process() is called. This can be disabled in the config: https://github.com/zenstruck/messenger-test/?tab=readme-ov-file#unblock-mode

That's correct, my transport is set to intercept messages. So...

  1. Message sent to transport
  2. TestTransport intercepts it and does nothing
  3. process(1) called to process a single message
  4. During processing that message, it sends a second message
  5. I think this is where my understanding breaks down: it's actually the dispatching of the message, not the processing, that I'm confused about.
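The steps above can be modelled with a small, self-contained toy. This is not the library's implementation: InterceptingQueue and its methods are hypothetical names. The sketch only illustrates what steps 1 to 4 would look like if the second message were routed to the same intercepting transport.

```php
<?php

// Toy model only: hypothetical class, NOT the messenger-test implementation.
final class InterceptingQueue
{
    /** @var list<string> messages waiting to be processed */
    private array $queue = [];

    /** @var list<string> messages that have been handled, in order */
    public array $handled = [];

    public function __construct(private \Closure $handler)
    {
    }

    // Steps 1-2: the message is stored and nothing else happens.
    public function send(string $message): void
    {
        $this->queue[] = $message;
    }

    // Step 3: handle queued messages, stopping once $limit is reached.
    public function process(?int $limit = null): int
    {
        $count = 0;
        while ($this->queue !== [] && ($limit === null || $count < $limit)) {
            $message = array_shift($this->queue);
            ($this->handler)($message, $this);
            $this->handled[] = $message;
            ++$count;
        }

        return $count;
    }

    /** @return list<string> */
    public function pending(): array
    {
        return $this->queue;
    }
}

// Step 4: while handling 'first', the handler sends a second message.
$transport = new InterceptingQueue(
    static function (string $message, InterceptingQueue $transport): void {
        if ($message === 'first') {
            $transport->send('second');
        }
    }
);

$transport->send('first');
$processed = $transport->process(1);
```

In this model nothing handles the second message during step 4; it stays queued until process() is called again.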

Anyway, I think my understanding of the architecture of dispatching and sending to transports perhaps isn't good enough to explain the issue. I'll close this, but thanks for your time.

nikophil commented 6 months ago

Hey @rodnaph

I really don't understand how this could happen, unless the transport used by the listener is not the same as the one that handles the original message.

If you can provide a reproducer, I may be able to help you find the problem.