swoole / swoole-src

🚀 Coroutine-based concurrency library for PHP
https://www.swoole.com
Apache License 2.0
18.47k stars 3.16k forks source link

使用swoole websocket 客户端给swoole websocket server 发送消息,ws 服务端收到消息后,都会异常调用close方法 #5508

Closed LouisYuBin closed 1 month ago

LouisYuBin commented 1 month ago

使用swoole websocket 客户端给swoole websocket server 发送消息,在ws server 端message方法中显示发送消息的fd都是同一个,没有打开新连接。但是ws server端每次收到消息以后,日志显示都会调用一次 ws server配置的close方法,调用close方法时,fd都是一个未打开的fd。

server端代码:

<?php

namespace App\Actions\Xb\Im;

use ArrowWorker\Component\Cache\Redis;
use ArrowWorker\HttpServer\Request;
use ArrowWorker\Log\Log;
use App\Service\Encrypt\AesCbc;
use Swoole\WebSocket\Frame as WebSocketFrame;
use Swoole\WebSocket\Server as WebSocketServer;
use ArrowWorker\Component\Cache\Pool as Cache;

class WsMessage
{

    const SESSION_KEY = 'xb:im:session:%d';

    const SESSION_FD_MEMBER_KEY = 'xb:im:fd:%d';

    const SESSION_GROUP_KEY = 'xb:im:group';
    const ONLINE_COUNTER = 'xb:im:online:counter';

    public static function Open(WebSocketServer $server, int $fd, Request $request)
    {
        $uri      = $request->getUri();
        $clientIp = $request->getClientIp();
        $token    = $request->getHeader('token');

        Log::info(json_encode([
            'method' => __METHOD__,
            'uri'    => $uri,
            'fd'     => $fd,
            'token'  => $token,
            'ip'     => $clientIp,
            'user_agent' => $request->getUserAgent(),
        ]));

        // uri不在名单则直接关闭连接
        if (!in_array($uri, ['/xb/im', '/xb/msg'])) {
            $server->close($fd);
            return;
        }

        /**
         * @var $cacheIns Redis
         */
        $cacheIns = Cache::get();

        $sessionKey = sprintf(self::SESSION_KEY, $fd);

        //来自本地的连接直接接受(来自http请求从本地发来的请求)
        if (in_array($clientIp, ['127.0.0.1', 'localhost'])) {
            $cacheIns->hSet($sessionKey, 'is_super_client', '1');
            //$cacheIns->set(sprintf(self::SESSION_FD_MEMBER_KEY, 3), $fd);
            return;
        }

        if (empty($token)) {
            $server->close($fd);
            return;
        }

        $tokenJson = (new AesCbc())->decrypt($token);
        if (empty($tokenJson)) {
            $server->close($fd);
            return;
        }

        $tokenData = json_decode($tokenJson, true);
        if (!is_array($tokenData) || !isset($tokenData['member_id'])) {
            $server->close($fd);
            return;
        }

        $memberId = $tokenData['member_id'];

        $sessionFdMemberKey = sprintf(self::SESSION_FD_MEMBER_KEY, $memberId);
        $memberConnectedFds = $cacheIns->get($sessionFdMemberKey);
        $toBeSetFds = $fd;
        if ($memberConnectedFds !== false) {
            $toBeSetFds = $memberConnectedFds . ',' . $fd;
        }

        $hSetMemberIdResult = $cacheIns->hSet($sessionKey, 'member_id', $memberId);
        $setFdsResult       = $cacheIns->set($sessionFdMemberKey, $toBeSetFds);

        Log::info(json_encode([
            'method'               => __METHOD__,
            'uri'                  => $uri,
            'fd'                   => $fd,
            'token'                => $token,
            'ip'                   => $clientIp,
            'member_connected_fds' => $memberConnectedFds,
            'to_be_set_fds'        => $setFdsResult,
            'hSetMemberIdResult'   => $hSetMemberIdResult,
            'setFdsResult'         => $setFdsResult
        ]));
        if (false === $hSetMemberIdResult || false === $setFdsResult) {
            $server->close($fd);
            return;
        }

        $hSetLoginRet  = $cacheIns->hSet($sessionKey, 'login_time', time());
        $hSetClientRet = $cacheIns->hSet($sessionKey, 'is_super_client', '0');
        $expireResult  = $cacheIns->expire($sessionKey, 86400);
        $sAddRet       = $cacheIns->sAdd(self::SESSION_GROUP_KEY, $memberId);

        Log::info(json_encode([
            'method'         => __METHOD__,
            'member_id'      => $memberId,
            'fd'             => $fd,
            'set_login_ret'  => $hSetLoginRet,
            'set_client_ret' => $hSetClientRet,
            'expire_result'  => $expireResult,
            'add_ret'        => $sAddRet,
        ]));

    }

    public static function Message(WebSocketServer $server, WebSocketFrame $frame)
    {
        $fd                 = $frame->fd;
        $toFds              = '';
        $pushTargetFdResult = false;

        $responseMsgData = ['code' => 200, 'msg' => 'ok', 'data' => []];

        /**
         * @var $cacheIns Redis
         */
        $cacheIns   = Cache::get();
        $sessionKey = sprintf(self::SESSION_KEY, $fd);
        $cacheIns->hSet($sessionKey, 'last_msg_time', time());  //更新最后一次消息时间

        $timestamp   = time();
        $messageJson = $frame->data;
        Log::info("fd : {$fd}, data : " . $messageJson);
        $messageArray = json_decode($messageJson, true);
        if (!is_array($messageArray)) {
            $responseMsgData = [
                'code' => '206',
                'msg'  => 'message is not array',
            ];
            goto _RETURN;
        }

        if (!isset($messageArray['event'], $messageArray['event_type'], $messageArray['event_data'])) {  //参数不正确
            $responseMsgData = [
                'code' => '206',
                'msg'  => 'event / event_type / event_data is invalid',
            ];
            goto _RETURN;
        }

        if ($messageArray['event_type'] === 'heartbeat' && $messageArray['event'] === 'heartbeat') {
            goto HEART_BEAT_RETURN;
        }

        //给店主发消息
        if ($messageArray['event_type'] === 'im' && in_array($messageArray['event'], ['new_msg', 'read_msg'])) {
            if (!is_array($messageArray['event_data'])) {
                $responseMsgData = [
                    'code' => '206',
                    'msg'  => 'event_data is not array',
                ];
                goto _RETURN;
            }

            $shopMemberId = (int)($messageArray['event_data']['to_member_id'] ?? 0);
            if ($shopMemberId === 0) {
                $responseMsgData = [
                    'code' => '206',
                    'msg'  => 'event_data.to_member_id is not set',
                ];
                goto _RETURN;
            }

            $sessionFdMemberKey = sprintf(self::SESSION_FD_MEMBER_KEY, $shopMemberId);
            $toFds              = $cacheIns->get($sessionFdMemberKey);
            if (false === $toFds) {
                $responseMsgData = [
                    'code' => '404',
                    'msg'  => 'shop owner is offline',
                ];
                goto _RETURN;
            }

            $responseMsgData['data'] = $messageArray['event_data'];

        } else {
            $responseMsgData = [
                'code' => '201',
                'msg'  => 'event handler not found',
            ];
            goto _RETURN;
        }

        _RETURN:
        $responseMsgData['ts'] = $timestamp;
        $fdsList               = explode(',', $toFds);
        foreach ($fdsList as $toFd) {
            $toFd = (int)$toFd;
            if ($toFd <= 0) {
                continue;
            }
            try {
                if($server->exist($toFd)) {
                    $pushTargetFdResult = $server->push($toFd, $messageJson);
                    Log::info("from fd :{$fd}, to_fd : {$toFd} , data : {$messageJson}, result : ". (int)$pushTargetFdResult);
                }
            }
            catch (\Throwable $e) {
                Log::info(json_encode([
                    'method'             => __METHOD__,
                    'msg_push_error'     => $e->getMessage(),
                    'to_fd'              => $toFd,
                    'push_msg'           => $messageJson,
                ]));
                continue;
            }
        }

        HEART_BEAT_RETURN:
        $pushResult = $server->push($fd, json_encode($responseMsgData, JSON_UNESCAPED_UNICODE));

        Log::info(json_encode([
            'method'             => __METHOD__,
            'data'               => $frame->data,
            'response'           => $responseMsgData,
            'response_result'    => $pushResult,
            'target_push_result' => $pushTargetFdResult
        ]));

        return;
    }

    public static function Close(WebSocketServer $server, int $fd)
    {
        Log::info("fd : {$fd}, close");

        /**
         * @var $cacheIns Redis
         */
        $cacheIns = Cache::get();
        $sessionKey = sprintf(self::SESSION_KEY, $fd);

        $memberId = $cacheIns->hGet($sessionKey, 'member_id');
        if (false !== $memberId) {
            $sessionFdMemberKey = sprintf(self::SESSION_FD_MEMBER_KEY, $memberId);
            $fdsString          = $cacheIns->get($sessionFdMemberKey);
            $fdsList            = explode(',', $fdsString);
            $toBeSetFds         = [];
            foreach ($fdsList as $storeFd) {
                if ($storeFd != $fd) {
                    $toBeSetFds[] = $storeFd;
                }
            }
            if (empty($toBeSetFds)) {
                $cacheIns->del($sessionFdMemberKey);
            } else {
                $cacheIns->set($sessionFdMemberKey, implode(',', $toBeSetFds));
            }
            $cacheIns->sRem(self::SESSION_GROUP_KEY, $memberId);
        }

        Log::info(json_encode([
            'method'    => __METHOD__,
            'member_id' => $memberId,
            'fd'        => $fd,
        ]));

        $cacheIns->del($sessionKey);

    }

}

日志:

2024-10-11 12:03:17 | 24101112031730665496093 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Message","data":"{\"event_type\":\"im\",\"event\":\"new_msg\",\"event_data\":{\"to_member_id\":479,\"msg_type\":1,\"msg\":\"2\",\"chat_id\":7,\"id\":1023,\"role\":10,\"unread_msg_count\":2}}","response":{"code":200,"msg":"ok","data":{"to_member_id":479,"msg_type":1,"msg":"2","chat_id":7,"id":1023,"role":10,"unread_msg_count":2},"ts":1728619397},"response_result":true,"target_push_result":true}
2024-10-11 12:03:18 | 24101112031830665537363 | fd : 9, close
2024-10-11 12:03:18 | 24101112031830665525943 | fd : 3, data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"3","chat_id":7,"id":1024,"role":10,"unread_msg_count":3}}
2024-10-11 12:03:17 | 24101112031730665509520 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Close","member_id":false,"fd":8}
2024-10-11 12:03:18 | 24101112031830665537363 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Close","member_id":false,"fd":9}
2024-10-11 12:03:18 | 24101112031830665525943 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Message","data":"{\"event_type\":\"im\",\"event\":\"new_msg\",\"event_data\":{\"to_member_id\":479,\"msg_type\":1,\"msg\":\"3\",\"chat_id\":7,\"id\":1024,\"role\":10,\"unread_msg_count\":3}}","response":{"code":200,"msg":"ok","data":{"to_member_id":479,"msg_type":1,"msg":"3","chat_id":7,"id":1024,"role":10,"unread_msg_count":3},"ts":1728619398},"response_result":true,"target_push_result":true}
2024-10-11 12:03:19 | 24101112031930665552723 | fd : 3, data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"4","chat_id":7,"id":1025,"role":10,"unread_msg_count":1}}
2024-10-11 12:03:19 | 24101112031930665552723 | from fd :3, to_fd : 1 , data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"4","chat_id":7,"id":1025,"role":10,"unread_msg_count":1}}, result : 1
2024-10-11 12:03:19 | 24101112031930665552723 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Message","data":"{\"event_type\":\"im\",\"event\":\"new_msg\",\"event_data\":{\"to_member_id\":479,\"msg_type\":1,\"msg\":\"4\",\"chat_id\":7,\"id\":1025,\"role\":10,\"unread_msg_count\":1}}","response":{"code":200,"msg":"ok","data":{"to_member_id":479,"msg_type":1,"msg":"4","chat_id":7,"id":1025,"role":10,"unread_msg_count":1},"ts":1728619399},"response_result":true,"target_push_result":true}
2024-10-11 12:03:20 | 24101112032030665581360 | fd : 3, data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"5","chat_id":7,"id":1026,"role":10,"unread_msg_count":2}}
2024-10-11 12:03:20 | 24101112032030665581360 | from fd :3, to_fd : 1 , data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"5","chat_id":7,"id":1026,"role":10,"unread_msg_count":2}}, result : 1
2024-10-11 12:03:20 | 24101112032030665597201 | fd : 11, close
2024-10-11 12:03:19 | 24101112031930665566592 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Close","member_id":false,"fd":10}
2024-10-11 12:03:19 | 24101112031930665552723 | from fd :3, to_fd : 1 , data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"4","chat_id":7,"id":1025,"role":10,"unread_msg_count":1}}, result : 1
2024-10-11 12:03:19 | 24101112031930665566592 | fd : 10, close
2024-10-11 12:03:20 | 24101112032030665597201 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Close","member_id":false,"fd":11}
2024-10-11 12:03:20 | 24101112032030665581360 | from fd :3, to_fd : 1 , data : {"event_type":"im","event":"new_msg","event_data":{"to_member_id":479,"msg_type":1,"msg":"5","chat_id":7,"id":1026,"role":10,"unread_msg_count":2}}, result : 1
2024-10-11 12:03:20 | 24101112032030665581360 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Message","data":"{\"event_type\":\"im\",\"event\":\"new_msg\",\"event_data\":{\"to_member_id\":479,\"msg_type\":1,\"msg\":\"5\",\"chat_id\":7,\"id\":1026,\"role\":10,\"unread_msg_count\":2}}","response":{"code":200,"msg":"ok","data":{"to_member_id":479,"msg_type":1,"msg":"5","chat_id":7,"id":1026,"role":10,"unread_msg_count":2},"ts":1728619400},"response_result":true,"target_push_result":true}
2024-10-11 12:03:40 | 24101112034030665603850 | {"method":"App\\Actions\\Xb\\Im\\WsMessage::Message","data":"{\"event_type\":\"heartbeat\",\"event\":\"heartbeat\",\"event_data\":{}}","response":{"code":200,"msg":"ok","data":[]},"response_result":true,"target_push_result":false}
2024-10-11 12:03:40 | 24101112034030665603850 | fd : 1, data : {"event_type":"heartbeat","event":"heartbeat","event_data":{}}

php 版本 8.1.12 swoole 版本 5.1.4

NathanFreeman commented 1 month ago

onClose事件有个$reactorId参数,判断一下是不是小于0,是的话就是服务端主动关闭,此时要看看是不是触发了哪里的if分支导致关闭。否则就是客户端主动关闭。

https://wiki.swoole.com/zh-cn/#/server/events?id=onclose