vyuldashev / laravel-queue-rabbitmq

RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.
MIT License
1.93k stars 382 forks source link

Issue: cant get my messages come to my own RabbitMQJob class #574

Closed matiazar closed 1 year ago

matiazar commented 1 year ago

Im dispatching some job from Laravel to RabbitMQ but when I execute

php artisan rabbitmq:consume --once or php artisan queue:work --once

the worker is using default job from where it was dispatched... and not the one I created

this is my rabbitmq.php config file

<?php
/**
 * This is an example of queue connection configuration.
 * It will be merged into config/queue.php.
 * You need to set proper values in `.env`.
 */
return [

    'driver' => 'rabbitmq',
    'queue' => env('RABBITMQ_QUEUE', 'default'),
    'connection' => 'default',
    'hosts' => [
        [
            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),
            'user' => env('RABBITMQ_USER', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),
            'vhost' => env('RABBITMQ_VHOST', '/'),
        ],
    ],
    'options' => [
        'exchange' => [
            'name' => env('RABBITMQ_EXCHANGE_NAME'),
            /* Determine if exchange should be created if it does not exist. */
            'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
            /* Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html */
            // 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
            'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'),
            'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
            'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
            'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
            'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
        ],
        'queue' => [
            /* Determine if queue should be created if it does not exist. */
            'declare' => env('RABBITMQ_QUEUE_DECLARE', true),
            /* Determine if queue should be binded to the exchange created. */
            'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
            /* Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html */
            'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
            'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
            'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
            'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
            'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),

            'job' => App\Jobs\RabbitMQ\RabbitMQJob::class,
        ],
    ],

    /* Determine the number of seconds to sleep if there's an error communicating with rabbitmq
    * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
    */
    'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),
    /*
    * Optional SSL params if an SSL connection is used
    * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
    */
    'ssl_params' => [
        'ssl_on' => env('RABBITMQ_SSL', false),
        'cafile' => env('RABBITMQ_SSL_CAFILE', null),
        'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
        'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
        'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
        'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
    ],
    /*
     * Set to "horizon" if you wish to use Laravel Horizon.
     */
    'worker' => env('RABBITMQ_WORKER', 'default'),
];

Expected behavior

to message be captured by App\Jobs\RabbitMQ\RabbitMQJob::class

please. help. thanks!!

matiazar commented 1 year ago

I have reused previous commit without horizon and now is working.... I dont know what was...

matiazar commented 1 year ago

Again me... now when I run php artisan queue:work --once

its entering to the payload() function... but thats all... its not executing fire or handle function. what can be wrong?

matiazar commented 1 year ago

working!

Like it is a cli command I didnt notice to check laravel log.... the problem was I didnt update RABBITMQ_WORKER=horizon to RABBITMQ_WORKER=default after removing horizon

thanks!

matiazar commented 1 year ago

@vyuldashev can you check this?

I have made a class that check if message is from laravel or not so it select what Job "controller" use only with overwritting the payload function

<?php

namespace App\Jobs;

use Illuminate\Support\Str;
use Illuminate\Queue\Jobs\JobName;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;

use function PHPUnit\Framework\isJson;

class ProcessMessageJob extends RabbitMQJob
{

    private function isLaravelJob($data){
        if (isset($data['data']) && isset($data['data']['commandName']) && isset($data['data']['command'])) return true;
        return false;
    }

    private function isJson($string)
    {
        json_decode($string);
        return json_last_error() === JSON_ERROR_NONE;
    }

    /**
     * Get the decoded body of the job.
     *
     * @return array
     */
    public function payload(): array
    {
        $data = $this->getRawBody();
        if ($this->isJson($data)) $data = json_decode($data, true);

        if ($this->isLaravelJob($data)) return $data;      

        return [
            'uuid' => Str::uuid(),
            'displayName' => HandleMessage::class,
            'job'  => 'Illuminate\\Queue\\CallQueuedHandler@call',
            'data' => [
                'commandName' => HandleMessage::class,
                'command' => serialize(new HandleMessage($data)),
            ],
            'id' => Str::uuid(),
        ];
    }
}