mikemadisonweb / yii2-rabbitmq

RabbitMQ Extension for Yii2
MIT License
72 stars 31 forks source link

Freezing consumer after few minutes #60

Open yiiman-dev opened 2 years ago

yiiman-dev commented 2 years ago

Hi everyone

At first, thanks for this useful repository

I have problem on consumer connection

It not work after few minutes, no error, no any response in console

I am using Supervisor on docker

My supervisor config:


[supervisord]
logfile=/dev/stdout
logfile_maxbytes=0
logfile_backups=0
loglevel=info
nodaemon=true

[program:php-fpm]
command=php-fpm -F -y /usr/local/etc/php-fpm.conf -c /usr/local/etc/php/conf.d/local.ini -R
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr

[program:consume]
command=php /home/www/app/yii rabbitmq/consume shahkar
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr

My rabbit config:


 [
    //Config help:: https://github.com/mikemadisonweb/yii2-rabbitmq
    'class'             => \mikemadisonweb\rabbitmq\Configuration::class,
    'auto_declare'      => true,
    'connections'       =>
        [
            [
                'name'     => 'rabbit',
                // You can pass these parameters as a single `url` option: https://www.rabbitmq.com/uri-spec.html
                'host'     => ENV::get('RABBIT_HOST'),
                'port'     => ENV::get('RABBIT_PORT'),
                'user'     => ENV::get('RABBIT_USER'),
                'password' => ENV::get('RABBIT_PASSWORD'),
                'vhost'    => ENV::get('RABBIT_VHOST'),

                'type'                => AMQPLazyConnection::class,
                'url'                 => null,
                'connection_timeout'  => 3,
                'read_write_timeout'  => 3,
                'ssl_context'         => null,
                'keepalive'           => false,
                'heartbeat'           => 0,
                'channel_rpc_timeout' => 0.0
            ]
            // When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks
        ],
    'exchanges'         =>
        [
            [
                'name'        => 'd_exchange',
                'type'        => 'direct',
                'passive'     => false,
                'durable'     => true,
                'auto_delete' => false,
                'internal'    => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
            ],
        ],
    'queues'            =>
        [
            [
                'name'        => 'sms',
                'passive'     => false,
                'durable'     => true,
                'exclusive'   => false,
                'auto_delete' => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
                // Queue can be configured here the way you want it:
                //'durable' => true,
                //'auto_delete' => false,
            ],
            [
                'name'        => 'log',
                'passive'     => false,
                'durable'     => true,
                'exclusive'   => false,
                'auto_delete' => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
            ],
            [
                'name'        => 'nationalID',
                'passive'     => false,
                'durable'     => true,
                'exclusive'   => false,
                'auto_delete' => false,
                'nowait'      => false,
                'arguments'   => null,
                'ticket'      => null,
            ]
        ],
    'bindings'          =>
        [
            [
                'exchange'     => 'd_exchange',
                'queue'        => 'sms',
                'to_exchange'  => null,
                'routing_keys' => ['sms'],
            ],
            [
                'exchange'     => 'd_exchange',
                'queue'        => 'log',
                'to_exchange'  => null,
                'routing_keys' => ['log'],
            ],

        ],
    'producers'         =>
        [
            [
                'name'          => 'producer',
                'connection'    => 'rabbit',
                'safe'          => true,
                'content_type'  => 'text/plain',
                'delivery_mode' => 2,
                'serializer'    => 'serialize',
            ],

            [
                'name'          => 'log',
                'connection'    => 'rabbit',
                'safe'          => true,
                'content_type'  => 'text/plain',
                'delivery_mode' => 2,
                'serializer'    => 'serialize',
            ],

        ],
    'consumers'         =>
        [
            [
                'name'                   => 'shahkar',
                // Every consumer should define one or more callbacks for corresponding queues
                'connection'             => 'rabbit',
                'qos'                    => [
                    'prefetch_size'  => 0,
                    'prefetch_count' => 0,
                    'global'         => false,
                ],
                'idle_timeout'           => 0,
                'idle_timeout_exit_code' => null,
                'proceed_on_exception'   => false,
                'deserializer'           => 'unserialize',
                'callbacks'              =>
                    [
                        // queue name => callback class name
                        'sms' => ShahkarConsumer::class,
                    ],
            ],
        ],
    'on before_consume' => function ($event) {
        if (\Yii::$app->has('db') && \Yii::$app->db->isActive) {
            try {
                \Yii::$app->db->createCommand('SELECT 1')->query();
            } catch (\yii\db\Exception $exception) {
                \Yii::$app->db->close();
            }
        }
    },
    'logger'            => [
        'log'           => true,
        'category'      => 'application',
        'print_console' => true,
        'system_memory' => true,
    ],
]

My consumer class:


namespace app\consumers;

use app\base\BaseConsumer;
use app\models\ModuleCorporates;
use app\models\ModuleTemp;
use Faker\Factory;
use Faker\Provider\fa_IR\Address;
use Faker\Provider\fa_IR\Person;
use Kavenegar\KavenegarApi;
use mikemadisonweb\rabbitmq\components\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ShahkarConsumer extends BaseConsumer implements \mikemadisonweb\rabbitmq\components\ConsumerInterface
{

    /**
     * @inheritDoc
     */
    public function execute(AMQPMessage $msg)
    {
        $data = json_decode($msg->body);

        /**
         * Data format for `request_corporate_details` is : Object
         *[
         *  'mode'                  => 'request_corporate_details',
         *  'corporate_national_id' => '.........STRING..........',
         *  'ceo_nation_code'       => '.........STRING..........',
         *  'ceo_uid'               => '..........INT.............'
         *]
         */
        try {
            $faker = Factory::create();
            $person = new Person;
            $model = new ModuleTemp();
            $fields =
                [
                    'name'                     => $person->name(),
                    'operating_license_number' => $faker->numberBetween(1111111, 9999999),
                    'year_of_operation'        => $faker->numberBetween(1350, 1100),
                    'description'              => $faker->text(255),
                    'address'                  => $faker->address,
                    'phone'                    => $faker->phoneNumber,
                    'economic_code'            => $faker->numberBetween(1111111111, 21399999999),
                    'legal_mode'               => $faker->numberBetween(1, 3),
                    'site'                     => $faker->url,
                    'field_id'                 => null,
                    'category_id'              => null,
                    'ceo_uid'                  => $data->uid,
                    'region_id'                => null,
                    'province_id'              => null,
                    'city_id'                  => null,
                    'national_id'              => $data->national_id,
                    'ceo_name'                 => $faker->name,
                    'ceo_phone'                => $faker->phoneNumber,
                    'ceo_national_code'        => $person::nationalCode(),
                    'postal_code'              => Address::postcode(),
                    'production_plan'          => null,
                ];
            $model->key = 'corp_'.$data->ceo_uid;
            $model->value = json_encode($fields);
            $model->created_at = date('Y-m-d H:i:s');
            $model->age = 10800;
            $model->save();
        } catch (\Exception $e) {
            $this->logException($e);
            return ConsumerInterface::MSG_REQUEUE;
        }
        return ConsumerInterface::MSG_ACK;
    }
}

Top result in Linux after few minutes, when connection is freezed:

Mem: 7993764K used, 173456K free, 83884K shrd, 1878812K buff, 3713880K cached
CPU:   3% usr   1% sys   0% nic  95% idle   0% io   0% irq   0% sirq
Load average: 0.16 0.15 0.12 3/706 31
  PID  PPID USER     STAT   VSZ %VSZ CPU %CPU COMMAND
   21    17 www-data S     169m   2%   0   0% php-fpm: pool www
   22    17 www-data S     169m   2%   2   0% php-fpm: pool www
   17     7 root     S     169m   2%   2   0% php-fpm: master process (/usr/local/etc/php-fpm.conf)
    7     1 root     S     138m   2%   1   0% supervisord -c /home/www/.deploy/config/supervisor.conf
   16     7 root     S    47100   1%   1   0% php /home/www/app/yii rabbitmq/consume sms
   25     0 root     S     1676   0%   1   0% /bin/sh
    1     0 root     S     1608   0%   3   0% sh /entrypoint.sh
   31    25 root     R     1604   0%   3   0% top

After few minutes consumer is running but in fact it is freezed and dont work.

Any idea?

arlandantas commented 2 years ago

Hi @yiiman-dev . The same occurred with me.

I solved adding "heartbeat" and "read_write_timeout" fields in connection array with a number greater than 0 (I've used 60 for both).

michaelarnauts commented 7 months ago

Note that setting both heartbeat and read_write_timeout to the same value can cause a race condition where the connection is closed just before a heartbeat arrives.

It seems to be recommended to use 60 seconds as heartbeat and 130 seconds as read_write_timeout. This is actually the default of php-amqplib. See https://github.com/php-amqplib/php-amqplib/pull/648