1107012776 / easy-swoole-rabbitmq

EasySwoole 框架的 RabbitMQ 队列插件,基于 php-amqplib/php-amqplib (The RabbitMQ queue plugin for the EasySwoole framework, based on php-amqplib/php-amqplib.)
Apache License 2.0
19 stars 5 forks source link

请问可以使用发布订阅模式吗?如何操作 #11

Open pengxingjiang opened 3 years ago

pengxingjiang commented 3 years ago

请问可以使用发布订阅模式吗?如何操作

1107012776 commented 3 years ago

消费者

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;

use EasySwoole\RabbitMq\MqQueue;
use EasySwoole\RabbitMq\MqJob;

class MqQueueProcess extends AbstractProcess
{
    protected function run($arg)
    {
        go(function () {
            $MqQueue = MqQueue::getInstance()->refreshConnect();
            $MqQueue->consumer()->setConfig('kd_sms_fanout_send_ex','','fanout','hello2')->listen(function(MqJob $obj) {
                var_dump($obj->getJobData(),'receive hello2');
                \EasySwoole\EasySwoole\Logger::getInstance()->log('receive hello2' . var_export($obj, true),\EasySwoole\Log\LoggerInterface::LOG_LEVEL_INFO,'debug');

            });
        });
        go(function () {
            $MqQueue = MqQueue::getInstance()->refreshConnect();
            $MqQueue->consumer()->setConfig('kd_sms_fanout_send_ex','','fanout','hello1')->listen(function(MqJob $obj) {
                var_dump($obj->getJobData(),'receive hello1');
                \EasySwoole\EasySwoole\Logger::getInstance()->log('receive hello1' . var_export($obj, true),\EasySwoole\Log\LoggerInterface::LOG_LEVEL_INFO,'debug');

            });
        });
    }
}

生产者

        $MqQueue = MqQueue::getInstance()->refreshConnect();
        $job = new MqJob();
        $job->setJobData('composer hello word'.date('Y-m-d H:i:s', time()));
        $res = $MqQueue->producer()->setConfig('kd_sms_fanout_send_ex',"","fanout")->push($job);
        $MqQueue->closeConnection();