guibranco / pancake

🧰 🛠️ Pancake project - toolkit for PHP projects
https://guibranco.github.io/pancake/
MIT License
3 stars 1 forks source link

[FEATURE] Implement `Queue` wrapper #82

Open guibranco opened 8 months ago

guibranco commented 8 months ago

Description

Implement a queue (AMQP) wrapper that covers the basic features: publishing a message and consuming messages with a timeout.

Tech notes

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Thin wrapper around php-amqplib for publishing to and consuming from
 * RabbitMQ queues.
 *
 * Every operation opens its own connection and channel and closes both
 * before returning, even when an exception is thrown in between.
 */
class Queue implements IQueue
{
    /**
     * AMQP connection strings, one per broker node
     * (e.g. "amqp://user:pass@host:5672/vhost").
     */
    private $connectionStrings;

    /**
     * @param string[]|string|null $connectionStrings One or more AMQP connection
     *        strings. When omitted, they are read from Configuration::getRabbitMq()
     *        so existing `new Queue()` callers keep working.
     */
    public function __construct($connectionStrings = null)
    {
        if ($connectionStrings === null) {
            // Backward-compatible fallback for callers that do not inject the configuration.
            $configuration = new Configuration();
            $connectionStrings = $configuration->getRabbitMq();
        }

        // Accept a single connection string as a convenience.
        if (is_string($connectionStrings)) {
            $connectionStrings = [$connectionStrings];
        }

        $this->connectionStrings = $connectionStrings;
    }

    /**
     * Parses the configured connection strings into the server list format
     * passed to AMQPStreamConnection::create_connection().
     *
     * @return array[] List of arrays with the keys host, port, user, password and vhost.
     *
     * @throws QueueException When no connection string is configured or one of
     *         them cannot be parsed / lacks host, user or password.
     */
    private function getServers()
    {
        if (empty($this->connectionStrings)) {
            throw new QueueException("RabbitMQ connection strings not found");
        }

        $servers = [];
        foreach ($this->connectionStrings as $connectionString) {
            $url = parse_url($connectionString);

            // parse_url() returns false on seriously malformed URLs and simply
            // omits components that are absent, so validate before reading them.
            if ($url === false || !isset($url["host"], $url["user"], $url["pass"])) {
                // The connection string is deliberately not echoed: it may contain credentials.
                throw new QueueException("Invalid RabbitMQ connection string");
            }

            // A missing path or a bare "/" both select the default vhost "/";
            // otherwise the leading slash is stripped from the path.
            $path = isset($url["path"]) ? $url["path"] : '/';

            $servers[] = [
                "host" => $url["host"],
                "port" => isset($url["port"]) ? $url["port"] : 5672,
                "user" => $url["user"],
                "password" => $url["pass"],
                "vhost" => $path === '/' ? '/' : substr($path, 1),
            ];
        }

        return $servers;
    }

    /**
     * Opens a connection to one of the configured servers.
     *
     * Explicit 10 second connect/read-write timeouts are applied only when a
     * single server is configured; with several servers the library defaults
     * are used.
     *
     * @return mixed Whatever AMQPStreamConnection::create_connection() returns.
     *
     * @throws QueueException When the connection strings are missing or invalid.
     */
    private function getConnection()
    {
        $servers = $this->getServers();
        $options = [];
        if (count($servers) === 1) {
            $options = ['connection_timeout' => 10.0, 'read_write_timeout' => 10.0];
        }

        return AMQPStreamConnection::create_connection($servers, $options);
    }

    /**
     * Declares a durable queue with no dead-letter configuration.
     *
     * @param mixed  $channel   Open AMQP channel.
     * @param string $queueName Name of the queue to declare.
     */
    private function declareQueueWithoutDLX($channel, $queueName)
    {
        // Positional flags: passive=false, durable=true, exclusive=false,
        // auto_delete=false, nowait=false (same flags as the DLX variant).
        $channel->queue_declare($queueName, false, true, false, false, false);
    }

    /**
     * Declares the durable queue together with its "<queueName>-retry" companion.
     *
     * Messages dead-lettered from the main queue are routed (through the
     * default exchange) to the retry queue; messages in the retry queue expire
     * after one hour and are dead-lettered back to the main queue.
     *
     * @param mixed  $channel   Open AMQP channel.
     * @param string $queueName Name of the main queue.
     */
    private function declareQueueWithDLX($channel, $queueName)
    {
        $retryQueueName = $queueName . '-retry';

        $channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(
                [
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $retryQueueName,
                ]
            )
        );
        $channel->queue_declare(
            $retryQueueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(
                [
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $queueName,
                    // One hour, expressed in milliseconds.
                    'x-message-ttl' => 1000 * 60 * 60,
                ]
            )
        );
    }

    /**
     * Publishes a persistent message, flagged as application/json, to a queue.
     *
     * The queue (and its retry queue) is declared first, so it does not need
     * to exist beforehand.
     *
     * @param string $queueName Target queue, used as routing key on the default exchange.
     * @param string $message   Message body.
     *
     * @throws QueueException When the connection strings are missing or invalid.
     */
    public function publish($queueName, $message)
    {
        $connection = $this->getConnection();
        try {
            $channel = $connection->channel();
            try {
                $this->declareQueueWithDLX($channel, $queueName);
                $msgOptions = [
                    'content_type' => 'application/json',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                ];
                $msg = new AMQPMessage($message, $msgOptions);
                $channel->basic_publish($msg, '', $queueName);
            } finally {
                // Release the channel even when declaring or publishing fails.
                $channel->close();
            }
        } finally {
            $connection->close();
        }
    }

    /**
     * Consumes messages from a queue until the timeout elapses.
     *
     * @param int      $timeout               Seconds to keep consuming, counted from the
     *                                        start time.
     * @param string   $queueName             Queue to consume from.
     * @param callable $callback              Invoked as $callback($timeout, $startTime, $msg)
     *                                        for every delivery. Deliveries are requested with
     *                                        no_ack = false, so acknowledging is presumably the
     *                                        callback's job.
     * @param bool     $resetTimeoutOnReceive When true, the start time is reset to "now" each
     *                                        time a message arrives, extending the deadline.
     * @param int      $prefetchCount         Prefetch count passed to basic_qos().
     *
     * @throws QueueException When the connection strings are missing or invalid.
     */
    public function consume($timeout, $queueName, $callback, $resetTimeoutOnReceive = false, $prefetchCount = 10)
    {
        $startTime = time();
        // $startTime is captured by reference so that a reset performed inside
        // the handler is also seen by the deadline check in the loop below.
        $fn = function ($msg) use ($callback, $timeout, &$startTime, $resetTimeoutOnReceive) {
            if ($resetTimeoutOnReceive) {
                $startTime = time();
            }
            $callback($timeout, $startTime, $msg);
        };

        $connection = $this->getConnection();
        try {
            $channel = $connection->channel();
            try {
                // TODO: check which one to call - declareQueueWithDLX or declareQueueWithoutDLX
                $this->declareQueueWithDLX($channel, $queueName);
                $channel->basic_qos(null, $prefetchCount, null);
                $channel->basic_consume($queueName, '', false, false, false, false, $fn);

                while ($channel->is_consuming()) {
                    // Block for at most the remaining time instead of polling in a
                    // tight loop. The 1 second floor matters because a wait timeout
                    // of 0 means "wait forever".
                    $remaining = max(1, $startTime + $timeout - time());
                    try {
                        $channel->wait(null, false, $remaining);
                    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                        // Nothing arrived within the window; the deadline check below decides.
                    }

                    if ($startTime + $timeout < time()) {
                        break;
                    }
                }
            } finally {
                // Release the channel even when the callback throws.
                $channel->close();
            }
        } finally {
            $connection->close();
        }
    }
}

Additional information

⚠️ 🚨 Add documentation and tests

gitauto-ai[bot] commented 4 months ago

@guibranco Pull request completed! Check it out here https://github.com/guibranco/pancake/pull/171 🚀

Note: I automatically create a pull request for an unassigned and open issue in order from oldest to newest once a day at 00:00 UTC, as long as you have remaining automation usage. Should you have any questions or wish to change settings or limits, please feel free to contact info@gitauto.ai or invite us to Slack Connect.

gitauto-ai[bot] commented 1 month ago

Hey, I'm a bit lost here! Not sure which file I should be fixing. Could you give me a bit more to go on? Maybe add some details to the issue or drop a comment with some extra hints? Thanks!

Have feedback or need help? Feel free to email info@gitauto.ai.