php-amqplib / RabbitMqBundle

RabbitMQ Bundle for the Symfony web framework
MIT License

batch consumer does not emit any event #730

Open speech77 opened 1 month ago

speech77 commented 1 month ago

Hi all,

OldSound\RabbitMqBundle\RabbitMq\BatchConsumer does not emit any event. Only OldSound\RabbitMqBundle\RabbitMq\Consumer and classes extending it emit OldSound\RabbitMqBundle\Event\AMQPEvents.

Thanks in advance. Cheers!

mihaileu commented 1 month ago

https://github.com/php-amqplib/RabbitMqBundle/blob/master/RabbitMq/BatchConsumer.php#L198 Currently the normal events contain the message; in the batch case I think it would be too much to attach all the messages of a specific batch. What are your requirements for this case?

speech77 commented 1 month ago

I would like to have support for OnConsumeEvent and OnIdleEvent.

At the moment, I don't need BeforeProcessingMessageEvent and AfterProcessingMessageEvent, but I suppose two specific events, BeforeProcessingMessagesEvent and AfterProcessingMessagesEvent, could be created to handle the 'before' and 'after'.
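
To make the idea concrete, here is a minimal sketch of what such batch-level events might look like. It is only an illustration: none of these classes exist in the bundle, the base class `BatchProcessingEvent`, the `NAME` values and `getMessageCount()` are hypothetical, and a real implementation would presumably extend the bundle's own event base class. The sketch carries just the batch size, so the full list of messages does not have to be attached to the event.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: these classes are NOT part of RabbitMqBundle.
// They illustrate batch-level events that expose only the number of
// messages in the batch instead of the messages themselves.

abstract class BatchProcessingEvent
{
    /**
     * @var int
     */
    private $messageCount;

    public function __construct(int $messageCount)
    {
        if ($messageCount < 0) {
            throw new InvalidArgumentException('Message count cannot be negative');
        }

        $this->messageCount = $messageCount;
    }

    public function getMessageCount(): int
    {
        return $this->messageCount;
    }
}

// Would be dispatched once before the whole batch is handed to the callback.
final class BeforeProcessingMessagesEvent extends BatchProcessingEvent
{
    // Assumed event name, chosen for illustration only.
    public const NAME = 'before_processing_messages';
}

// Would be dispatched once after the whole batch has been processed.
final class AfterProcessingMessagesEvent extends BatchProcessingEvent
{
    // Assumed event name, chosen for illustration only.
    public const NAME = 'after_processing_messages';
}
```

A listener could then read `$event->getMessageCount()` for logging or metrics without ever touching the individual messages.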

speech77 commented 1 month ago

My requirements are to manage the heartbeat in all my consumers by injecting logic like this:

<?php

namespace Service;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;
use Psr\Log\LoggerInterface;
use RuntimeException;

class RabbitMqHeartbeatService
{
    /**
     * @var PCNTLHeartbeatSender|null
     */
    private $sender;

    /**
     * @var LoggerInterface
     */
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @param AMQPChannel $channel
     * @param string $consumerTag
     * @throws RuntimeException
     */
    public function registerHeartbeat(AMQPChannel $channel, string $consumerTag): void
    {
        if ($this->sender === null) {
            $connection = $channel->getConnection();
            if ($connection === null) {
                throw new RuntimeException('Connection cannot be null');
            }

            $this->logger->debug('Registering PhpAmqpLib heartbeat', ['consumer_tag' => $consumerTag]);
            /*
             * This only works if we have a set of blocking operations
             * and every one of them completes within the heartbeat timeframe
             * https://github.com/php-amqplib/RabbitMqBundle/issues/301#issuecomment-805161267
             */
            $sender = new PCNTLHeartbeatSender($connection);
            $sender->register();
            $this->sender = $sender;
        }
    }
}

mihaileu commented 4 weeks ago

Currently I cannot allocate time for this. A PR would help move this forward.