This repository aims to illustrate how to set up AMQP within Drupal. It contains a base structure with some working examples that use CommandHandlers to handle AMQP messages.
git clone git@github.com:robiningelbrecht/drupal-amqp-rabbitmq.git
lando start
to build the necessary docker containerslando composer install
to download vendor dependenciessettings.php
$databases['default']['default'] = [
'database' => 'drupal9',
'username' => 'drupal9',
'password' => 'drupal9',
'prefix' => '',
'host' => 'database',
'port' => '',
'namespace' => 'Drupal\\Core\\Database\\Driver\\mysql',
'driver' => 'mysql',
];
$settings['config_sync_directory'] = '../config/sync';
$settings['amqp_credentials'] = [
'host' => '172.21.0.3', // The AMQP host IP address is outputted in your CLI while running `lando start`
'port' => '5672',
'username' => 'guest',
'password' => 'guest',
'vhost' => '/',
'api' => 'http://rabbit.lndo.site/',
];
lando drush sql-cli < init.sql
There are basically 3 important terms to keep in mind:
The amqp
module provides a basic framework that allows you to
The amqp
module contains a SimpleQueue
and a SimpleQueueWorker
. Let's take a look
at an example of pushing and consuming messages:
It's recommended to add a queue for each type of task, for example:
send-notification-queue
migrate-article-queue
calculate-product-price-queue
This approach ensures that tasks of one type cannot block other ones. It also has the advantage that you can log failed messages on the corresponding failed queues of each queue:
send-notification-queue-failed
migrate-article-queue-failed
calculate-product-price-queue-failed
To declare a new queue, just add a new entry to your services.yml
and tag it with ampq_queue
:
Drupal\your_module\Queue\NewQueue:
autowire: true
tags:
- { name: amqp_queue }
Make sure this class extends BaseQueue
, so you don't have to bother queueing messages yourself.
@TODO: Explain how to push message to Q
If, fore some reason, a message could not be processed, you might want to log it somewhere.
A "failed queue" could be a solution here.\
To push a message to it's corresponding failed queue, you can use the FailedQueueFactory
:
$this->failedQueueFactory->buildFor($queue)->queue(message);
This factory can for example be used in the processFailure
callback of your worker:
public function processFailure(Envelope $envelope, AMQPMessage $message, \Throwable $exception, Queue $queue): void
{
/** @var Command $command */
$command = $envelope;
$command->setMetaData([
'exceptionMessage' => $exception->getMessage(),
'traceAsString' => $exception->getTraceAsString(),
]);
$failedQueue = $this->failedQueueFactory->buildFor($queue)->queue($command);
}
note: a failed queue has no worker attached to it, and thus, cannot be consumed. This means that the messages will stay on the queue until they are manually deletd.
In some more advanced use cases you might want to delay the consumption of messsages, for example:
You can achieve this by pushing the message to it's correspondng delayed queue:
$this->delayedQueueFactory->buildWithDelayForQueue(15, $queue)->queue($message);
For a delayed queue to work properly you'll have to do two things:
dlx
dlx
exchange, where the
routing key of the binding is the command queue name to where it has to be routed.
I like to use Commands and CommandHandlers to persist changes to the database. That is basically what
the cqrs
module is for. It provides a simple framework that
To add a new command (and command handler), just add a new entry to your services.yml
and tag it with cqrs_command_handler
:
Drupal\your_module\DoSomething\DoSomethingCommandHandler:
autowire: true
tags:
- { name: cqrs_command_handler }
The example module contains... an example (deuh) that shows how to implement a "real-time" migration for the content type "Breaking news".
Navigate to admin/content/generate-migration-message
. This form allows you to push a migration message to
a queue. It simulates how a third party could push a message to a Drupal migration queue
where it will get picked up by a consumer. The migration framework will then do the heavy lifting.
Generally you want to run consumers as a background process and keep them "alive" for as long
your server is up. This can be done using systemd
, but I choose to use supervisord
Supervisor is a client/server system that allows its users to monitor and control a number of processes on UNIX-like operating systems.
To register all consumers as a process, just run lando consumers-start
. This will spin up supervisord
and automatically create the necessary consumers for all of you queues.
When adding/removing queues or when updating queue config, you need to run lando consumers-restart
for your new settings to be picked up.
Important: Whenever you make changes to you code, make sure to run the restart command as well, as you don't want your consumers to be running with old code.
You can just run lando consumers-status
, this should output something like this:
ampq-consume-queue-one:ampq-consume-queue-one-00 RUNNING pid 1219, uptime 0:00:06
ampq-consume-queue-one:ampq-consume-queue-one-01 RUNNING pid 1215, uptime 0:00:07
ampq-consume-queue-one:ampq-consume-queue-two-01 RUNNING pid 1216, uptime 0:00:07