bschmitt / laravel-amqp

AMQP wrapper for Laravel and Lumen to publish and consume messages
MIT License

how would you consume raw rabbitmq messages indefinitely? #51

Closed marcstreeter closed 5 years ago

marcstreeter commented 5 years ago

I’m using this project to produce RabbitMQ messages for the rest of the infrastructure it interacts with, without any issue.

My question is: how would you consume raw RabbitMQ messages indefinitely within Laravel without blocking normal HTTP requests?

I noticed that the basic description does mention “Consume messages forever”, and that is exactly what I'm aiming for. However, if I were to do so, wouldn't that block all HTTP requests?

Would I use Supervisor for this, and if so, how do I ensure that Supervisor runs my consumer and not Laravel’s queue? I need other services (which use a custom message format, i.e., not Laravel’s message format) to be able to send messages to Laravel.

Any help or pointers you can provide would be much appreciated.

gutierrezps commented 5 years ago

That's exactly what I'm looking for too.

tpaksu commented 5 years ago

I'm using a command to run the "Consume messages forever" code like this:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Bschmitt\Amqp\Facades\Amqp;

class ListenRabbit extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'queue:custom';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Catches Data from RabbitMQ';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        try {
            Amqp::consume('queue_name', function ($message, $resolver) {
                $data = $message->body;

                // do things with data here

                $resolver->acknowledge($message);
            });
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
            // The command returns after this, so supervisor can restart it.
            echo "Connection timed out. Restarting." . PHP_EOL;
        }
    }
}

On the supervisor side, I set the program's command to php artisan queue:custom.
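For readers wondering what that supervisor entry might look like, here is a minimal sketch. The program name, paths, and user are hypothetical and need adjusting to your deployment; only the artisan command comes from the post above. Because `command` points at the custom artisan command rather than `queue:work`, supervisor runs this consumer and not Laravel's own queue worker.

```ini
; Hypothetical /etc/supervisor/conf.d/amqp-consumer.conf
[program:amqp-consumer]
; Run the custom consumer command, not Laravel's queue:work
command=php /var/www/app/artisan queue:custom
autostart=true
; Restart the process whenever it exits (for example after a timeout)
autorestart=true
user=www-data
numprocs=1
; Send stderr to the same log file as stdout
redirect_stderr=true
stdout_logfile=/var/www/app/storage/logs/worker.log
```

After adding the file, `supervisorctl reread` followed by `supervisorctl update` should pick up the new program.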

stevenklar commented 5 years ago

@tpaksu thanks for posting this! I just enabled PHP syntax highlighting on your snippet, if you don't mind. @marcstreeter @gutierrezps Does that answer your question?

I would also consider using Laravel's logging instead of just echoing the message.

tpaksu commented 5 years ago

Hi @stevenklar, I'm using supervisor's worker.log file, so I needed the echo statement :) One more thing: with this approach, if I don't set this config:

'connect_options' => [
    "read_write_timeout" => 120,
    "keepalive" => true,
    "heartbeat" => 60
],

the CPU usage gets high. Do you have any comments on that, @stevenklar?

stevenklar commented 5 years ago

@tpaksu Sorry, I have no idea. Maybe you can ask this question in the underlying package's repository.

There are some suggestions if you search for this issue with AMQP in general. ¯\_(ツ)_/¯

marcstreeter commented 5 years ago

This is awesome, I'll give it a try. I'm bogged down with some other tasks at the moment, but this looks promising.

tpaksu commented 5 years ago

Ok @stevenklar thanks, I'll ask them.

gutierrezps commented 5 years ago

Hi @tpaksu and @stevenklar, I've implemented @tpaksu's code, but it doesn't keep consuming indefinitely.

    public function handle()
    {
        $this->info('Queue worker started');
        try {
            Amqp::consume('queue_name', function ($message, $resolver) {
                $data = $message->body;

                // do things with data here
                $this->line('[*] Message received: ' . $data);

                $resolver->acknowledge($message);
            });
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
            echo "Connection timed out. Restarting.";
        }
        $this->info('Queue worker stopped');
    }

After executing the command, both start and stop messages are printed. Here's my config/amqp.php file:

<?php
return [
    /*
    |--------------------------------------------------------------------------
    | Define which configuration should be used
    |--------------------------------------------------------------------------
    */
    'use' => 'production',
    /*
    |--------------------------------------------------------------------------
    | AMQP properties separated by key
    |--------------------------------------------------------------------------
    */
    'properties' => [
        'production' => [
            'host'                  => 'localhost',
            'port'                  => 5672,
            'username'              => 'my_username',
            'password'              => 'my_password',
            'vhost'                 => '/',
            'connect_options'       => [],
            'ssl_options'           => [],
            'exchange'              => 'my_exchange',
            'exchange_type'         => 'direct',
            'exchange_passive'      => false,
            'exchange_durable'      => true,
            'exchange_auto_delete'  => false,
            'exchange_internal'     => false,
            'exchange_nowait'       => false,
            'exchange_properties'   => [],
            'queue_force_declare'   => false,
            'queue_passive'         => false,
            'queue_durable'         => true,
            'queue_exclusive'       => false,
            'queue_auto_delete'     => false,
            'queue_nowait'          => false,
            'queue_properties'      => ['x-ha-policy' => ['S', 'all']],
            'consumer_tag'          => '',
            'consumer_no_local'     => false,
            'consumer_no_ack'       => false,
            'consumer_exclusive'    => false,
            'consumer_nowait'       => false,
            'timeout'               => 0,
            'persistent'            => false,
        ],
    ],
];

Am I missing something? Since this didn't work, right now I'm following RabbitMQ's tutorial, using its receive.php as a base.

tpaksu commented 5 years ago

@gutierrezps are you using supervisor? Can you try the connect_options settings I shared in my follow-up comment above?

tpaksu commented 5 years ago

@gutierrezps AFAIK the code I posted here checks the queue once and if it has messages inside, it'll process them until the queue becomes empty. If your queue is empty, it's normal to see both messages printed at once. I'm trying to keep the consumer open with the keep-alive option, and if it times out, supervisor restarts it.

stevenklar commented 5 years ago

"persistent" should be true because of https://github.com/bschmitt/laravel-amqp/blob/master/src/Consumer.php#L32.

If the config parameter "persistent" is false and there are no messages in the queue, the consumer just stops.

I recommend reading the consume method in Consumer.php. It will show you how it works. :)

EDIT: Also, don't overcomplicate it. You can wrap your code in an additional "while" loop, so supervisor doesn't need to restart the command when that isn't necessary.
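To illustrate the two suggestions above (persistent mode plus an outer loop), here is a minimal sketch rather than a definitive implementation. The class name and command signature are hypothetical, `queue_name` is the placeholder used earlier in this thread, and it assumes the per-call properties array passed as the third argument accepts the same `persistent` key as config/amqp.php.

```php
<?php

namespace App\Console\Commands;

use Bschmitt\Amqp\Facades\Amqp;
use Illuminate\Console\Command;

// Hypothetical command; the name and signature are illustrative only.
class ConsumeForever extends Command
{
    protected $signature = 'amqp:consume-forever';

    protected $description = 'Consume raw RabbitMQ messages until the process is stopped';

    public function handle()
    {
        // Outer loop: if consume() returns or times out, start consuming again
        // instead of exiting and waiting for supervisor to restart the process.
        while (true) {
            try {
                Amqp::consume('queue_name', function ($message, $resolver) {
                    $this->line('Received: ' . $message->body);

                    // do things with the data here

                    $resolver->acknowledge($message);
                }, [
                    // Assumption: overrides 'persistent' from config/amqp.php
                    // so the consumer keeps waiting when the queue is empty.
                    'persistent' => true,
                ]);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
                $this->warn('Connection timed out. Reconnecting.');
            }

            // Short pause so a failing connection does not spin the CPU.
            sleep(1);
        }
    }
}
```

Running this under supervisor is still worthwhile, since any other exception (or a fatal error) ends the process and something has to bring it back up.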

tpaksu commented 5 years ago

@stevenklar yes, I first coded it like that, but I had doubts about while(true) regarding memory usage. Later I saw the while loops inside this package and in the php-amqplib package it uses, so I started looking for another way to do this that is more specific to AMQP itself.

stevenklar commented 5 years ago

I assume this question is too old to keep open. @tpaksu and @marcstreeter please reopen if you want to continue the question/discussion.

renatosistemasvc commented 2 years ago

Hi! @tpaksu @stevenklar

I'm going to use this library in a piece of software. I'd like to know whether listening forever is recommended. Have you had any performance issues on the PHP side?

My configuration is:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Jobs\SubscribeCliente;
use Amqp;

class listeningSga extends Command
{
    protected $signature = 'listening:sga';

    protected $description = 'Command description';

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        Amqp::consume('cliente', function ($message, $resolver) {
            $resolver->acknowledge($message);

            // logic to handle the data...
        }, ['exchange' => 'cliente', 'routing' => 'default']);
    }

}

'properties' => [

        'production' => [
            'host'                  => 'localhost',
            'port'                  => 5672,
            'username'              => 'guest',
            'password'              => 'guest',
            'vhost'                 => 'sga',
            'connect_options'       => [],
            'ssl_options'           => [],

            'exchange'              => 'amq.direct',
            'exchange_type'         => 'direct',
            'exchange_passive'      => false,
            'exchange_durable'      => true,
            'exchange_auto_delete'  => false,
            'exchange_internal'     => false,
            'exchange_nowait'       => false,
            'exchange_properties'   => [],

            'queue_force_declare'   => false,
            'queue_passive'         => false,
            'queue_durable'         => true,
            'queue_exclusive'       => false,
            'queue_auto_delete'     => false,
            'queue_nowait'          => false,
            'queue_properties'      => ['x-ha-policy' => ['S', 'all']],

            'consumer_tag'          => '',
            'consumer_no_local'     => false,
            'consumer_no_ack'       => false,
            'consumer_exclusive'    => false,
            'consumer_nowait'       => false,
            'consumer_properties'   => [],
            'timeout'               => 0,
            'persistent'            => true,
            'publish_timeout'       => 0, // Only applicable when a publish is marked as mandatory
            'qos'                   => false,
            'qos_prefetch_size'     => 0,
            'qos_prefetch_count'    => 1,
            'qos_a_global'          => false
        ],
],