Closed shieldz80 closed 3 months ago
Hey @shieldz80 👋
I didn't see a requeue_on_fail
option set to true in your configuration (in the discussion). Could you please retest with that option?
Hi, @rustatian,
The RR config
version: '3'
amqp:
addr: amqp://admin:123@rabbitmq:5672
rpc:
listen: tcp://127.0.0.1:6001
server:
command: php consumer.php
relay: pipes
jobs:
pool:
num_workers: ${RR_WORKERS:-4}
max_jobs: 10
consume: [ "q1", "q2", "q3", "q4" ]
pipelines:
q1:
driver: amqp
config:
prefetch: ${RR_Q1_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q1-rk
queue: q1
durable: true
requeue_on_fail: true
q2:
driver: amqp
config:
prefetch: ${RR_Q2_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q2-rk
queue: q2
durable: true
requeue_on_fail: true
q3:
driver: amqp
config:
prefetch: ${RR_Q3_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q3-rk
queue: q3
durable: true
requeue_on_fail: true
q4:
driver: amqp
config:
prefetch: ${RR_Q4_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q4-rk
queue: q4
durable: true
requeue_on_fail: true
RR_WORKERS=4 RR_Q1_PREFETCH=1 RR_Q2_PREFETCH=1 RR_Q3_PREFETCH=1 RR_Q4_PREFETCH=1
Processing log for one of the queues
[2024-06-12T03:46:49.356968+00:00] q1.INFO: Processing message with id 1 from queue q1 {"id":1,"queue":"q1"} []
[2024-06-12T03:46:53.358140+00:00] q1.INFO: Message with id 1 from queue q1 was processed successfully {"id":1,"queue":"q1"} []
[2024-06-12T03:46:53.361934+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-12T03:46:53.362141+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(3) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-12T03:46:53.363823+00:00] q1.INFO: Processing message with id 3 from queue q1 {"id":3,"queue":"q1"} []
[2024-06-12T03:46:57.364105+00:00] q1.INFO: Message with id 3 from queue q1 was processed successfully {"id":3,"queue":"q1"} []
[2024-06-12T03:46:57.364897+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-12T03:46:57.364948+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(5) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-12T03:46:57.365597+00:00] q1.INFO: Processing message with id 5 from queue q1 {"id":5,"queue":"q1"} []
[2024-06-12T03:46:59.365825+00:00] q1.INFO: Message with id 5 from queue q1 was processed successfully {"id":5,"queue":"q1"} []
[2024-06-12T03:46:59.367722+00:00] q1.INFO: Processing message with id 6 from queue q1 {"id":6,"queue":"q1"} []
[2024-06-12T03:47:02.368020+00:00] q1.INFO: Message with id 6 from queue q1 was processed successfully {"id":6,"queue":"q1"} []
[2024-06-12T03:47:02.368880+00:00] q1.INFO: Processing message with id 7 from queue q1 {"id":7,"queue":"q1"} []
[2024-06-12T03:47:06.369010+00:00] q1.INFO: Message with id 7 from queue q1 was processed successfully {"id":7,"queue":"q1"} []
[2024-06-12T03:47:06.369764+00:00] q1.INFO: Processing message with id 8 from queue q1 {"id":8,"queue":"q1"} []
[2024-06-12T03:47:09.369919+00:00] q1.INFO: Message with id 8 from queue q1 was processed successfully {"id":8,"queue":"q1"} []
[2024-06-12T03:47:09.371341+00:00] q1.INFO: Processing message with id 9 from queue q1 {"id":9,"queue":"q1"} []
[2024-06-12T03:47:14.371596+00:00] q1.INFO: Message with id 9 from queue q1 was processed successfully {"id":9,"queue":"q1"} []
[2024-06-12T03:47:14.373173+00:00] q1.INFO: Processing message with id 10 from queue q1 {"id":10,"queue":"q1"} []
[2024-06-12T03:47:15.373583+00:00] q1.INFO: Message with id 10 from queue q1 was processed successfully {"id":10,"queue":"q1"} []
[2024-06-12T03:47:15.375500+00:00] q1.INFO: Processing message with id 11 from queue q1 {"id":11,"queue":"q1"} []
[2024-06-12T03:47:19.375756+00:00] q1.INFO: Message with id 11 from queue q1 was processed successfully {"id":11,"queue":"q1"} []
[2024-06-12T03:47:19.390277+00:00] q1.INFO: Processing message with id 12 from queue q1 {"id":12,"queue":"q1"} []
[2024-06-12T03:47:21.391060+00:00] q1.INFO: Message with id 12 from queue q1 was processed successfully {"id":12,"queue":"q1"} []
[2024-06-12T03:47:21.393342+00:00] q1.INFO: Processing message with id 13 from queue q1 {"id":13,"queue":"q1"} []
[2024-06-12T03:47:23.393757+00:00] q1.INFO: Message with id 13 from queue q1 was processed successfully {"id":13,"queue":"q1"} []
[2024-06-12T03:47:23.395396+00:00] q1.INFO: Processing message with id 14 from queue q1 {"id":14,"queue":"q1"} []
[2024-06-12T03:47:27.395655+00:00] q1.INFO: Message with id 14 from queue q1 was processed successfully {"id":14,"queue":"q1"} []
[2024-06-12T03:47:27.397606+00:00] q1.INFO: Processing message with id 15 from queue q1 {"id":15,"queue":"q1"} []
[2024-06-12T03:47:29.397886+00:00] q1.INFO: Message with id 15 from queue q1 was processed successfully {"id":15,"queue":"q1"} []
[2024-06-12T03:47:29.399989+00:00] q1.INFO: Processing message with id 16 from queue q1 {"id":16,"queue":"q1"} []
[2024-06-12T03:47:32.400372+00:00] q1.INFO: Message with id 16 from queue q1 was processed successfully {"id":16,"queue":"q1"} []
[2024-06-12T03:47:32.402220+00:00] q1.INFO: Processing message with id 17 from queue q1 {"id":17,"queue":"q1"} []
[2024-06-12T03:47:35.402455+00:00] q1.INFO: Message with id 17 from queue q1 was processed successfully {"id":17,"queue":"q1"} []
[2024-06-12T03:47:35.403190+00:00] q1.INFO: Processing message with id 18 from queue q1 {"id":18,"queue":"q1"} []
[2024-06-12T03:47:37.403457+00:00] q1.INFO: Message with id 18 from queue q1 was processed successfully {"id":18,"queue":"q1"} []
[2024-06-12T03:47:37.405668+00:00] q1.INFO: Processing message with id 19 from queue q1 {"id":19,"queue":"q1"} []
[2024-06-12T03:47:41.405954+00:00] q1.INFO: Message with id 19 from queue q1 was processed successfully {"id":19,"queue":"q1"} []
[2024-06-12T03:47:41.407684+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-12T03:47:42.407894+00:00] q1.INFO: Message with id 20 from queue q1 was processed successfully {"id":20,"queue":"q1"} []
[2024-06-12T03:47:42.409138+00:00] q1.INFO: Processing message with id 21 from queue q1 {"id":21,"queue":"q1"} []
[2024-06-12T03:47:43.409408+00:00] q1.INFO: Message with id 21 from queue q1 was processed successfully {"id":21,"queue":"q1"} []
[2024-06-12T03:47:43.428206+00:00] q1.INFO: Processing message with id 22 from queue q1 {"id":22,"queue":"q1"} []
[2024-06-12T03:47:47.428758+00:00] q1.INFO: Message with id 22 from queue q1 was processed successfully {"id":22,"queue":"q1"} []
[2024-06-12T03:47:47.440209+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-12T03:47:49.440940+00:00] q1.INFO: Message with id 23 from queue q1 was processed successfully {"id":23,"queue":"q1"} []
[2024-06-12T03:47:49.442779+00:00] q1.INFO: Processing message with id 24 from queue q1 {"id":24,"queue":"q1"} []
[2024-06-12T03:47:50.443154+00:00] q1.INFO: Message with id 24 from queue q1 was processed successfully {"id":24,"queue":"q1"} []
[2024-06-12T03:47:50.446913+00:00] q1.INFO: Processing message with id 25 from queue q1 {"id":25,"queue":"q1"} []
[2024-06-12T03:47:51.447201+00:00] q1.INFO: Message with id 25 from queue q1 was processed successfully {"id":25,"queue":"q1"} []
[2024-06-12T03:47:51.448520+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-12T03:47:54.448823+00:00] q1.INFO: Message with id 2 from queue q1 was processed successfully {"id":2,"queue":"q1"} []
[2024-06-12T03:47:54.449815+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-12T03:47:59.449968+00:00] q1.INFO: Message with id 4 from queue q1 was processed successfully {"id":4,"queue":"q1"} []
Processing of messages with id 2 and 4 threw an exception and unfortunately the behavior is the same as observed before
Let me know if you want me to test anything else.
Code for the consumer just in case
<?php
declare(strict_types=1);
use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Psr\Log\LoggerInterface;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Spiral\RoadRunner\Jobs\Consumer;
require realpath(__DIR__ . '/vendor/autoload.php');
$logDir = realpath(__DIR__ . '/var/log');
$consumer = new Consumer();
$loggers = [
$q = 'q1' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
$q = 'q2' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
$q = 'q3' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
$q = 'q4' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
];
while ($task = $consumer->waitTask()) {
try {
$queue = $task->getQueue();
/** @var LoggerInterface $logger */
$logger = $loggers[$queue];
(new GenericHandler($logger))->process($task);
$task->complete();
} catch (Throwable $e) {
$logger->error($e);
$task->fail($e, true);
}
}
Thanks, @shieldz80 👍
I guess that the problem is in the priority. Are all the messages are already in the RabbitMQ queue, or they're pushed one by one after consume?
Also, if that's possible, could you please show the code with amqp extension? Where everything was in order.
Are all the messages are already in the RabbitMQ queue, or they're pushed one by one after consume?
When I test, I bring up all the containers, then I execute the command that publishes the 100 messages to the "main" exchange. So they're produced more or less at once, definitely not one by one.
Also, if that's possible, could you please show the code with amqp extension? Where everything was in order.
Sure, here it is
<?php
declare(strict_types=1);
namespace Shieldz\ConsumerApp\Command;
use AMQPChannel;
use AMQPConnection;
use AMQPEnvelope;
use AMQPQueue;
use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Shieldz\ConsumerApp\Task\Task;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;
use function extension_loaded;
use function sprintf;
use const AMQP_REQUEUE;
use const SIGINT;
use const SIGQUIT;
use const SIGTERM;
#[AsCommand(name: 'consume-messages')]
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
{
private AMQPConnection $connection;
private int $signal = -1;
private bool $shouldStop = false;
public function __construct(private Logger $loggerProto)
{
parent::__construct();
$this->setAmqpConnection();
}
protected function configure(): void
{
$this->addArgument('queueName', InputArgument::REQUIRED);
$this->addOption(name: 'max-jobs', mode: InputOption::VALUE_REQUIRED, default: 10);
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$queueName = $input->getArgument('queueName');
$maxJobs = $input->getOption('max-jobs');
$logger = $this->loggerProto
->withName($queueName)
->pushHandler(
new StreamHandler(sprintf('%s/%s.log', APP_LOG_DIR, $queueName), Level::Debug)
);
$queue = new AMQPQueue(new AMQPChannel($this->connection));
$queue->setName($queueName);
$executedJobs = 0;
while (false === $this->shouldStop) {
$message = $queue->get();
if (!$message instanceof AMQPEnvelope) {
continue;
}
try {
(new GenericHandler($logger))
->process(new Task($message->getBody(), $queue->getName()));
$queue->ack($message->getDeliveryTag());
} catch (Throwable $e) {
$queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
$logger->error($e);
}
$this->shouldStop = ++$executedJobs >= $maxJobs;
}
return match ($this->signal) {
-1 => Command::SUCCESS,
default => 128 + $this->signal,
};
}
public function getSubscribedSignals(): array
{
return extension_loaded('pcntl') ? [SIGINT, SIGTERM, SIGQUIT] : [];
}
public function handleSignal(int $signal, false|int $previousExitCode = 0): int|false
{
$this->signal = $signal;
$this->shouldStop = true;
return false;
}
private function setAmqpConnection(): void
{
if (isset($this->connection)) {
return;
}
$connection = new AMQPConnection();
$connection->setHost('rabbitmq');
$connection->setLogin('admin');
$connection->setPassword('123');
$connection->connect();
$this->connection = $connection;
}
}
The processing code is pretty much the same as in the RR version
class GenericHandler
{
public function __construct(private LoggerInterface $logger)
{
}
public function process(Task $task): void
{
$queue = $task->getQueue();
/** @var array{id: int, load: int} $payload */
$payload = json_decode($task->getPayload(), true);
$this->logger->info(
'Processing message with id {id} from queue {queue}',
['id' => $payload['id'], 'queue' => $queue]
);
$this->doProcessing($payload['load']);
$this->logger->info(
'Message with id {id} from queue {queue} was processed successfully',
['id' => $payload['id'], 'queue' => $queue]
);
}
private function doProcessing(int $load): void
{
$throwRandomExceptions = $_ENV['THROW_RANDOM_EXCEPTIONS'] ?? '0' === '1';
if ($throwRandomExceptions && random_int(1, 20) === 5) {
// 5% chance to throw an exception
throw new RuntimeException('Processing failed');
}
// this emulates processing for $load seconds
sleep($load);
}
}
Consuming is then started by running php bin/console.php consume-messages --max-jobs=10 q1
Hm, yeah, code is just about the same... Could you please, use debug logging for RR and show me the debug logs? Also, if I understand correctly, you're using only 1 queue, but declared 4 of them. So, 3 of them are not used, right?
Could you please, use debug logging for RR and show me the debug logs?
Sure, I'll post the logs soon.
if I understand correctly, you're using only 1 queue, but declared 4 of them. So, 3 of them are not used, right?
No, all four are used, the container uses s6-overlay to start 4 supervised processes that each consume from one queue
Process tree looks like this in the container that uses plain php to consume from rabbitmq
$ docker top classic-consumer axf o "pid,user,args"
PID USER COMMAND
3754558 root \_ /package/admin/s6/command/s6-svscan -d4 -- /run/service
3754664 root \_ s6-supervise s6-linux-init-shutdownd
3754666 root | \_ /package/admin/s6-linux-init/command/s6-linux-init-shutdownd -d3 -c /run/s6/basedir -g 3000 -C -B
3754672 root \_ s6-supervise q1-consumer
3754718 shieldz | \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q1
3754673 root \_ s6-supervise s6rc-oneshot-runner
3754687 root | \_ /package/admin/s6/command/s6-ipcserverd -1 -- /package/admin/s6/command/s6-ipcserver-access -v0 -E -l0 -i data/rules -- /package/admin/s6/command/s6-sudod -t 30000 -- /package/admin/s6-rc/command/s6-rc-oneshot-run -l ../.. --
3754674 root \_ s6-supervise q3-consumer
3754717 shieldz | \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q3
3754675 root \_ s6-supervise q4-consumer
3754715 shieldz | \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q4
3754676 root \_ s6-supervise s6rc-fdholder
3754677 root \_ s6-supervise q2-consumer
3754719 shieldz \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q2
I mean, for the RR. You also used all 4 queues, right?
Here are logs (this is what the container outputted during the processing of messages)
If you wanted other logs let me know ...
I mean, for the RR. You also used all 4 queues, right?
Yes, I produce 25 messages for each queue, and all are consumed in my setup. I only showed you processing logs for 1 queue to avoid cluttering the comments
Ok, debug logs helped me to figure out what's happening.
As you may see, there is an entry with the message: job was re-queued
. That means, that RR failed to process the JOB (message, envelope) and it was re-queued. But actually, re-queue means the following - update headers with new from the PHP worker, then push the job back (same payload, new headers), confirm that, Ack the previous one.
Thus, you see these failed jobs at the end of the queue.
So exposing that multiply option could alter this behavior?
Actually, no. We should add a new option to Nack
the message, instead of Re-queueing it.
@msmakouz Here is the updated protocol after our discussion to handle this case:
roadrunner-php/jobs
client library: ack
, nack(data)
, requeue(data)
.ACK
, NACK
, REQUEUE
, after the currently existing NoError
and Error
.message
.requeue(data)
. Data payload is the same.@shieldz80 Thanks again for the report. We're going to update our internal RR<->PHP protocol to handle raw NACK's. In your case, should be able to just NACK the message to put it back to the same queue at the same position. Keep in mind, however, that pipelines are consumed in parallel, not synchronously 1-by-1.
We're going to update our internal RR<->PHP protocol to handle raw NACK's. In your case, should be able to just NACK the message to put it back to the same queue at the same position. Keep in mind, however, that pipelines are consumed in parallel, not synchronously 1-by-1.
Great! Thank you!
Hey @shieldz80 👋 New protocol handlers are added to the RR. We'll need to sync a PHP part and you'll be able to re-test your case using the usual ack/nack instead of requeue.
Hi @rustatian,
Thanks! Looking forward to test it. Is there some way to know when the PHP SDK is updated?
Hey @shieldz80 , This or early next week.
Hi @rustatian , @msmakouz
I saw that the ack
/nack
functionality was merged into the 4.x branch of the roadrunner-php/jobs
project and I decided to test it, but unfortunately something doesn't look right to me. When there's an error even though the message is nack
ed it seems it just gets lost from what I see in my logs. I attached the RR debug logs.
Here's also what I changed in my code. The consumer code now looks like this to use ack
/nack
<?php
declare(strict_types=1);
use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Psr\Log\LoggerInterface;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Spiral\RoadRunner\Jobs\Consumer;
require realpath(__DIR__ . '/vendor/autoload.php');
$logDir = realpath(__DIR__ . '/var/log');
$consumer = new Consumer();
$loggers = [
$q = 'q1' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
$q = 'q2' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
$q = 'q3' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
$q = 'q4' => (new Logger($q))
->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
->pushProcessor(new PsrLogMessageProcessor()),
];
while ($task = $consumer->waitTask()) {
try {
$queue = $task->getQueue();
/** @var LoggerInterface $logger */
$logger = $loggers[$queue];
(new GenericHandler($logger))->process($task);
$task->ack();
} catch (Throwable $e) {
$logger->error($e);
$task->nack($e, redelivery: true);
}
}
And I've updated the composer.json
file to use the 4.x branch since no 4.5.0 tag was added yet.
{
"name": "shieldz/rr-consumer-app",
"type": "project",
"autoload": {
"psr-4": {
"Shieldz\\ConsumerApp\\": "src/"
}
},
"require": {
"php": "^8.3",
"ext-sockets": "*",
"spiral/roadrunner-jobs": "4.x-dev",
"monolog/monolog": "^3.6"
}
}
The output I get in my application for one of the queues, looks like this
[2024-06-25T05:33:19.155856+00:00] q3.INFO: Processing message with id 1 from queue q3 {"id":1,"queue":"q3"} []
[2024-06-25T05:33:20.156814+00:00] q3.INFO: Message with id 1 from queue q3 was processed successfully {"id":1,"queue":"q3"} []
[2024-06-25T05:33:20.157521+00:00] q3.INFO: Processing message with id 2 from queue q3 {"id":2,"queue":"q3"} []
[2024-06-25T05:33:21.157766+00:00] q3.INFO: Message with id 2 from queue q3 was processed successfully {"id":2,"queue":"q3"} []
[2024-06-25T05:33:21.159260+00:00] q3.INFO: Processing message with id 3 from queue q3 {"id":3,"queue":"q3"} []
[2024-06-25T05:33:22.159621+00:00] q3.INFO: Message with id 3 from queue q3 was processed successfully {"id":3,"queue":"q3"} []
[2024-06-25T05:33:22.161287+00:00] q3.INFO: Processing message with id 4 from queue q3 {"id":4,"queue":"q3"} []
[2024-06-25T05:33:23.161657+00:00] q3.INFO: Message with id 4 from queue q3 was processed successfully {"id":4,"queue":"q3"} []
[2024-06-25T05:33:23.163091+00:00] q3.INFO: Processing message with id 5 from queue q3 {"id":5,"queue":"q3"} []
[2024-06-25T05:33:24.163319+00:00] q3.INFO: Message with id 5 from queue q3 was processed successfully {"id":5,"queue":"q3"} []
[2024-06-25T05:33:24.164279+00:00] q3.INFO: Processing message with id 6 from queue q3 {"id":6,"queue":"q3"} []
[2024-06-25T05:33:25.164484+00:00] q3.INFO: Message with id 6 from queue q3 was processed successfully {"id":6,"queue":"q3"} []
[2024-06-25T05:33:25.165324+00:00] q3.INFO: Processing message with id 7 from queue q3 {"id":7,"queue":"q3"} []
[2024-06-25T05:33:26.165612+00:00] q3.INFO: Message with id 7 from queue q3 was processed successfully {"id":7,"queue":"q3"} []
[2024-06-25T05:33:26.167605+00:00] q3.INFO: Processing message with id 8 from queue q3 {"id":8,"queue":"q3"} []
[2024-06-25T05:33:26.167810+00:00] q3.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:33:26.169076+00:00] q3.INFO: Processing message with id 9 from queue q3 {"id":9,"queue":"q3"} []
[2024-06-25T05:33:27.169272+00:00] q3.INFO: Message with id 9 from queue q3 was processed successfully {"id":9,"queue":"q3"} []
[2024-06-25T05:33:27.169856+00:00] q3.INFO: Processing message with id 10 from queue q3 {"id":10,"queue":"q3"} []
[2024-06-25T05:33:28.170024+00:00] q3.INFO: Message with id 10 from queue q3 was processed successfully {"id":10,"queue":"q3"} []
[2024-06-25T05:33:28.181617+00:00] q3.INFO: Processing message with id 11 from queue q3 {"id":11,"queue":"q3"} []
[2024-06-25T05:33:29.181850+00:00] q3.INFO: Message with id 11 from queue q3 was processed successfully {"id":11,"queue":"q3"} []
[2024-06-25T05:33:29.184378+00:00] q3.INFO: Processing message with id 12 from queue q3 {"id":12,"queue":"q3"} []
[2024-06-25T05:33:30.184946+00:00] q3.INFO: Message with id 12 from queue q3 was processed successfully {"id":12,"queue":"q3"} []
[2024-06-25T05:33:30.185439+00:00] q3.INFO: Processing message with id 13 from queue q3 {"id":13,"queue":"q3"} []
[2024-06-25T05:33:31.185579+00:00] q3.INFO: Message with id 13 from queue q3 was processed successfully {"id":13,"queue":"q3"} []
[2024-06-25T05:33:31.187067+00:00] q3.INFO: Processing message with id 14 from queue q3 {"id":14,"queue":"q3"} []
[2024-06-25T05:33:32.187288+00:00] q3.INFO: Message with id 14 from queue q3 was processed successfully {"id":14,"queue":"q3"} []
[2024-06-25T05:33:32.188656+00:00] q3.INFO: Processing message with id 15 from queue q3 {"id":15,"queue":"q3"} []
[2024-06-25T05:33:33.188872+00:00] q3.INFO: Message with id 15 from queue q3 was processed successfully {"id":15,"queue":"q3"} []
[2024-06-25T05:33:33.189483+00:00] q3.INFO: Processing message with id 16 from queue q3 {"id":16,"queue":"q3"} []
[2024-06-25T05:33:34.189598+00:00] q3.INFO: Message with id 16 from queue q3 was processed successfully {"id":16,"queue":"q3"} []
[2024-06-25T05:33:34.190953+00:00] q3.INFO: Processing message with id 17 from queue q3 {"id":17,"queue":"q3"} []
[2024-06-25T05:33:35.191347+00:00] q3.INFO: Message with id 17 from queue q3 was processed successfully {"id":17,"queue":"q3"} []
[2024-06-25T05:33:35.191998+00:00] q3.INFO: Processing message with id 18 from queue q3 {"id":18,"queue":"q3"} []
[2024-06-25T05:33:36.192140+00:00] q3.INFO: Message with id 18 from queue q3 was processed successfully {"id":18,"queue":"q3"} []
[2024-06-25T05:33:36.192856+00:00] q3.INFO: Processing message with id 19 from queue q3 {"id":19,"queue":"q3"} []
[2024-06-25T05:33:37.193063+00:00] q3.INFO: Message with id 19 from queue q3 was processed successfully {"id":19,"queue":"q3"} []
[2024-06-25T05:33:37.194655+00:00] q3.INFO: Processing message with id 20 from queue q3 {"id":20,"queue":"q3"} []
[2024-06-25T05:33:38.195043+00:00] q3.INFO: Message with id 20 from queue q3 was processed successfully {"id":20,"queue":"q3"} []
[2024-06-25T05:33:38.196647+00:00] q3.INFO: Processing message with id 21 from queue q3 {"id":21,"queue":"q3"} []
[2024-06-25T05:33:39.196991+00:00] q3.INFO: Message with id 21 from queue q3 was processed successfully {"id":21,"queue":"q3"} []
[2024-06-25T05:33:39.200426+00:00] q3.INFO: Processing message with id 22 from queue q3 {"id":22,"queue":"q3"} []
[2024-06-25T05:33:40.200676+00:00] q3.INFO: Message with id 22 from queue q3 was processed successfully {"id":22,"queue":"q3"} []
[2024-06-25T05:33:40.215815+00:00] q3.INFO: Processing message with id 23 from queue q3 {"id":23,"queue":"q3"} []
[2024-06-25T05:33:40.216080+00:00] q3.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:33:40.217224+00:00] q3.INFO: Processing message with id 24 from queue q3 {"id":24,"queue":"q3"} []
[2024-06-25T05:33:41.217606+00:00] q3.INFO: Message with id 24 from queue q3 was processed successfully {"id":24,"queue":"q3"} []
[2024-06-25T05:33:41.219604+00:00] q3.INFO: Processing message with id 25 from queue q3 {"id":25,"queue":"q3"} []
[2024-06-25T05:33:42.219870+00:00] q3.INFO: Message with id 25 from queue q3 was processed successfully {"id":25,"queue":"q3"} []
Here, processing of messages 8 and 23 failed but I don't see them being reprocessed anywhere in the log, not even at the back like before.
Ah, sorry, my mistake, apparently I also need requeue_on_fail: true
in the config, and now it looks good
version: '3'
amqp:
addr: amqp://admin:123@rabbitmq:5672
rpc:
listen: tcp://127.0.0.1:6001
server:
command: php consumer.php
relay: pipes
jobs:
pool:
num_workers: ${RR_WORKERS:-4}
max_jobs: 10
consume: [ "q1", "q2", "q3", "q4" ]
pipelines:
q1:
driver: amqp
config:
prefetch: ${RR_Q1_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q1-rk
queue: q1
durable: true
requeue_on_fail: true
q2:
driver: amqp
config:
prefetch: ${RR_Q2_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q2-rk
queue: q2
durable: true
requeue_on_fail: true
q3:
driver: amqp
config:
prefetch: ${RR_Q3_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q3-rk
queue: q3
durable: true
requeue_on_fail: true
q4:
driver: amqp
config:
prefetch: ${RR_Q4_PREFETCH:-10}
consume_all: true
exchange: main
exchange_type: direct
exchange_durable: true
routing_key: q4-rk
queue: q4
durable: true
requeue_on_fail: true
[2024-06-25T05:54:05.880121+00:00] q1.INFO: Processing message with id 1 from queue q1 {"id":1,"queue":"q1"} []
[2024-06-25T05:54:06.881515+00:00] q1.INFO: Message with id 1 from queue q1 was processed successfully {"id":1,"queue":"q1"} []
[2024-06-25T05:54:06.884101+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-25T05:54:07.884481+00:00] q1.INFO: Message with id 2 from queue q1 was processed successfully {"id":2,"queue":"q1"} []
[2024-06-25T05:54:07.886239+00:00] q1.INFO: Processing message with id 3 from queue q1 {"id":3,"queue":"q1"} []
[2024-06-25T05:54:08.886648+00:00] q1.INFO: Message with id 3 from queue q1 was processed successfully {"id":3,"queue":"q1"} []
[2024-06-25T05:54:08.887972+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-25T05:54:09.888359+00:00] q1.INFO: Message with id 4 from queue q1 was processed successfully {"id":4,"queue":"q1"} []
[2024-06-25T05:54:09.889725+00:00] q1.INFO: Processing message with id 5 from queue q1 {"id":5,"queue":"q1"} []
[2024-06-25T05:54:10.889894+00:00] q1.INFO: Message with id 5 from queue q1 was processed successfully {"id":5,"queue":"q1"} []
[2024-06-25T05:54:10.890779+00:00] q1.INFO: Processing message with id 6 from queue q1 {"id":6,"queue":"q1"} []
[2024-06-25T05:54:11.890956+00:00] q1.INFO: Message with id 6 from queue q1 was processed successfully {"id":6,"queue":"q1"} []
[2024-06-25T05:54:11.892676+00:00] q1.INFO: Processing message with id 7 from queue q1 {"id":7,"queue":"q1"} []
[2024-06-25T05:54:12.892978+00:00] q1.INFO: Message with id 7 from queue q1 was processed successfully {"id":7,"queue":"q1"} []
[2024-06-25T05:54:12.894760+00:00] q1.INFO: Processing message with id 8 from queue q1 {"id":8,"queue":"q1"} []
[2024-06-25T05:54:13.895215+00:00] q1.INFO: Message with id 8 from queue q1 was processed successfully {"id":8,"queue":"q1"} []
[2024-06-25T05:54:13.897047+00:00] q1.INFO: Processing message with id 9 from queue q1 {"id":9,"queue":"q1"} []
[2024-06-25T05:54:14.897424+00:00] q1.INFO: Message with id 9 from queue q1 was processed successfully {"id":9,"queue":"q1"} []
[2024-06-25T05:54:14.899369+00:00] q1.INFO: Processing message with id 10 from queue q1 {"id":10,"queue":"q1"} []
[2024-06-25T05:54:15.899739+00:00] q1.INFO: Message with id 10 from queue q1 was processed successfully {"id":10,"queue":"q1"} []
[2024-06-25T05:54:15.920888+00:00] q1.INFO: Processing message with id 11 from queue q1 {"id":11,"queue":"q1"} []
[2024-06-25T05:54:16.921557+00:00] q1.INFO: Message with id 11 from queue q1 was processed successfully {"id":11,"queue":"q1"} []
[2024-06-25T05:54:16.922120+00:00] q1.INFO: Processing message with id 12 from queue q1 {"id":12,"queue":"q1"} []
[2024-06-25T05:54:17.922297+00:00] q1.INFO: Message with id 12 from queue q1 was processed successfully {"id":12,"queue":"q1"} []
[2024-06-25T05:54:17.923652+00:00] q1.INFO: Processing message with id 13 from queue q1 {"id":13,"queue":"q1"} []
[2024-06-25T05:54:18.923892+00:00] q1.INFO: Message with id 13 from queue q1 was processed successfully {"id":13,"queue":"q1"} []
[2024-06-25T05:54:18.925309+00:00] q1.INFO: Processing message with id 14 from queue q1 {"id":14,"queue":"q1"} []
[2024-06-25T05:54:19.925642+00:00] q1.INFO: Message with id 14 from queue q1 was processed successfully {"id":14,"queue":"q1"} []
[2024-06-25T05:54:19.927307+00:00] q1.INFO: Processing message with id 15 from queue q1 {"id":15,"queue":"q1"} []
[2024-06-25T05:54:20.927726+00:00] q1.INFO: Message with id 15 from queue q1 was processed successfully {"id":15,"queue":"q1"} []
[2024-06-25T05:54:20.929539+00:00] q1.INFO: Processing message with id 16 from queue q1 {"id":16,"queue":"q1"} []
[2024-06-25T05:54:21.930034+00:00] q1.INFO: Message with id 16 from queue q1 was processed successfully {"id":16,"queue":"q1"} []
[2024-06-25T05:54:21.931781+00:00] q1.INFO: Processing message with id 17 from queue q1 {"id":17,"queue":"q1"} []
[2024-06-25T05:54:22.932127+00:00] q1.INFO: Message with id 17 from queue q1 was processed successfully {"id":17,"queue":"q1"} []
[2024-06-25T05:54:22.933836+00:00] q1.INFO: Processing message with id 18 from queue q1 {"id":18,"queue":"q1"} []
[2024-06-25T05:54:23.934196+00:00] q1.INFO: Message with id 18 from queue q1 was processed successfully {"id":18,"queue":"q1"} []
[2024-06-25T05:54:23.938761+00:00] q1.INFO: Processing message with id 19 from queue q1 {"id":19,"queue":"q1"} []
[2024-06-25T05:54:24.939162+00:00] q1.INFO: Message with id 19 from queue q1 was processed successfully {"id":19,"queue":"q1"} []
[2024-06-25T05:54:24.959832+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-25T05:54:24.960439+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:54:24.961387+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-25T05:54:25.961620+00:00] q1.INFO: Message with id 20 from queue q1 was processed successfully {"id":20,"queue":"q1"} []
[2024-06-25T05:54:25.963436+00:00] q1.INFO: Processing message with id 21 from queue q1 {"id":21,"queue":"q1"} []
[2024-06-25T05:54:26.964049+00:00] q1.INFO: Message with id 21 from queue q1 was processed successfully {"id":21,"queue":"q1"} []
[2024-06-25T05:54:26.965626+00:00] q1.INFO: Processing message with id 22 from queue q1 {"id":22,"queue":"q1"} []
[2024-06-25T05:54:27.965937+00:00] q1.INFO: Message with id 22 from queue q1 was processed successfully {"id":22,"queue":"q1"} []
[2024-06-25T05:54:27.967046+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-25T05:54:27.967294+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:54:27.968419+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-25T05:54:28.968653+00:00] q1.INFO: Message with id 23 from queue q1 was processed successfully {"id":23,"queue":"q1"} []
[2024-06-25T05:54:28.970063+00:00] q1.INFO: Processing message with id 24 from queue q1 {"id":24,"queue":"q1"} []
[2024-06-25T05:54:29.970398+00:00] q1.INFO: Message with id 24 from queue q1 was processed successfully {"id":24,"queue":"q1"} []
[2024-06-25T05:54:29.971685+00:00] q1.INFO: Processing message with id 25 from queue q1 {"id":25,"queue":"q1"} []
[2024-06-25T05:54:30.972017+00:00] q1.INFO: Message with id 25 from queue q1 was processed successfully {"id":25,"queue":"q1"} []
Here, processing of messages 20 and 23 failed, but they were re-processed in-order as expected.
Awesome! Thanks a lot!
Hey @shieldz80 👋
You won't be needed to set requeue_on_fail
after I merge additional patches. Because you will be able to set redelivery
if failed per message (from PHP, NACK has additional parameters).
Currently, these parameters are not parsed by RR.
Plugin
JOBS
I have an idea!
Hi,
First of let me just say thank you for the awesome project!
As discussed in https://github.com/orgs/roadrunner-server/discussions/1932 when using RoadRunner with RabbitMQ, if, during the processing of a message, an error occurs and the message is re-queued it will end up at the back of the queue. This breaks the expected behavior of RabbitMQ which states that (https://www.rabbitmq.com/docs/semantics#ordering)
A sample log of this issue presented below
Here we can see that during the processing of message with id 21 an exception was thrown and the message is re-queued and processed again last.
If possible, the expected behavior of RabbitMQ should be preserved.