ssi-anik / laravel-amqp

anik/amqp wrapper for Laravel-ish frameworks
https://packagist.org/packages/anik/laravel-amqp
MIT License
29 stars 4 forks source link

Channel connection is closed when consume #13

Open alesima opened 4 days ago

alesima commented 4 days ago

I'm getting "Channel connection is closed" when consuming some queue.

<?php

namespace App\Abstracts;

use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Qos\Qos;
use Anik\Amqp\Queues\Queue;
use Anik\Laravel\Amqp\Facades\Amqp;
use App\Traits\LoggerTrait;
use App\Utils\Utils;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Validator;

abstract class AbstractRabbitCommand extends Command
{
    use LoggerTrait;

    public function handle()
    {
        $this->configureQueues();
    }

    abstract protected function configureQueues();

    protected function consumeQueue(string $queueName, string $method, Exchange $exchange, string $routingKey = null, int $prefetchCount = 30)
    {
        $queue = new Queue($queueName);
        $qos = new Qos(0, $prefetchCount);

        try {
            Amqp::consume([$this, $method], $routingKey ?? $queueName, $exchange, $queue, $qos);
        } catch (\Exception $e) {
            Log::error($queueName, [
                'message' => $e->getMessage(),
                'queue' => $queueName,
            ]);

            $this->error(sprintf("ERROR_CONSUME_QUEUE[%s]: error[%s]", $queueName, $e->getMessage()));
        }
    }

    protected function processMessage(ConsumableMessage $message, array $rules, callable $callback, string $queueName)
    {
        try {
            if (!Utils::checkIfIsJson($message->getMessageBody())) {
                throw new \InvalidArgumentException('Message is not a valid JSON');
            }

            $data = json_decode($message->getMessageBody(), true);
            $validated = Validator::make($data, $rules);

            if ($validated->fails()) {
                throw new \InvalidArgumentException($validated->errors()->toJson());
            }

            $callback($data);
        } catch (\Exception $e) {
            $this->logError($data ?? [], $e, debug_backtrace()[1]['function'], $queueName);
            $message->nack();
        }
    }

    protected function logError(array $data, \Exception $e, string $context, string $queueName)
    {
        Log::error($context, [
            'message' => $e->getMessage(),
            'queue' => $queueName,
            'data' => $data,
        ]);

        $this->error(sprintf("ERROR[%s]: %s", strtoupper($context), $e->getMessage()));
    }
}
<?php

namespace App\Console\Commands;

use Anik\Amqp\ConsumableMessage;
use Anik\Amqp\Exchanges\Exchange;
use Anik\Laravel\Amqp\Facades\Amqp;
use App\Abstracts\AbstractRabbitCommand;
use App\Helpers\Pubsub;
use App\Interfaces\Service\AlocacaoService;
use App\Interfaces\Service\SolicitacaoService;
use Carbon\Carbon;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitSolicitacaoLoopCommand extends AbstractRabbitCommand
{

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbit:solicitacao:loop:subscribe';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Subscribe to RabbitMQ to listen pending call driver and loop call driver';

    /**
     * @var \App\Interfaces\Service\AlocacaoService
     */
    private $alocacaoService;

    /**
     * @var \App\Interfaces\Service\SolicitacaoService
     */
    private $solicitacaoService;

    /**
     * @var \Anik\Amqp\Exchanges\Exchange
     */
    private $exchange;

    public function __construct(AlocacaoService $alocacaoService, SolicitacaoService $solicitacaoService)
    {
        parent::__construct();

        $this->alocacaoService = $alocacaoService;
        $this->solicitacaoService = $solicitacaoService;
    }

    protected function configureQueues()
    {
        $this->exchange = new Exchange('solicitation-loop', 'x-delayed-message');

        $this->consumeQueue('pending-call-driver', 'pendingCallDriver', $this->exchange);
        $this->consumeQueue('loop-call-driver', 'loopCallDriver', $this->exchange);
        $this->consumeQueue('restart-call-driver', 'restartCallDriver', $this->exchange);
    }

    public function pendingCallDriver(ConsumableMessage $message)
    {
        $this->processMessage($message, [
            'solicitacao' => 'required|array',
            'expiresAt'   => 'required|numeric',
            'startedAt'   => 'required|numeric',
            'counter'     => 'required|numeric',
        ], function ($data) use ($message) {
            $this->info(sprintf("PENDING_CALL_DRIVER: solicitacao[%s]", $data['solicitacao']['uid_solicitacao']));
            $options = ['application_headers' => new AMQPTable(['x-delay' => 0])];
            Amqp::publish(json_encode($data), 'loop-call-driver', $this->exchange, $options);
            $message->ack();
        }, 'pending-call-driver');
    }

    public function loopCallDriver(ConsumableMessage $message)
    {
        $this->processMessage($message, [
            'solicitacao' => 'required|array',
            'expiresAt'   => 'required|numeric',
            'startedAt'   => 'required|numeric',
            'counter'     => 'required|numeric',
        ], function ($data) use ($message) {
            $data['counter']++;
            $result = $this->alocacaoService->iniciarCall($data);

            if (!$result->isSuccess()) {
                throw new \Exception($result->getError()->toJson());
            }

            $status = $result->getData()['status'];
            $this->info(sprintf("CALL_DRIVER: loop[%d/%d] solicitacao[%s] status[%s]", $data['counter'], intval(env('MAX_CALLS')), $data['solicitacao']['uid_solicitacao'], $status));

            if ($status == 'CONTINUE_CALL' && $data['counter'] <= intval(env('MAX_CALLS', 7))) {
                $options = ['application_headers' => new AMQPTable(['x-delay' => 18000])];
                Amqp::publish(json_encode($data), 'loop-call-driver', $this->exchange, $options);
            }

            $message->ack();
        }, 'loop-call-driver');
    }

    public function restartCallDriver(ConsumableMessage $message)
    {
        $this->processMessage($message, [
            'uid'                         => 'required|uuid',
            'uid_motorista'               => 'required|uuid',
            'uid_passageiro'              => 'required|uuid',
            'distancia_estimada'          => 'required|numeric',
            'duracao_estimada'            => 'required|numeric',
            'endereco_partida'            => 'required|string',
            'lat_destino'                 => 'required|numeric',
            'lat_partida'                 => 'required|numeric',
            'endereco_destino'            => 'required|string',
            'lng_destino'                 => 'required|numeric',
            'lng_partida'                 => 'required|numeric',
            'ponto_referencia'            => 'nullable|string',
            'tipo_usuario'                => 'required|numeric',
            'tipo_corrida'                => 'required|numeric',
            'tipo_pagamento'              => 'required|numeric',
            'uid_servicos_especializados' => 'nullable|string',
            'favoritos'                   => 'nullable|array',
            'primeira_corrida'            => 'required|boolean',
            'sexo'                        => 'required|string',
            'info_passageiro'             => 'required|array',
            'stripe_charge_id'            => 'nullable|string',
            'calcula_estudante'           => 'nullable|numeric',
            'promocode'                   => 'nullable|array',
        ], function ($data) use ($message) {
            $result = $this->solicitacaoService->reiniciar($data);

            if (!$result->isSuccess()) {
                $this->handleServiceError($data, $result->getError(), 'restart-call-driver');
                return;
            }

            ['status' => $status, 'data' => $solicitacao] = (array) $result->getData();
            $this->info(sprintf("RESTART_CALL_DRIVER: solicitacao[%s] status[%s]", $data['uid'], $status));

            if ($status != 'SUCCESS') {
                Pubsub::publish('stopCallDriver', json_encode(['uid' => $data['uid'], 'to' => $data['uid_passageiro']]));
                $message->ack();
                return;
            }

            Pubsub::publish('restartCallDriver', json_encode([
                'uid' => $solicitacao['uid_solicitacao'],
                'expiresIn' => intval(env('EXPIRES_IN', 300) * 1000),
                'origin' => ['lat' => $data['lat_partida'] ?? null, 'lng' => $data['lng_partida'] ?? null],
                'to' => $data['uid_passageiro'],
            ]));

            $payload = [
                'solicitacao' => $solicitacao,
                'expiresAt' => Carbon::now()->timestamp + intval(env('EXPIRES_IN', 300)),
                'startedAt' => Carbon::now()->timestamp,
                'counter' => 0,
            ];

            Amqp::publish(json_encode($payload), 'pending-call-driver', $this->exchange);
            $message->ack();
        }, 'restart-call-driver');
    }

    private function handleServiceError(array $data, $error, string $context)
    {
        Log::error($context, [
            'message' => $error->toJson(),
            'queue' => $context,
            'exchange' => $this->exchange->getName(),
            'data' => $data,
        ]);

        $this->error(sprintf("ERROR_%s: solicitacao[%s] error[%s]", strtoupper($context), $data['uid'], $error->toJson()));
        Pubsub::publish('stopCallDriver', json_encode(['uid' => $data['uid'], 'to' => $data['uid_passageiro']]));
    }
}

Any thoughts?

alesima commented 4 days ago

Here's my amqp.php config file

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$amqp = parse_url(env('CLOUDAMQP_URL'));

return [
    'default' => env('AMQP_CONNECTION', 'rabbitmq'),

    'connections' => [
        'rabbitmq' => [
            'connection' => [
                'class' => AMQPStreamConnection::class,
                'hosts' => [
                    [
                        'host' => $amqp['host'],
                        'port' => array_key_exists("port", $amqp) ? $amqp['port'] : 5672,
                        'user' => $amqp['user'],
                        'password' => $amqp['pass'],
                        'vhost' => ltrim($amqp['path'], '/') ?: '/',
                    ]
                ],
                'options' => [],
            ],

            'message' => [
                'content_type' => env('AMQP_MESSAGE_CONTENT_TYPE', 'text/plain'),
                'delivery_mode' => env('AMQP_MESSAGE_DELIVERY_MODE', AMQPMessage::DELIVERY_MODE_PERSISTENT),
                'content_encoding' => env('AMQP_MESSAGE_CONTENT_ENCODING', 'UTF-8'),
            ],

            'exchange' => [
                'name' => env('AMQP_EXCHANGE_NAME', 'amq.direct'),
                'declare' => env('AMQP_EXCHANGE_DECLARE', false),
                'type' => env('AMQP_EXCHANGE_TYPE', 'direct'),
                'passive' => env('AMQP_EXCHANGE_PASSIVE', false),
                'durable' => env('AMQP_EXCHANGE_DURABLE', true),
                'auto_delete' => env('AMQP_EXCHANGE_AUTO_DELETE', false),
                'internal' => env('AMQP_EXCHANGE_INTERNAL', false),
                'no_wait' => env('AMQP_EXCHANGE_NOWAIT', false),
                'arguments' => [],
                'ticket' => env('AMQP_EXCHANGE_TICKET'),
            ],

            'queue' => [
                'name' => env('AMQP_QUEUE_NAME', 'amqp.laravel.queue'),
                'declare' => env('AMQP_QUEUE_DECLARE', false),
                'passive' => env('AMQP_QUEUE_PASSIVE', false),
                'durable' => env('AMQP_QUEUE_DURABLE', true),
                'exclusive' => env('AMQP_QUEUE_EXCLUSIVE', false),
                'auto_delete' => env('AMQP_QUEUE_AUTO_DELETE', false),
                'no_wait' => env('AMQP_QUEUE_NOWAIT', false),
                'arguments' => [],
                'ticket' => env('AMQP_QUEUE_TICKET'),
            ],

            'consumer' => [
                'tag' => env('AMQP_CONSUMER_TAG', ''),
                'no_local' => env('AMQP_CONSUMER_NO_LOCAL', false),
                'no_ack' => env('AMQP_CONSUMER_NO_ACK', false),
                'exclusive' => env('AMQP_CONSUMER_EXCLUSIVE', false),
                'no_wait' => env('AMQP_CONSUMER_NOWAIT', false),
                'arguments' => [],
                'ticket' => env('AMQP_CONSUMER_TICKET'),
            ],

            'qos' => [
                'enabled' => env('AMQP_QOS_ENABLED', false),
                'prefetch_size' => env('AMQP_QOS_PREFETCH_SIZE', 0),
                'prefetch_count' => env('AMQP_QOS_PREFETCH_COUNT', 1),
                'global' => env('AMQP_QOS_GLOBAL', false),
            ],

            'publish' => [
                'mandatory' => false,
                'immediate' => false,
                'ticket' => null,
                'batch_count' => 500,
            ],

            'bind' => [
                'no_wait' => false,
                'arguments' => [],
                'ticket' => null,
            ],

            'consume' => [
                'allowed_methods' => null,
                'non_blocking' => false,
                'timeout' => 0,
            ],
        ],
    ],
];
ssi-anik commented 2 days ago

Hello @alesima I am extremely sorry for replying you late.

Can you check the solution from the SO and confirm if it works. https://stackoverflow.com/questions/8839094/why-do-my-rabbitmq-channels-keep-closing