yiisoft / queue

Queue extension for Yii 3.0
https://www.yiiframework.com/
BSD 3-Clause "New" or "Revised" License

Support for M:N relation (many message types in many channels) required #223

Closed (holdmann closed 1 week ago)

holdmann commented 1 week ago

Any queue can have many channels, and each channel can carry many types of messages. The current implementation supports only a one channel <-> one message type relation.

Therefore, when a queue's messages are processed, they are handled successfully until a message of a different type is received.

Then an exception is thrown:

In Worker.php line 75:

  [Yiisoft\Queue\Exception\JobFailureException]
  Processing of message #null is stopped because of an exception:
  Argument 1 passed to Kant\PIM\Queue\Job\V2\Product::getIblockId() must be of the type int, null given, called in /docroot/local/php_interface/classes/Kant/PIM/Queue/Job/V2/Product.php on line 29.

Exception trace:
  at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Worker/Worker.php:75
 Yiisoft\Queue\Worker\Worker->process() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Queue.php:176
 Yiisoft\Queue\Queue->handle() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Queue.php:96
 Yiisoft\Queue\Queue->Yiisoft\Queue\{closure}() at /docroot/local/modules/artofbx.queue/lib/expansion/yiisoft/queue/bitrixadapter.php:272
 ArtOfBx\Queue\Expansion\Yiisoft\Queue\BitrixAdapter->run() at /docroot/local/modules/artofbx.queue/lib/expansion/yiisoft/queue/bitrixadapter.php:66
 ArtOfBx\Queue\Expansion\Yiisoft\Queue\BitrixAdapter->runExisting() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Queue.php:105
 Yiisoft\Queue\Queue->run() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Command/RunCommand.php:54
 Yiisoft\Queue\Command\RunCommand->execute() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Command/Command.php:298
 Symfony\Component\Console\Command\Command->run() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Application.php:1040
 Symfony\Component\Console\Application->doRunCommand() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Application.php:301
 Symfony\Component\Console\Application->doRun() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Application.php:171
 Symfony\Component\Console\Application->run() at /docroot/local/modules/artofbx.queue/tools/console.php:87

In Product.php line 78:

  [TypeError]
  Argument 1 passed to Kant\PIM\Queue\Job\V2\Product::getIblockId() must be of the type int, null given, called in /docroot/local/php_interface/classes/Kant/PIM/Queue/Job/V2/Product.php on line 29

Exception trace:
  at /docroot/local/php_interface/classes/Kant/PIM/Queue/Job/V2/Product.php:78
 Kant\PIM\Queue\Job\V2\Product->getIblockId() at /docroot/local/php_interface/classes/Kant/PIM/Queue/Job/V2/Product.php:29
 Kant\PIM\Queue\Job\V2\Product->execute() at n/a:n/a
 ReflectionFunction->invokeArgs() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/injector/src/Injector.php:86
 Yiisoft\Injector\Injector->invoke() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Worker/Worker.php:63
 Yiisoft\Queue\Worker\Worker->Yiisoft\Queue\Worker\{closure}() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Middleware/Consume/ConsumeFinalHandler.php:23
 Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler->handleConsume() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Middleware/Consume/MiddlewareConsumeStack.php:44
 Yiisoft\Queue\Middleware\Consume\MiddlewareConsumeStack->handleConsume() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php:46
 Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher->dispatch() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Worker/Worker.php:65
 Yiisoft\Queue\Worker\Worker->process() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Queue.php:176
 Yiisoft\Queue\Queue->handle() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Queue.php:96
 Yiisoft\Queue\Queue->Yiisoft\Queue\{closure}() at /docroot/local/modules/artofbx.queue/lib/expansion/yiisoft/queue/bitrixadapter.php:272
 ArtOfBx\Queue\Expansion\Yiisoft\Queue\BitrixAdapter->run() at /docroot/local/modules/artofbx.queue/lib/expansion/yiisoft/queue/bitrixadapter.php:66
 ArtOfBx\Queue\Expansion\Yiisoft\Queue\BitrixAdapter->runExisting() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Queue.php:105
 Yiisoft\Queue\Queue->run() at /docroot/local/modules/artofbx.queue/vendor/yiisoft/queue/src/Command/RunCommand.php:54
 Yiisoft\Queue\Command\RunCommand->execute() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Command/Command.php:298
 Symfony\Component\Console\Command\Command->run() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Application.php:1040
 Symfony\Component\Console\Application->doRunCommand() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Application.php:301
 Symfony\Component\Console\Application->doRun() at /docroot/local/modules/artofbx.queue/vendor/symfony/console/Application.php:171
 Symfony\Component\Console\Application->run() at /docroot/local/modules/artofbx.queue/tools/console.php:87

queue:run [-m|--maximum MAXIMUM] [--] [<channel>...]

The main problem is here:

src/Middleware/Consume/ConsumeMiddlewareDispatcher.php on line 43

    public function dispatch(
        ConsumeRequest $request,
        MessageHandlerConsumeInterface $finishHandler
    ): ConsumeRequest {
        if ($this->stack === null) {
            $this->stack = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler);
        }

        return $this->stack->handleConsume($request);
    }

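As you can see, `$this->stack` can only be null|MiddlewareConsumeStack, whereas it should be MiddlewareConsumeStack[]. The stack is built once, with the `$finishHandler` of the first call, and is reused for every later call, so a message of a different type appears to end up in the handler that belongs to the first type.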
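To illustrate the caching problem in isolation, here is a minimal sketch. It is a toy model and not the change made in the linked PR: `FinishHandler`, `ProductHandler`, `OrderHandler`, `Stack`, `SingleStackDispatcher` and `PerHandlerDispatcher` are hypothetical names, messages are plain strings, and keying the stacks by handler class is only one assumed way of keeping one stack per message type.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the library's finish handler interface.
interface FinishHandler
{
    public function handle(string $message): string;
}

final class ProductHandler implements FinishHandler
{
    public function handle(string $message): string
    {
        return "product:$message";
    }
}

final class OrderHandler implements FinishHandler
{
    public function handle(string $message): string
    {
        return "order:$message";
    }
}

// Hypothetical stand-in for MiddlewareConsumeStack: it only remembers its finish handler.
final class Stack
{
    public function __construct(private FinishHandler $finishHandler)
    {
    }

    public function handleConsume(string $message): string
    {
        return $this->finishHandler->handle($message);
    }
}

// Mirrors the reported behaviour: a single cached stack.
final class SingleStackDispatcher
{
    private ?Stack $stack = null;

    public function dispatch(string $message, FinishHandler $finishHandler): string
    {
        // Built once; every later call silently reuses the first finish handler.
        if ($this->stack === null) {
            $this->stack = new Stack($finishHandler);
        }

        return $this->stack->handleConsume($message);
    }
}

// Assumed alternative: one cached stack per handler class.
final class PerHandlerDispatcher
{
    /** @var array<string, Stack> */
    private array $stacks = [];

    public function dispatch(string $message, FinishHandler $finishHandler): string
    {
        // The handler's class name serves as the cache key in this toy model.
        $key = $finishHandler::class;
        $this->stacks[$key] ??= new Stack($finishHandler);

        return $this->stacks[$key]->handleConsume($message);
    }
}
```

With these classes, `(new SingleStackDispatcher())` returns `product:1` for the first product message and then `product:2` for a following order message, because the order handler is never used. `PerHandlerDispatcher` returns `product:1` and `order:2` for the same two calls.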

Additional info

| Q                | A      |
| ---------------- | ------ |
| Version          | latest |
| PHP version      | 8.1+   |
| Operating system | any    |

holdmann commented 1 week ago

This can be closed; a PR has been added: https://github.com/yiisoft/queue/pull/224