Open lifinsky opened 4 days ago
@dgafka Hi)
A bit of debugging in InterceptedConsumer
while ($this->shouldBeRunning()) {
\var_dump('--run--');
private function shouldBeRunning(): bool
{
if (! $this->shouldBeRunning) {
return false;
}
foreach ($this->consumerInterceptors as $consumerInterceptor) {
if ($consumerInterceptor->shouldBeStopped()) {
\var_dump('--should be stopped--');
php bin/console ecotone:run db
string(7) "--run--"
string(7) "--run--"
^Cstring(21) "--should be stopped--"
php bin/console ecotone:run async
string(7) "--run--"
string(7) "--run--"
^Cstring(7) "--run--"
string(7) "--run--"
Interceptors list:
Ecotone\Messaging\Endpoint\Interceptor\LimitMemoryUsageInterceptor Ecotone\Messaging\Endpoint\Interceptor\SignalInterceptor Ecotone\Messaging\Endpoint\Interceptor\ConnectionExceptionRetryInterceptor
@dgafka $extQueue->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q)
leads to this problem
Possible solution: implement AmqpSubscriptionConsumer:
<?php
namespace Ecotone\Amqp;
use Enqueue\AmqpExt\AmqpConsumer;
use Enqueue\AmqpExt\AmqpContext;
use Interop\Queue\Consumer;
/**
* licence Apache-2.0
*/
class AmqpSubscriptionConsumer implements \Interop\Amqp\AmqpSubscriptionConsumer
{
private null|array $subscriber = null;
public function __construct(private readonly AmqpContext $context)
{
}
/**
* @throws \AMQPQueueException
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
*/
public function consume(int $timeout = 0): void
{
if (null === $this->subscriber) {
throw new \LogicException('There is no subscriber.');
}
/** @var AmqpConsumer $consumer */
$consumer = $this->subscriber[0];
$timeout = $timeout / 1000;
$extQueue = new \AMQPQueue($this->context->getExtChannel());
$extQueue->setName($consumer->getQueue()->getQueueName());
for (;;) {
$start ??= microtime(true);
$extEnvelope = $extQueue->get();
if (!$extEnvelope) {
if (microtime(true) - $start > $timeout) {
return;
}
usleep(100000);
continue;
}
$message = $this->context->convertMessage($extEnvelope);
$message->setConsumerTag($consumer->getConsumerTag());
call_user_func($this->subscriber[1], $message, $consumer);
return;
}
}
public function subscribe(Consumer $consumer, callable $callback): void
{
if (!$consumer instanceof AmqpConsumer) {
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer)));
}
$this->subscriber = [$consumer, $callback];
}
public function unsubscribe(Consumer $consumer): void
{
$this->subscriber = null;
}
public function unsubscribeAll(): void
{
$this->subscriber = null;
}
}
@dgafka I can create PR
@lifinsky there is difference in basic_consume
and basic_get
and resulting performance.
Basic consume will do better than calling basic_get, and it does work with RabbitMQ streams (where get, does not).
So switching to get should rather be considered as last sort of effort. I may take a look if I can find any solution but not sure when I will find time for that. So if you can take a look and search for one that would be great
Consume cannot be used, it blocks signals. And without the ability to correctly terminate the process, the current solution makes no sense.
Symfony messenger uses get method for the same reason
Process is running After killing, process is stopped
I could kill the process by sending an signal in newest version of Ecotone. Are you sure, there is a problem?
Yes, I am sure. Consume calback + read timeout blocks signals. You can test it with empty queue for example.
Ecotone version(s) affected: latest
Description
The consumer process cannot be terminated correctly with AMQP channel.
How to reproduce
Run
ps
and find process pidThe process will not stopped. At the same time, everything works correctly with db consumer.