mrchrisadams closed this 5 years ago
Hey @arendjantetteroo, can you let me know where queues are defined in this Symfony/Enqueue/AMQP combo?
Everything I've tried so far to set message priority seems to result in the priorities being ignored by RabbitMQ, which means that any bulk processing ends up blocking incoming requests.
From what I can see, you set up queues with priority support like so, and you need to do it when declaring the queue. Here's a snippet from their docs:
<?php
/** @var \Enqueue\AmqpExt\AmqpContext $context */
$fooQueue = $context->createQueue('foo');
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$fooQueue->setArguments(['x-max-priority' => 10]);
$context->declareQueue($fooQueue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
    ->setPriority(5) // the higher the priority, the sooner a message gets to a consumer
    ->send($fooQueue, $message)
;
We use a ProducerInterface in our DefaultController, where we delegate incoming requests to the greencheck queue:
private function doGreencheck($url, $ip, $browser, $source = 'api', $blind = false)
{
$message = new Message(JSON::encode(['key' => 0, 'url' => $url, 'ip' => $ip, 'browser' => $browser, 'source' => $source, 'blind' => $blind]));
// We still want incoming requests to take precedence over other jobs, so we give them a very high priority.
$message->setPriority(MessagePriority::VERY_HIGH);
$promise = $this->producer->sendCommand('greencheck', $message, $needReply = true);
$replyMessage = $promise->receive();
$data = JSON::decode($replyMessage->getBody());
return $data['result'];
}
Thing is, our priority settings are not being respected, and I can't trace where the ProducerInterface declares the greencheck queue to see if we're passing in the necessary x-max-priority argument.
In the logs I see this here:
./bin/console enqueue:consume --setup-broker -vvv
07:58:06 DEBUG [app] [AmqpDriver] Declare router exchange: enqueue.default
07:58:06 DEBUG [app] [AmqpDriver] Declare router queue: enqueue.app.default
07:58:06 DEBUG [app] [AmqpDriver] Bind router queue to exchange: enqueue.app.default -> enqueue.default
07:58:06 DEBUG [app] [AmqpDriver] Declare processor queue
Which makes me think we're using the AMQP driver, which does not pass in any x-max-priority argument:
class AmqpDriver extends GenericDriver
{
// (snip)
/**
* @return AmqpQueue
*/
protected function doCreateQueue(string $transportQueueName): InteropQueue
{
/** @var AmqpQueue $queue */
$queue = parent::doCreateQueue($transportQueueName);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
return $queue;
}
}
Whereas I think we need to be using the RabbitMQ driver, which does:
class RabbitMqDriver extends AmqpDriver
{
/**
* @return AmqpQueue
*/
protected function doCreateQueue(string $transportQueueName): InteropQueue
{
$queue = parent::doCreateQueue($transportQueueName);
$queue->setArguments(['x-max-priority' => 4]);
return $queue;
}
}
You can replicate this locally with this one-liner if you have a dev environment running with RabbitMQ, Redis and MySQL:
./bin/console tgwf:greencheck:csvchecker && wget -O - http://localhost:8000/greencheck/google.com
This assumes there is a file called tgwf-check.csv containing a list of up to 10,000 entries (that's enough on my machine to simulate a worker pool of 2 being blocked when an incoming web request comes in):
google.com
youtube.com
facebook.com
baidu.com
wikipedia.org
qq.com
yahoo.com
taobao.com
amazon.com
I can't see where we explicitly set up a RabbitMQ driver, or name queues (otherwise, I'll experiment with making a differently named queue, and allocate some workers there).
Where should I be looking here? It's driving me crazy.
OK, I think I finally found it.
Right now, I think we declare how we connect to Rabbit using the ENQUEUE_DSN env var, and it might look like this:
ENQUEUE_DSN=amqp+ext://user:password@localhost:5672/vhost
This seems to load the regular AMQP driver without priority support. However, if we pass in a RabbitMQ-specific extension in the DSN, like amqp+rabbitmq://, two things happen.
First, it's picked up (see the code in the enqueue codebase). Second, we get an error, because we can't re-declare existing queues with different arguments, and we're now declaring them with support for message priorities.
You get an error like this for each queue that already exists:
[AMQPQueueException (406)]
Server channel error: 406, message: PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'enqueue.app.default' in vhost '/': received the value '4' of type 'long' but current is none
If you want to use the same queue names, you'll need to delete the queues, and stop any processes that connect to them, as I think they all check for a queue existing in the bootstrapping phase before they are ready to put stuff on the queue as producers, or take them off as consumers.
rabbitmqctl delete_queue enqueue.app.default
rabbitmqctl delete_queue greencheck
We'll then need to make the change, and restart the processes, letting them declare these queues again with priority support.
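To make the DSN point concrete, here is a minimal Python sketch of how a `+`-separated scheme can be split into a protocol and its extensions. It is only an illustration of the idea, not Enqueue's actual parsing code: the function names `split_dsn_scheme` and `selects_priority_driver` are hypothetical, and the rule that the `rabbitmq` extension is what selects the priority-aware driver is taken from my reading above.

```python
from typing import List, Tuple


def split_dsn_scheme(dsn: str) -> Tuple[str, List[str]]:
    """Split the scheme of a DSN such as 'amqp+rabbitmq://...' into
    a protocol ('amqp') and a list of extensions (['rabbitmq'])."""
    scheme = dsn.partition(":")[0].lower()
    protocol, *extensions = scheme.split("+")
    return protocol, extensions


def selects_priority_driver(dsn: str) -> bool:
    """Assumption for illustration: the priority-aware driver is only
    chosen when the scheme carries the 'rabbitmq' extension."""
    protocol, extensions = split_dsn_scheme(dsn)
    return protocol in ("amqp", "amqps") and "rabbitmq" in extensions


if __name__ == "__main__":
    for dsn in (
        "amqp+ext://user:password@localhost:5672/vhost",
        "amqp+rabbitmq://user:password@localhost:5672/vhost",
    ):
        print(dsn, "->", selects_priority_driver(dsn))
```

Under that assumption, the current `amqp+ext://` DSN never reaches the code path that sets `x-max-priority`, which would explain why the priorities we set on messages are ignored.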
The other option, I think, is to set up some differently named queues with priority support, and deploy. But again, it's not obvious to me where we set queue names in the codebase - if you change a queue name in a worker without declaring it as a route somewhere, you get an error like so:
There is no route for command "NEW_QUEUE_NAME"
I can't work out where I'd declare this, and I think I'm blocked until I have that :|
Hmm, so after looking into this some more, it looks like our PHP import script runs out of memory when working with larger files.
I've run the command to read from a 1 million line file, and it runs out of memory at around 60k lines.
So, if we kept to a safer limit of 50k lines, we'd need to split a 1m line file into 20 smaller files of 50k lines each, and run the tgwf:greencheck:csvchecker command 20 times.
PHP Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 4096 bytes) in /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/enqueue/enqueue/Util/JSON.php on line 47
PHP Stack trace:
PHP 1. {main}() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/bin/console:0
PHP 2. Symfony\Bundle\FrameworkBundle\Console\Application->run() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/bin/console:38
PHP 3. Symfony\Bundle\FrameworkBundle\Console\Application->doRun() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/symfony/console/Application.php:145
PHP 4. Symfony\Bundle\FrameworkBundle\Console\Application->doRun() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/symfony/framework-bundle/Console/Application.php:75
PHP 5. Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/symfony/console/Application.php:269
PHP 6. Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/symfony/framework-bundle/Console/Application.php:89
PHP 7. App\Command\GreencheckCsvCheckerCommand->run() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/symfony/console/Application.php:935
PHP 8. App\Command\GreencheckCsvCheckerCommand->execute() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/symfony/console/Command/Command.php:255
PHP 9. Enqueue\Util\JSON::encode() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/src/Command/GreencheckCsvCheckerCommand.php:60
PHP 10. json_encode() /Users/chrisadams/Code/tgwf/thegreenwebfoundation/apps/api/vendor/enqueue/enqueue/Util/JSON.php:47
As such, I've written a small Python script that does this splitting for us now.
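For anyone who wants to reproduce the splitting step, here is a minimal sketch of that kind of splitter. It is not the actual script: the function name `split_file`, the chunk naming scheme and the default of 50,000 lines are assumptions for illustration.

```python
import itertools
from pathlib import Path
from typing import List


def split_file(source: Path, out_dir: Path, lines_per_chunk: int = 50_000) -> List[Path]:
    """Write `source` out as numbered files of at most `lines_per_chunk` lines each."""
    out_dir.mkdir(parents=True, exist_ok=True)
    chunks: List[Path] = []
    with source.open("r", encoding="utf-8") as handle:
        for index in itertools.count(1):
            # islice reads lazily, so only one chunk is held in memory at a time.
            lines = list(itertools.islice(handle, lines_per_chunk))
            if not lines:
                break
            chunk_path = out_dir / f"{source.stem}-{index:03d}{source.suffix}"
            chunk_path.write_text("".join(lines), encoding="utf-8")
            chunks.append(chunk_path)
    return chunks
```

With the default chunk size, a 1 million line file comes out as 20 files, and each one can then be fed to the csvchecker command in turn.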
This transfers across the classes referred to in #9