walkor / GatewayWorker

Distributed realtime messaging framework based on workerman.
MIT License
1.01k stars 294 forks source link

swow驱动下CPU负载极高 #106

Closed xuanyanwow closed 1 year ago

xuanyanwow commented 1 year ago

问题

注:希望得到作者大大的联系方式~

composer

{
    "require": {
        "workerman/workerman": "v5.0.0-beta.4"
    }
}

php 代码

declare(strict_types=1);

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Events\Swow;
use Workerman\Events\Select;

require_once __DIR__ . '/../vendor/autoload.php';

// Entry point of the reproduction: open the register connection(s) immediately.
registerAddress();

/**
 * Opens one AsyncTcpConnection per register address (using the "text://"
 * scheme) and assigns a Swow event loop instance to it before connecting.
 *
 * The onConnect/onClose/onMessage callbacks are deliberately empty: the script
 * only establishes the connection and does nothing with it.
 */
function registerAddress(): void
{
    foreach (['127.0.0.1:9764'] as $register_address) {
        $register_connection = new AsyncTcpConnection("text://{$register_address}");
        $register_connection->eventLoop = new Swow();
        // Alternative driver, left in place so the two can be swapped for comparison.
        // $register_connection->eventLoop = new Select();

        $register_connection->onConnect = function ($register_connection) {
        };
        $register_connection->onClose = function ($register_connection) {
        };
        $register_connection->onMessage = function () {
        };
        $register_connection->connect();
    }
}
// Park the main flow forever in one-second sleeps so the script never exits.
for (;;) {
    sleep(1);
}

测试 swow stream_poll_one 函数

测试服务端 read 监听

<?php

declare(strict_types=1);
use Swow\Coroutine;

require __DIR__ . '/vendor/autoload.php';

$address = '0.0.0.0';
$port = 9764;

// Create the listening TCP socket.
$socket = stream_socket_server("tcp://{$address}:{$port}", $errno, $errstr);

if ($socket === false) {
    exit("Error: {$errno} - {$errstr}");
}

echo "Server started on {$address}:{$port}\n";

// Keep accepting new client connections until stream_select() fails.
while (true) {
    // stream_select() rewrites the arrays it is given, leaving only the streams
    // that are ready, so they must be rebuilt before every call.
    $read = [$socket];
    $write = null;
    $except = null;

    // Block (no timeout) until the listening socket becomes readable.
    if (stream_select($read, $write, $except, null) === false) {
        break;
    }

    // Only accept when the listening socket is actually reported as ready.
    if (! in_array($socket, $read, true)) {
        continue;
    }

    $new_client = stream_socket_accept($socket);
    if ($new_client !== false) {
        echo "New client connected\n";
        // Hand the connection over to its own coroutine.
        handle_client($new_client);
    }
}

/**
 * Serves one accepted client inside its own Swow coroutine, echoing back
 * whatever the client sends.
 *
 * The coroutine ends, and the stream is closed, when the client disconnects
 * (fread() yields false or an empty string) or when stream_poll_one() reports
 * anything other than a plain STREAM_POLLIN.
 *
 * @param resource $client Stream returned by stream_socket_accept().
 */
function handle_client($client): void
{
    Coroutine::run(function () use ($client): void {
        while (true) {
            if (! is_resource($client)) {
                echo "Client resoure is not valid\n";
                return;
            }

            // Wait for readability or hang-up on this client.
            $rEvent = stream_poll_one($client, STREAM_POLLIN | STREAM_POLLHUP);

            if ($rEvent !== STREAM_POLLNONE) {
                // Read whatever the client sent.
                $data = fread($client, 1024);
                if ($data === false || $data === '') {
                    // The peer closed the connection: release the stream and stop.
                    fclose($client);
                    echo "Client disconnected\n";
                    return;
                }
                // Echo the payload back unchanged.
                fwrite($client, $data);
            }

            if ($rEvent !== STREAM_POLLIN) {
                // Any result other than a plain STREAM_POLLIN ends the session.
                // Close the stream here as well so this exit path does not
                // leave the connection open.
                fclose($client);
                echo "Client poll returned events {$rEvent}, closing\n";
                return;
            }
        }
    });
}

// Close the server socket. This statement is only reached after the accept
// loop has exited, i.e. once stream_select() returned false.
fclose($socket);

测试客户端 write 监听

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Events\Swow;

require_once __DIR__ . '/../vendor/autoload.php';

// Entry point of the client-side test.
registerAddress();

/**
 * Connects to the local test server, sends "hello", waits for the stream to
 * become writable and then polls it for incoming data every 10 ms, dumping
 * each chunk that is read.
 *
 * Polling stops, and the stream is closed, once the peer closes the
 * connection or a hang-up is reported.
 *
 * @throws \RuntimeException When the TCP connection cannot be established.
 */
function registerAddress()
{
    // Create a TCP client (30 second connect timeout).
    $stream = stream_socket_client('tcp://127.0.0.1:9764', $errno, $errstr, 30);
    if ($stream === false) {
        // Fail with a clear message instead of a TypeError from fwrite(false, ...).
        throw new \RuntimeException("Connect failed: {$errno} - {$errstr}");
    }

    // Send "hello".
    fwrite($stream, 'hello');

    $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
    if ($rEvent !== STREAM_POLLOUT) {
        return;
    }

    // Simulates checkConnection handing the stream over to baseWrite/baseRead.
    while (true) {
        $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);

        if (($rEvent & STREAM_POLLIN) !== 0) {
            $data = fread($stream, 1024);
            if ($data === false || $data === '') {
                // End of stream: the peer closed the connection.
                fclose($stream);
                return;
            }
            var_dump($data);
        } elseif (($rEvent & STREAM_POLLHUP) !== 0) {
            // Hang-up with nothing left to read.
            fclose($stream);
            return;
        }

        msleep(10);
    }
}
// Park the main flow forever in one-second sleeps so the script never exits.
for (;;) {
    sleep(1);
}
Player626 commented 1 year ago
31740 socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_IP) = 12
31740 connect(12, {sa_family=AF_INET, sin_port=htons(9764), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
31740 mmap(NULL, 262144, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7fd4a909a000
31740 dup(12)                           = 13
31740 epoll_ctl(4, EPOLL_CTL_ADD, 13, {EPOLLIN, {u32=4294967295, u64=4294967295}}) = 0
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a90d9bfc) = 0
31740 ioctl(13, FIONBIO, [1])           = 0
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a90d9c0c) = -1 ENOENT (No such file or directory)
31740 epoll_ctl(4, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=8}}) = 0
31740 epoll_ctl(4, EPOLL_CTL_ADD, 10, {EPOLLIN, {u32=10, u64=10}}) = 0
31740 epoll_ctl(4, EPOLL_CTL_ADD, 12, {EPOLLOUT, {u32=12, u64=12}}) = 0
31740 epoll_ctl(4, EPOLL_CTL_ADD, 13, {EPOLLOUT|EPOLLRDHUP, {u32=13, u64=13}}) = 0
31740 epoll_wait(4, [{EPOLLOUT, {u32=12, u64=12}}, {EPOLLOUT, {u32=13, u64=13}}], 1024, 0) = 2
31740 getsockopt(12, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
31740 setsockopt(12, SOL_TCP, TCP_NODELAY, [1], 4) = 0
31740 close(13)                         = 0
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a90d9c3c) = -1 EBADF (Bad file descriptor)
31740 getpeername(12, {sa_family=AF_INET, sin_port=htons(9764), sin_addr=inet_addr("127.0.0.1")}, [128->16]) = 0
31740 mmap(NULL, 262144, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7fd4a905a000
31740 epoll_ctl(4, EPOLL_CTL_ADD, 12, {EPOLLIN, {u32=4294967295, u64=4294967295}}) = -1 EEXIST (File exists)
31740 epoll_ctl(4, EPOLL_CTL_DEL, 12, 0x7fd4a9099bfc) = 0
31740 ioctl(12, FIONBIO, [1])           = 0
31740 epoll_ctl(4, EPOLL_CTL_DEL, 12, 0x7fd4a9099c0c) = -1 ENOENT (No such file or directory)
31740 dup(12)                           = 13
31740 epoll_ctl(4, EPOLL_CTL_ADD, 13, {EPOLLIN, {u32=4294967295, u64=4294967295}}) = -1 EEXIST (File exists)
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a90d9bfc) = 0
31740 ioctl(13, FIONBIO, [1])           = 0
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a90d9c0c) = -1 ENOENT (No such file or directory)
31740 epoll_ctl(4, EPOLL_CTL_ADD, 12, {EPOLLIN|EPOLLRDHUP, {u32=12, u64=12}}) = 0
31740 epoll_ctl(4, EPOLL_CTL_ADD, 13, {EPOLLOUT|EPOLLRDHUP, {u32=13, u64=13}}) = 0
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 0) = 1
31740 close(13)                         = 0
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a90d9c3c) = -1 EBADF (Bad file descriptor)
31740 munmap(0x7fd4a909a000, 262144)    = 0
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 961) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 961) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 961) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 961) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 960) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 959) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 958) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 957) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 956) = 1
31740 epoll_ctl(4, EPOLL_CTL_DEL, 13, 0x7fd4a9116f00) = -1 EBADF (Bad file descriptor)
31740 epoll_wait(4, [{EPOLLOUT, {u32=13, u64=13}}], 1024, 954) = 1

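close(13) 先于 EPOLL_CTL_DEL 执行,导致 epoll_ctl 执行失败(返回 EBADF),fd 13 的注册项无法被移除。fd 13 是由 dup(12) 得到的,其底层的打开文件描述仍被 fd 12 引用,所以该注册项会一直留在 epoll 实例中:epoll_wait 不再阻塞,而是不断立即返回 fd 13 的 EPOLLOUT 事件,事件循环在此处卡死(空转),CPU 负载因此极高。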