swoole / swoole-src

🚀 Coroutine-based concurrency library for PHP
https://www.swoole.com
Apache License 2.0
18.44k stars 3.16k forks source link

swoole异步任务提示没有空闲的worker #4817

Closed hanwei88 closed 2 years ago

hanwei88 commented 2 years ago

WARNING Server::timer_callback() (ERRNO 9007): No idle task worker is available

<?php

defined('ENV_PATH') or define('ENV_PATH', __DIR__ . '/env');

defined('RUNTIME_PATH') or define('RUNTIME_PATH', __DIR__ . '/temp');

$daemon = false; // 是否守护进程
$action = '';
$pidFile = __DIR__ . '/manager_master';

foreach ($argv as $index => $item) {
    if (!$action && in_array($item, ['start', 'stop', 'restart', 'reload'])) {
        $action = $item;
    }

    if (!$daemon && $item === '-d') {
        $daemon = true;
    }

}

$pid = file_exists($pidFile) ? file_get_contents($pidFile) : 0;
if ($action == 'stop' || $action == 'restart') {

    if (!$pid) {
        echo "管理服务已经停止\n";
    }
    if ($pid && \Swoole\Process::kill($pid, 0)) {
        \Swoole\Process::kill($pid, SIGTERM);
    }
    echo "管理服务已经停止\n";
    if ($action == 'stop') {
        return;
    }

    while (\Swoole\Process::kill($pid, 0)) {

    }
}

//创建WebSocket Server对象,监听0.0.0.0:9502端口
$ws = new \Swoole\WebSocket\Server('0.0.0.0', 5688);
$ws->set(
    [
        'worker_num' => 1,
//        'enable_coroutine' => false,
        'task_worker_num' => 3,
        'daemonize' => $daemon,
    ]
);

$table = new Swoole\Table(2048);
$table->column('sass_app_id', Swoole\Table::TYPE_INT);
$table->column('time', Swoole\Table::TYPE_STRING, 64);
$table->create();
$ws->table = $table;

$ws->on('WorkerStart', function (\Swoole\Server $server, int $workerId) {

    if ($server->taskworker) {
        return;
    }

    if(!file_exists(RUNTIME_PATH)) {
        mkdir(RUNTIME_PATH, 0777, true);
    }

    if(!file_exists(ENV_PATH)) {
        mkdir(ENV_PATH, 0777, true);
    }

    \Swoole\Coroutine::create(
        function () use ($server) {
            while (true) {
                echo sprintf("%s: 检测服务\n", date('Y-m-d H:i:s'));
                if (!file_exists(ENV_PATH)) {
                    return;
                }

                $getFiles = getDirFiles(ENV_PATH);

                if (empty($getFiles)) {
                    return;
                }

                $apps = [];
                foreach ($getFiles as $file) {
                    if (!is_file($file) || $file == '.' || $file == '..' || strpos($file, '/.env') === false) {
                        continue;
                    }

                    $app = parse_ini_file($file, true);

                    if(!isset($app['env_dir']) || !isset($app['ws_port'])) {
                        continue;
                    }

                    if (isset($app['company_suffix']) && !isset($apps[$app['company_suffix']])) {
                        $app['sass_app_id'] = $app['env_dir'];
                        $apps[$app['company_suffix']] = $app;
                    }
                }

                if (empty($apps)) {
                    return;
                }

                foreach($apps as $app) {
                    $server->task(json_encode(['type' => 'check', 'data' => $app], JSON_UNESCAPED_UNICODE));
                     \Swoole\Coroutine::sleep(1);
                }

            }

        }
    );
});

$ws->on('Start', function (\Swoole\Server $server) use ($pidFile) {
    file_put_contents($pidFile, $server->master_pid);
});

//处理异步任务(此回调函数在task进程中执行)
$ws->on('Task', function ($serv, $task_id, $reactor_id, $data) {
    echo "New AsyncTask[id={$task_id}]".PHP_EOL;
    //返回任务执行的结果

    $msg = json_decode($data, true);

    if (empty($msg['type'])) {
        $serv->finish("{$task_id} -> OK");
        return;
    }

    $app = $msg['data'];

    if (!$serv->table->exist($app['sass_app_id'])) {
        $serv->table->set($app['sass_app_id'], ['sass_app_id' => $app['sass_app_id'], 'time' => 0]);
    }

    if (time() - $serv->table->get($app['sass_app_id'], 'time') < 5) {
        $serv->finish("{$app['sass_app_id']} -> OK");
        return;
    }

    $serv->table->set($app['sass_app_id'], ['sass_app_id' => $app['sass_app_id'], 'time' => time()]);
    // 检查服务是否正在运行
    $shell = "netstat -anp | grep ".$app['ws_port']." | grep LISTEN | grep tcp | wc -l";
    $result = shell_exec($shell);

    if ($result == 1) { // 服务未启动
        echo sprintf("检测时间: %s, %s: 服务正常运行\n", date('Y-m-d H:i:s', time()), $app['aliOss_system_name']);
        $serv->finish("{$app['sass_app_id']} -> OK");
        return;
    }

    $command = sprintf("php start_swoole.php restart -d --appId=%s", $app['sass_app_id']);
    exec($command);
    var_dump($command);
    echo sprintf("检测时间: %s, %s: 服务已启动\n", date('Y-m-d H:i:s', time()), $app['aliOss_system_name']);
    $serv->finish("{$app['sass_app_id']} -> OK");
});

//处理异步任务的结果(此回调函数在worker进程中执行)
$ws->on('Finish', function ($serv, $task_id, $data) {
    echo "AsyncTask[{$task_id}] Finish: {$data}".PHP_EOL;
});

//监听WebSocket连接打开事件
$ws->on('Open', function ($ws, $request) {
    $ws->push($request->fd, "hello, welcome\n");
});

//监听WebSocket消息事件
$ws->on('Message', function ($ws, $frame) {
    echo "Message: {$frame->data}\n";
    $ws->push($frame->fd, "server: {$frame->data}");
});

//监听WebSocket连接关闭事件
$ws->on('Close', function ($ws, $fd) {
    echo "client-{$fd} is closed\n";
});

function getDirFiles($dir, $level=0) {
    //首先先读取文件夹
    try {
        $temp = scandir($dir);
    } catch (\Exception $exception) {
        return [];
    }
    $level++;
    $files = [];
    //遍历文件夹
    foreach($temp as $v){
        $files[] = $a = $dir.'/'.$v;
        if(is_dir($a)){//如果是文件夹则执行
            if($v=='.' || $v=='..'){//判断是否为系统隐藏的文件.和..  如果是则跳过
                continue;
            }
            $files = array_merge($files, getDirFiles($a, $level)); //因为是文件夹所以再次调用 selectdir,把这个文件夹下的文件遍历出来
        }
    }
    return $files;
}

echo "服务已启动\n";
$ws->start();
Yurunsoft commented 2 years ago

字面意思,你投递的 task 多了,执行不过来。

发代码请用成对的3个```括起来。

matyhtf commented 2 years ago

只是提示你空闲进程不足,建议调大 task_worker_num 数量