This Symfony Messenger transport allows you to use Enqueue to send and receive your messages from all the supported brokers.
composer req sroze/messenger-enqueue-transport
default
Enqueue transport:# .env
# ...
###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###
Configure Messenger's transport (that we will name amqp
) to use Enqueue's default
transport:
# config/packages/messenger.yaml
framework:
messenger:
transports:
amqp: enqueue://default
Route the messages that have to go through the message queue:
# config/packages/messenger.yaml
framework:
messenger:
# ...
routing:
'App\Message\MyMessage': amqp
Consume!
bin/console messenger:consume amqp
In the transport DSN, you can add extra configuration. Here is the common reference DSN (note that the values are just for the example):
enqueue://default
?queue[name]=queue_name
&topic[name]=topic_name
&deliveryDelay=1800
&delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
&timeToLive=3600
&receiveTimeout=1000
&priority=1
Each Enqueue transport (e.g. amqp, redis, etc) has its own message object
that can normally be configured by calling setter methods (e.g.
$message->setDeliveryDelay(5000)
). But in Messenger, you don't have access
to these objects directly. Instead, you can set them indirectly via
the TransportConfiguration
stamp:
use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
// ...
// create your message like normal
$message = // ...
$transportConfig = (new TransportConfiguration())
// commmon options have a convenient method
->setDeliveryDelay(5000)
// other transport-specific options are set via metadata
// example custom option for AmqpMessage
// each "metadata" will map to a setter on your message
// will result in setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT)
// being called
->addMetadata('deliveryMode', AmqpMessage::DELIVERY_MODE_PERSISTENT)
;
$bus->dispatch((new Envelope($message))->with($transportConfig));
You can send a message on a specific topic using TransportConfiguration
envelope item with your message:
use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
// ...
$transportConfig = (new TransportConfiguration())
->setTopic('specific-topic')
;
$bus->dispatch((new Envelope($message))->with($transportConfig));
See https://www.rabbitmq.com/tutorials/tutorial-five-php.html
You can use specific topic and queue options to configure your AMQP exchange in topic
mode and bind it:
enqueue://default
?queue[name]=queue_name
&queue[bindingKey]=foo.#
&topic[name]=topic_name
&topic[type]=topic
&deliveryDelay=1800
&delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
&timeToLive=3600
&receiveTimeout=1000
&priority=1
Here is the way to send a message with a routing key matching this consumer:
$bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
'topic' => 'topic_name',
'metadata' => [
'routingKey' => 'foo.bar'
]
])));
Here is the way to send a message with with some custom options:
$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
'topic' => 'test_topic_name',
'metadata' => [
'key' => 'foo.bar',
'partition' => 0,
'timestamp' => (new \DateTimeImmutable())->getTimestamp(),
'messageId' => uniqid('kafka_', true),
]
])))