walkor / webman

Probably the fastest PHP web framework in the world.
https://webman.workerman.net/
MIT License
2.2k stars 216 forks source link

想请教一下webman和MQTT之间的配合 #216

Closed meibao-real closed 3 years ago

meibao-real commented 3 years ago

新项目准备完全用webman写,是一个物联网项目,其中设备使用MQTT协议通讯,注意到workerman有配套的MQTT依赖:【https://github.com/walkor/mqtt】 现在业务结构是对外开放一个API接口,外部调用接口往指定设备进行发送数据,我现在的做法是在webman的控制器中对应的接口方法内创建Workerman\Mqtt\Client对象,然后进行publish发送对应主题的消息,发送完毕后使用close()方法关闭对象。

想请教一下作者,首先我这样的结构写法在设备很少的时候应该是没有问题的,但是如果设备数量数十万个同时发送数据,肯定会出问题的吧,所以我这样即用即连,用完关闭的方式肯定不妥当,最好的方式肯定是保持MQTT长连接,但是在webman中不知该怎么写合适,,尤其是对外开放接口用webman控制器方法对MQTT设备订阅的主题发送数据的合适写法

我相信肯定有物联网领域的人士在,或者准备采用webman,以前我用workerman直接上TCP连接不需要关系这些直接写业务发送数据就行了,现在整合MQTT属实有点弄不明白,虚心请教~~

walkor commented 3 years ago

发布 在app/functions.php 里写个函数,类似

function mqtt_publish($topic, $content) {
    static $client = false;
    if (!$mqtt) {
        $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
        $mqtt->connect();
    }
    $mqtt->publish($topic, $content);
}

在webman里类似这样将mqtt连接全局存储起来就常驻内存了,然后就可以复用连接publish数据了。

订阅 订阅虽然也可以在http服务进程里订阅,但是最好还是用webman自定义进程去订阅,这样http服务和订阅服务可以分开,结构上更合理。

创建订阅服务进程 process/Subscriber.php

<?php
namespace process;

class Subscriber
{
    public function onWorkerStart()
    {
        $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
        $mqtt->onConnect = function($mqtt) {
            $mqtt->subscribe('test');
        };
        $mqtt->onMessage = function($topic, $content){
            var_dump($topic, $content);
        };
        $mqtt->connect();
    }
}

添加自定义进程配置 config/process.php

return [
    // ... 其它进程配置省略

    // mqtt_subscriber 为进程名称
    'mqtt_subscriber' => [
        // 这里指定进程类,就是上面定义的Subscriber类
        'handler' => process\Subscriber::class,
        'count'   => 1, // 进程数
    ],
];
meibao-real commented 3 years ago

发布 在app/functions.php 里写个函数,类似

function mqtt_publish($topic, $content) {
    static $client = false;
    if (!$mqtt) {
        $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
        $mqtt->connect();
    }
    $mqtt->publish($topic, $content);
}

在webman里类似这样将mqtt连接全局存储起来就常驻内存了,然后就可以复用连接publish数据了。

订阅 订阅虽然也可以在http服务进程里订阅,但是最好还是用webman自定义进程去订阅,这样http服务和订阅服务可以分开,结构上更合理。

创建订阅服务进程 process/Subscriber.php

<?php
namespace process;

class Subscriber
{
    public function onWorkerStart()
    {
        $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
        $mqtt->onConnect = function($mqtt) {
            $mqtt->subscribe('test');
        };
        $mqtt->onMessage = function($topic, $content){
            var_dump($topic, $content);
        };
        $mqtt->connect();
    }
}

添加自定义进程配置 config/process.php

return [
    // ... 其它进程配置省略

    // mqtt_subscriber 为进程名称
    'mqtt_subscriber' => [
        // 这里指定进程类,就是上面定义的Subscriber类
        'handler' => process\Subscriber::class,
        'count'   => 1, // 进程数
    ],
];

感谢指点,现在MQTT发送数据有较大概率出现【No connection to broker】,以下是DEBUG日志信息:

-> Try to connect to mqtt://127.0.0.1:1883
-- Error: No connection to broker
Mqtt client: No connection to broker
-- Tcp connection established
-> Send CONNECT package client_id:workerman-mqtt-client-641544590 username:toubi password:toubi123 clean_session:1 protocol_name:MQTT protocol_level:4
<- Recv CONNACK package, MQTT connect success
-> Send PUBLISH package, topic:toubi/jue_86_CC_A8_88_51_92 content:{"msg":"on","timestamp":1627536126,"nonce_str":"cu19kwxgd0n1s1xkjmotwg0gfwq987ou","sign":"774fbde33fbbf9668bc50f1c0aba387a"} retain:0 qos:0 dup:0 message_id:
-> Send PINGREQ package
<- Recv PINGRESP package
-> Try to connect to mqtt://127.0.0.1:1883
-- Error: No connection to broker
Mqtt client: No connection to broker
-- Tcp connection established
-> Send CONNECT package client_id:workerman-mqtt-client-1781207303 username:toubi password:toubi123 clean_session:1 protocol_name:MQTT protocol_level:4
<- Recv CONNACK package, MQTT connect success
-> Send PUBLISH package, topic:toubi/jue_86_CC_A8_88_51_92 content:{"msg":"on","timestamp":1627536151,"nonce_str":"g7u9p2999c6kpgiloekctqnqr6t500ka","sign":"74cf81a23ea8556f0b1c114e59bdb928"} retain:0 qos:0 dup:0 message_id:
-> Send PUBLISH package, topic:toubi/jue_86_CC_A8_88_51_92 content:{"msg":"on","timestamp":1627536151,"nonce_str":"4huy19xn7u4fmxwtt5kira6yz1ymbgk1","sign":"0e7ecf0e062fef5a2d7e573e64a1261a"} retain:0 qos:0 dup:0 message_id:
-> Send PUBLISH package, topic:toubi/jue_86_CC_A8_88_51_92 content:{"msg":"on","timestamp":1627536151,"nonce_str":"1vfjqdea4kso6bkit60246n0m3ofp9eh","sign":"19da631dd62541089e0b35661bd22058"} retain:0 qos:0 dup:0 message_id:
-> Send PUBLISH package, topic:toubi/jue_86_CC_A8_88_51_92 content:{"msg":"on","timestamp":1627536151,"nonce_str":"5bz9hiyd623nvfxq5qdosiaiwhdlhmyx","sign":"d5d56f3ec872d0100690e73950157e61"} retain:0 qos:0 dup:0 message_id:
-> Try to connect to mqtt://127.0.0.1:1883

是因为workerman/mqtt是异步客户端的原因造成的吗,所以我想了下改成了这样写,首先设置连接成功的回调,对已连接进行标记,然后用Workerman的Timer做个定时,每次发送通知前循环0.1秒检查是否连接,如果确实连接就把消息发布出去,具体代码是这样的:


function mqtt_publish($topic, $content) {
    static $mqtt = false;
    static $isConnected = false;
    if (!$mqtt) {
        $mqtt = new Workerman\Mqtt\Client('mqtt://127.0.0.1:1883',[
            'username'  =>  'toubi',
            'password'  =>  'toubi123',
            'debug'     =>  true
        ]);
        $mqtt->connect();
        $mqtt->onConnect = function ()use(&$isConnected){
            $isConnected = true;
        };
    }
    $timer = \Workerman\Lib\Timer::add(0.1,function () use ($isConnected, $mqtt, $topic, $content, &$timer){
       if ($isConnected) {
           $mqtt->publish($topic, $content);
           \Workerman\Lib\Timer::del($timer);
       }
    });
}```

目前这样写观察Debug输出信息暂无异常了,但是我这样会有什么问题吗?
meibao-real commented 3 years ago

上面我回复的,其实问题还是有的,在启动前期,如果没有建立满webman启动进程数量的MQTT客户端连接的话,是不会发送数据出去的,建立够了就正常了,,,这该如何是好,难道前期就先刷满足够数量的MQTT的连接嘛,,害,,

walkor commented 3 years ago

如果你想在webman进程启动时就连接mqtt,可以利用webman启动时自动运行机制,类似这样。

建立个bootstrap/Mqtt.php

  <?php
namespace bootstrap;

use Webman\Bootstrap;

class Mqtt implements Bootstrap
{
    protected static $mqtt = null;

    public static function start($worker) {
        $mqtt = new \Workerman\Mqtt\Client('mqtt://127.0.0.1:1883',[
            'username'  =>  'toubi',
            'password'  =>  'toubi123',
            'debug'     =>  true
        ]);
        $mqtt->connect();
        $mqtt->onConnect = function ($mqtt) {
            $mqtt->connected = true;
            if (!empty($mqtt->waitQueue)) {
                foreach ($mqtt->waitQueue as $item) {
                    $mqtt->publish($item[0], $item[1]);
                }
                $mqtt->waitQueue = [];
            }
        };
        static::$mqtt = $mqtt;
    }

    public static function publish($t, $m) {
        if (empty(static::$mqtt->connected)) {
            static::$mqtt->waitQueue[] = [$t, $m];
            return;
        }
        static::$mqtt->publish($t, $m);
    }
}

添加进程启动自动运行配置,config/bootstrap.php。进程启动后会自动运行这些类的start静态方法,用来做进程启动后的一些初始化。

return [
   // .... 这里省略了其它配置...
  bootstrap\Mqtt::class,
];

业务调用的时候直接运行

use bootstrap/Mqtt;
Mqtt::publish($topic, $message);
meibao-real commented 3 years ago

秒啊,webman距离全球推广又近了一步,咨询作者比自己瞎捉摸好使多了。我正准备捕获异常然后利用redis队列解决问题,看到给出的方案,豁然开朗。

moss0723 commented 10 months ago

请教一下,按照上面第二种静态类去实现, `public static function onWorkerStart() {

    $mqtt = new Client(config('mqtt_hz.host'), config('mqtt_hz.options'));
    $mqtt->onConnect = function ($mqtt) {
        $mqtt->subscribe(['registerUser' => 2,'uploadimg' => 2,'groupupload'=>2,'$SYS/broker/clients/active'=>2]);
    };
    $mqtt->onMessage = function ($topic,$content) {
        // log::info($content);
        $content = json_decode($content,true);
        switch ($topic) {
            case '$SYS/broker/clients/active':
                $api = new Api(
                    config('plugin.webman.push.app.api'),
                    config('plugin.webman.push.app.app_key'),
                    config('plugin.webman.push.app.app_secret')
                );

                $api->trigger('hangzhou','message',$content);
                break; 
        }

    };
    $mqtt->connect(); 
}`

在使用统计主题 $SYS/broker/clients/active后 在线数量会不准 需要如何修改