php-enqueue / enqueue-dev

Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
https://enqueue.forma-pro.com/
MIT License
2.18k stars 433 forks source link

Processor was not found. processorName: "Enqueue\Consumption\CallbackProcessor5c9cae7d01f6a" #807

Closed pimjansen closed 5 years ago

pimjansen commented 5 years ago

Hi,

Im trying out enqueue-dev with a basic setup within laravel. All works fine however i would like to decouple it from the framework and use the provided client instead. Main reason is that i want plain json in my queues to also consume in other languages.

Im now pushing messages on the queue with below:

$client->setupBroker();
$client->sendEvent('enqueue_test', json_encode($payload));

All works fine and in the end i indeed have a few messages on my queue. I also registered a simple listener like below which works fine:

$this->app->resolving(SimpleClient::class, function (SimpleClient $client, $app) {
    $client->bindTopic('enqueue_test', function(Message $message) {
        echo 'Well hi there'.PHP_EOL;
        // return Processor::ACK;
    });

    return $client;
});

The only problem hits that my consumer in this case is not returning a ACK. When start the consumer php artisan enqueue:consume --setup-broker -vvv initially it runs fine but when i try to run it again i get the error mentioned in the title. I do expect it to be possible though since technically my consumer crashes at this point by not returning an ACK and the message stays on the queue itself.

The full stack

/app # php artisan enqueue:consume --setup-broker -vvv
[debug] Change logger from "Psr\Log\NullLogger" to "Symfony\Component\Console\Logger\ConsoleLogger"
[debug] Consumption has started
[debug] [AmqpDriver] Declare router exchange: enqueue.default
[debug] [AmqpDriver] Declare router queue: enqueue.app.default
[debug] [AmqpDriver] Bind router queue to exchange: enqueue.app.default -> enqueue.default
[debug] [AmqpDriver] Declare processor queue: enqueue.app.default
[debug] Received from enqueue.app.default       {"id":"beb95f32-514b-11e9-9d08-0242ac120005","type":"email","recipients":{"to":["myemail@gmail.com","my.email@gmail.com"],"cc":["myemail@gmail.com","my.email@gmail.com"],"bcc":["myemail@gmail.com","my.email@gmai
l.com"]},"body":"MyEmailKey","data":{"key":"value","foo":"bar"}}

In ArrayProcessorRegistry.php line 33:

  [LogicException]
  Processor was not found. processorName: "Enqueue\Consumption\CallbackProcessor5c9cae7d01f6a"

Exception trace:
 () at /app/vendor/enqueue/enqueue/ArrayProcessorRegistry.php:33
 Enqueue\ArrayProcessorRegistry->get() at /app/vendor/enqueue/enqueue/Client/DelegateProcessor.php:38
 Enqueue\Client\DelegateProcessor->process() at /app/vendor/enqueue/enqueue/Consumption/QueueConsumer.php:197
 Enqueue\Consumption\QueueConsumer->Enqueue\Consumption\{closure}() at n/a:n/a
 call_user_func() at /app/vendor/enqueue/amqp-lib/AmqpSubscriptionConsumer.php:112
 Enqueue\AmqpLib\AmqpSubscriptionConsumer->Enqueue\AmqpLib\{closure}() at n/a:n/a
 call_user_func() at /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:997
 PhpAmqpLib\Channel\AMQPChannel->basic_deliver() at n/a:n/a
 call_user_func() at /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:212
 PhpAmqpLib\Channel\AbstractChannel->dispatch() at /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:351
 PhpAmqpLib\Channel\AbstractChannel->wait() at /app/vendor/enqueue/amqp-lib/AmqpSubscriptionConsumer.php:60
 Enqueue\AmqpLib\AmqpSubscriptionConsumer->consume() at /app/vendor/enqueue/enqueue/Consumption/QueueConsumer.php:264
 Enqueue\Consumption\QueueConsumer->consume() at /app/vendor/enqueue/enqueue/Symfony/Client/ConsumeCommand.php:150
 Enqueue\Symfony\Client\ConsumeCommand->execute() at /app/vendor/symfony/console/Command/Command.php:255
 Symfony\Component\Console\Command\Command->run() at /app/vendor/symfony/console/Application.php:908
 Symfony\Component\Console\Application->doRunCommand() at /app/vendor/symfony/console/Application.php:269
 Symfony\Component\Console\Application->doRun() at /app/vendor/symfony/console/Application.php:145
 Symfony\Component\Console\Application->run() at /app/vendor/laravel/framework/src/Illuminate/Console/Application.php:90
 Illuminate\Console\Application->run() at /app/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php:122
 Illuminate\Foundation\Console\Kernel->handle() at /app/artisan:37
pimjansen commented 5 years ago

I noticed that it also happens on the REENQUEUE return type. Works well for the ACK and REJECT

dmlang commented 5 years ago

I get the same exception after few minutes with no messages, even with ACK.

`LogicException: Processor was not found. processorName: "Enqueue\Consumption\CallbackProcessor5cdd92fa1c131" in /var/www/html/protected/vendors/enqueue/enqueue/ArrayProcessorRegistry.php:33 Stack trace:

0 /var/www/html/protected/vendors/enqueue/enqueue/Client/DelegateProcessor.php(38): Enqueue\ArrayProcessorRegistry->get('Enqueue\Consump...')

1 /var/www/html/protected/vendors/enqueue/enqueue/Consumption/QueueConsumer.php(197): Enqueue\Client\DelegateProcessor->process(Object(Enqueue\Redis\RedisMessage), Object(Enqueue\Redis\RedisContext))

2 [internal function]: Enqueue\Consumption\QueueConsumer->Enqueue\Consumption{closure}(Object(Enqueue\Redis\RedisMessage), Object(Enqueue\Redis\RedisConsumer))

3 /var/www/html/protected/vendors/enqueue/redis/RedisSubscriptionConsumer.php(75): call_user_func(Object(Closure), Object(Enqueue\Redis\RedisMessage), Object(Enqueue\Redis\RedisConsumer))

4 /var/www/html/protected/vendors/enqueue/enqueue/Consumption/QueueConsumer.php(264): Enqueue\Redis\RedisSubscriptionConsumer->consume(10)

5 /var/www/html/protected/vendors/enqueue/simple-client/SimpleClient.php(209): Enqueue\Consumption\QueueConsumer->consume(NULL)

6 /var/www/html/protected/commands/EventConsumerCommand.php(41): Enqueue\SimpleClient\SimpleClient->consume()

7 /var/www/html/protected/vendors/yiisoft/yii/framework/console/CConsoleCommandRunner.php(71): EventConsumerCommand->run(Array)

8 /var/www/html/protected/vendors/yiisoft/yii/framework/console/CConsoleApplication.php(92): CConsoleCommandRunner->run(Array)

9 /var/www/html/protected/vendors/yiisoft/yii/framework/yiilite.php(1236): CConsoleApplication->processRequest()

10 /var/www/html/protected/commands/consoleApp.php(7): CApplication->run()

11 /var/www/html/protected/eventConsumer(4): require_once('/var/www/html/p...')

12 {main}#`

dmlang commented 5 years ago

don't bother with my problem, I'm not going to use enqueue ... just not ready/stable jet. You should definitely improve your documentation ... "basic" would be euphemism.

akrz commented 5 years ago

Same problem as @dmlang , It seems that it'll randomly throw that error after a while.

I'm using the redis client

EDIT: Seems like it works fine when using amqp-ext

makasim commented 5 years ago

possibly a bug. needs PR

akrz commented 5 years ago

This is a bug but you close it?

makasim commented 5 years ago

It is stale. Nobody works on it. What is the reason keeping it open? What for?

akrz commented 5 years ago

To keep bugs and limitations of the package in plain sight. Nobody looks at CLOSED issues when reviewing a github repo.

makasim commented 5 years ago

Then the tracker is overflowed with open issues making it really hard to find relative stuff. I am fine keeping them open for some time or if some work is in progress.

guillaumemaka commented 5 years ago

I have the same issue

timo002 commented 4 years ago

Having the same issue too!

dukadin commented 4 years ago

There is a 3rd parameter $processorName on bindCommand method. Use it to avoid hashed processor's name.

dahican commented 2 years ago

There is a 3rd parameter $processorName on bindCommand method.

Adding php-enqueue to slim, this error came up - it's adding a suffix on the class name and I am not quite sure what it is for. Adding the class name as the processor fixes the problem.

yipwt79 commented 2 years ago

I've been having this problem, and couldn't figure out why the processor suffix keeps coming up, even after following the above solutions.

Finally I went into my rabbitmqctl and purge all queues, and it works !

I guessed sth might have went wrong with my rabbitmq from previous testing.