Closed hpatoio closed 6 years ago
Could you show please post enqueue configs for both apps
What broker do you use?
In ApplicationA
I've this command that I call using foo
as queue name
class SendTestMessageCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('enqueue:send-test-message')
->setDescription('Send a test message')
->addOption('queue', null, InputOption::VALUE_REQUIRED, 'Queue's name');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
if (!$input->getOption('queue')) {
throw new \InvalidArgumentException("Please specify the 'queue'");
}
$queueName = $input->getOption('queue');
$message = new Message([
'producer' => 'test-command-applicationA',
'body' => 'Hello from application A,
'produced_at' => (new \DateTime())->format('c'),
]);
$this->getContainer()->get('enqueue.producer')->sendEvent($queueName, $message);
}
}
and this conf
enqueue:
transport:
default:
alias: 'sqs'
sqs:
key: '%env(SQS_KEY)%'
secret: '%env(SQS_SECRET)%'
region: '%env(SQS_REGION)%'
client:
prefix: 'dev'
app_name: applicationa
And in `ApplicationB
enqueue:
transport:
default:
alias: 'sqs'
sqs:
key: '%env(SQS_KEY)%'
secret: '%env(SQS_SECRET)%'
region: '%env(SQS_REGION)%'
client:
prefix: 'dev'
app_name: applicationa
and the consumer
musement.consumer.foo_consumer:
class: 'App\Consumer\FooConsumer'
public: true
tags:
- { name: 'enqueue.client.processor', queueName: 'foo', processorName: 'foo', topicName: 'foo' }
public function process(PsrMessage $message, PsrContext $context)
{
$message = JSON::decode($message->getBody());
var_dump($message);
return self::ACK;
}
I'm using SQS
and I've a queue named dev_dot_applicationa_dot_foo
configured. What I don't understand is: can I specify the exact name of the queue I wanna send the command to and the exact name of the queue for the consumer ?
In application B, as you can see, I had to specifiy applicationa
as application name because Enqueue build the name of the queue using the application name. But what if in my application B I want a consumer that also read messages for a queue configured for applicationC
?
Hi @hpatoio Did you figure out a solution ?
It seems to me you have mixed topic and queue name when you wrote
$this->getContainer()->get('enqueue.producer')->sendEvent($queueName, $message);
By using sendEvent, message is sent to topic.
I'm working on the same use case with two apps, with a specific queue for communication from A to B. I ended up using "transport" instead of the "client" in order to specify the queue when creating the message.
I whished to be able to specify a topic in order to use the "client" in application B (does that makes sense after all ?) but just ended up using transport for consuming messages from that specific queue instead.
I did not find a solution. I've also opened this #459 where the author tells me to use "transport" directly (that it seems that is what you did)
I ended up using "transport" instead of the "client" in order to specify the queue when creating the message
Can I see the code somewhere ?
In application A, I've organized message publishing in two separate steps:
it looks like this
final class UserRegistrationPublisher implements UserRegistrationPublisher
{
private $producer;
private $serializer;
public function __construct(
SerializerInterface $serializer,
ProducerInterface $producer
) {
$this->serializer = $serializer;
$this->producer = $producer;
}
/**
* This is the context agnostic message publication.
* MQ processing starts from here.
*/
public function publishUser($user)
{
$message = $this->serializer->serialize($user, 'json');
$this->producer->sendEvent(
'a_user_registration', //topic
$message
);
}
}
final class UserRegistrationApplicationBForwarder implements PsrProcessor, TopicSubscriberInterface
{
public static function getSubscribedTopics()
{
return ['a_user_registration'];
}
/**
* forward to a specific queue
*/
public function process(PsrMessage $message, PsrContext $context)
{
$context->createProducer()->send(
$context->createQueue('a_to_b'), //queue
$context->createMessage($message->getBody())
);
return self::ACK;
}
}
On application B, I have a dedicated processor I call from the enqueue:transport:consume command
@hpatoio
class IncomingEmailProcessor implements PsrProcessor, TopicSubscriberInterface
{
public static function getSubscribedTopics()
{
return [
'from_a_to_b' => [
'queueName' => 'does_not_matter',
'queueNameHardcoded' => true
],
'another_topic_here' => [
'queueName' => 'can_be_differrent',
'queueNameHardcoded' => true
],
];
}
public function process(PsrMessage $message, PsrContext $session)
{
// Do something
}
}
What enqueue will do in this case is build a mapping from topic to [processor,queueName] and keep it in its RouterProcessor. Whenever a message is received on a topic, the router will publish messages back to the broker for each queue subscribed to that topic. In the modified example above IncomingEmailProcessor will receive messages on queues "does_not_matter" and "can_be_different" from the Router when messages are received on topics "from_a_to_b" and "another_topic_here".
Actually if you make another processor
class AnotherProcessor implements PsrProcessor, TopicSubscriberInterface
{
public static function getSubscribedTopics()
{
return ['from_a_to_b' => ['queueName' => 'another_queue_name', 'queueNameHardcoded' => true]];
}
public function process(PsrMessage $message, PsrContext $context) { ... }
}
it will also receive any message published on the "from_a_to_b" topic because enqueue's router will publish itself messages to all processor queues for which the processors have advertised themselves as subscribed to "from_a_to_b" topic.
Thank you very much for your examples @amne :+1: This "routing" operated by enqueue was confusing me and now it starts to be a bit clearer...
If I read your example in a context of 2 different apps A & B, I understand you would send a message to a 'from_a_to_b' topic from A, and in B, you would create your IncomingEmailProcessor
and AnotherProcessor
. Is that correct ?
@kermorgant yes, that is correct.
Thanks @amne. Sorry, it was a dumb question, I had lost the op's context...
I'm still a bit confused by several things about this scenario :
1) 'from_a_to_b' topic name is quite generic, and we could expect that different types of message start to share it after some time. And when that happens, IncomingMailProcessor
can't just continue to work as used to. Not sure how to best deal with that case.
2) my preference would be to start with a topic name that only reflects what just happened inside A (just as if we ignored the existence of B). But from my experience with enqueue, message would be removed from enqueue when all "TopicSubscribers" of A would have processed the message. How could we thus reliably expect B to also process the message ?
3) To avoid those 2 issues, I've therefore opted to isolate the message in a 'a_to_b' queue (see my comment from June 25). Yes, 'a_to_b' is still vague, should have named it like 'a_to_b_incoming_mail'.
I'm probably still missing some pieces to get the "big picture". I've only tested it with dbal so far, which perhaps gives me a particuliar bias too.
SQS transport does not support message\enterprise bus, It could if https://github.com/php-enqueue/enqueue-dev/issues/252 is done.
Also note that two applications should have different app_name set.
I've 2 symfony applications and I need to consume in
ApplicationB
messages generated byApplicationA
In
ApplicationA
I'veIn
ApplicationB
I'veBut I don't receive any message on
ApplicationB
where do I go wrong ?This seems related to #296 and #196