ssi-anik / amqp

php-amqplib wrapper that eases the consumption of RabbitMQ. A painless way of using RabbitMQ
https://packagist.org/packages/anik/amqp
MIT License
134 stars 23 forks source link
laravel-rabbitmq lumen-rabbitmq php php-amqp php-rabbitmq rabbitmq rabbitmq-consumer

anik/amqp codecov PHP Version Require Total Downloads Latest Stable Version

anik/amqp is a php-amqplib wrapper that eases the consumption of RabbitMQ. A painless way of using RabbitMQ.

Note

Previously, the package could be used with Laravel, Laravel Zero, Lumen out of the box. From v2, the Laravel support has been removed. If you are looking for implementation with Laravel, you can use anik/laravel-amqp. If you were using this package with Laravel, and you want to upgrade to Laravel 9, please consider using anik/amqp-to-laravel-amqp if you want to migrate to anik/laravel-amqp later.

Examples

Checkout the repository for example.

Requirements

Installation

To install the package, run

composer require anik/amqp

Documentation

For V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90

Connection

To create an AMQP Connection, you can use

<?php

use Anik\Amqp\AmqpConnectionFactory;
use PhpAmqpLib\Connection\AMQPLazySSLConnection;

$host = '127.0.0.1';
$port = 5672;
$user = 'user';
$password = 'password';
$vhost = '/';
$options = []; // options to be proxied to the amqp connection class
$ofClass = AMQPLazySSLConnection::class;

$connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass);
$hosts = [
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ],
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ]
];

// With AmqpConnectionFactory::makeFromArray method, you can try to connect to multiple host
$connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);

Exchange

Also, there are four specific exchange classes.

You can still use Anik\Amqp\Exchanges\Exchange base class to create your own exchange.

To instantiate an exchange, you can do like

<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Exchanges\Fanout;
use Anik\Amqp\Exchanges\Topic;

$exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT);

$exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]);

$exchange = new Topic('anik.amqp.topic.exchange');

$exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);

When creating an exchange instance with

Anik\Amqp\Exchanges\Exchange contains a few predefined exchange types, you can use them as reference.

The Exchange::make method also accepts the following keys when making an exchange instance.

You can also reconfigure the exchange instance using $exchange->reconfigure($options). The $options array accepts the above keys as well.

Also, you can use the following methods to configure your exchange instance.

Queue

To instantiate a queue, you can do like

<?php

use Anik\Amqp\Queues\Queue;

$queue = new Queue('anik.amqp.direct.exchange.queue');

$queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);

When creating a queue instance with

The Queue::make method also accepts the following keys when making a queue instance.

You can also reconfigure the queue instance using $queue->reconfigure($options). The $options array accepts the above keys as well.

Also, you can use the following methods to configure your queue instance.

Qos

To instantiate a Qos, you can do like

<?php

use Anik\Amqp\Qos\Qos;

$prefetchSize = 0;
$prefetchCount = 0;
$global = false;

$qos = new Qos($prefetchSize, $prefetchCount, $global);

$qos = Queue::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);

The Qos::make method also accepts the following key when making a qos instance.

You can also reconfigure the qos instance using $qos->reconfigure($options). The $options array accepts the above keys as well.

Also, you can use the following methods to configure your qos instance.

Publish/Produce message

To produce/publish messages, you'll need the Anik\Amqp\Producer instance. To instantiate the class

<?php

use Anik\Amqp\Producer;

$producer = new Producer($connection, $channel);

The constructor accepts

If $channel is not provided or null, class uses the channel from the $connection.

Once the producer class is instantiated, you can set a channel with setChannel. Method accepts PhpAmqpLib\Channel\AMQPChannel instance.

There are three ways to publish messages

Bulk Publish

Producer::publishBatch - to publish multiple messages in bulk.

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);

Publish

Producer::publish - to publish a single message. Uses Producer::publishBatch under the hood.

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publish($message, $routingKey, $exchange, $options);

Publish Basic

Producer::publishBasic - to publish a single message using AMQPChannel::basic_publish method.

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);

ProducibleMessage: Implementation of Producible Interface

The package comes with Anik\Amqp\ProducibleMessage, a generic implementation of Anik\Amqp\Producible interface.

You can instantiate the class like

<?php

use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new ProducibleMessage('take my message to rabbitmq');

$msg = new ProducibleMessage('take my message to rabbitmq', [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers' => new AMQPTable(['key' => 'value']),
]);

Consumer

To consume messages, you'll need the Anik\Amqp\Consumer instance. To instantiate the class

<?php

use Anik\Amqp\Consumer;

$consumer = new Consumer($connection, $channel, $options);

The constructor accepts

If $channel is not provided or null, class uses the channel from the $connection.

Once the consumer class is instantiated, you can access the following methods.

To consume messages,

<?php

use Anik\Amqp\Consumer;

(new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);

ConsumableMessage: Implementation of Consumable Interface

The package comes with Anik\Amqp\ConsumableMessage, a generic implementation of Anik\Amqp\Consumable interface.

You can instantiate the class like

<?php

use Anik\Amqp\ConsumableMessage;
// use PhpAmqpLib\Message\AMQPMessage;

$msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) {
    echo $message->getMessageBody() . PHP_EOL;
    echo $message->getRoutingKey() . PHP_EOL;
    $message->ack();
    // Alternatively, $original->ack();

    /** 
     * Method: `decodeMessage` 
     * Returns:
     *      - `array` if message body contains valid json
     *      - `null` if json could not be decoded 
     */
    var_dump($message->decodeMessage());

    /** 
     * Method: `decodeMessageAsObject` 
     * Returns:
     *      - `\stdClass` if message body contains valid json
     *      - `null` if json could not be decoded 
     */
    var_dump($message->decodeMessageAsObject());
});

NOTE: Calling any method on ConsumableMessage instance without setting AMQPMessage will throw exception.

Issues?

If you find any issue/bug/missing feature, please submit an issue and PRs if possible.