noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0
277 stars 34 forks source link

Problems with PhpAmqpLib #70

Closed nachitox closed 3 years ago

nachitox commented 3 years ago

Hi, I'm dumb or I cannot make it work.

I tested all combinations of $this->channel->queue_declare() and $this->channel->exchange_declare() but when sending multiple messages with the same body, the messages are duplicated.

RabbitMQ 3.8.14 Erlang 23.3.1 PhpAmqpLib 3.0.0 Plugin 0.5.0

This is the code I'm testing.

//! Event component
class EventComponent extends Component
{
    private $exchange = 'router';
    private $queue = 'msgs';

    private $connection;
    private $channel;

    //! Contructor, connects to RabbitMq server
    public function initialize(array $config)
    {
        $this->connection = new AMQPStreamConnection(Configure::read('RabbitMQ.server'), Configure::read('RabbitMQ.port'), Configure::read('RabbitMQ.user'), Configure::read('RabbitMQ.pass'), '/');
        $this->channel = $this->connection->channel();

        $this->channel->queue_declare($this->queue, false, true, false, false);
        //$this->channel->queue_declare($this->queue, false, true, false, false, false, ['x-message-deduplication' => ['t', true]]);
        //$this->channel->exchange_declare($this->exchange, AMQPExchangeType::DIRECT, false, true, false);
        $this->channel->exchange_declare($this->exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, ['x-message-deduplication' => ['t', true]]);
        $this->channel->queue_bind($this->queue, $this->exchange);
    }

    //! Send message to the queue
    public function send($messageBody)
    {
        $message = new AMQPMessage($messageBody, [
            'application_headers'   => [
                'x-deduplication-header' => ['S', md5($messageBody)]
            ],
            'content_type'      => 'text/plain',
            'delivery_mode'     => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $this->channel->basic_publish($message, $this->exchange);
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
nachitox commented 3 years ago

Nobody?

noxdafox commented 3 years ago

Please use GitHub to report bugs or request features and not for asking for guidance.

For guidance you can either use Stackoverflow or the rabbitmq-users mailing list. https://groups.google.com/g/rabbitmq-users

I am not familiar with PHP so I cannot help in here. What I can guess is you are not configuring the resources correctly.

For example, the line:

$this->channel->exchange_declare($this->exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, ['x-message-deduplication' => ['t', true]]);

Suggests me you are declaring an exchange of type DIRECT with some boolean attribute x-message-deduplication set to true.

The README documentation clearly states you need to create an exchange of type x-message-deduplication if you want to use exchange-level deduplication.

You can take a look at a Python example as reference: https://gist.github.com/noxdafox/ad1fb4c3769e06a888c3a542fc08c544