swoft-cloud / swoft

🚀 PHP Microservice Full Coroutine Framework
https://swoft.org
Apache License 2.0
5.58k stars 786 forks source link

在命令行中开启自定义swoole服务器,会切换到异步连接池模式吗? #795

Closed goshja closed 5 years ago

goshja commented 5 years ago

请问在命令行中开启自定义swoole服务器,回调时会切换到异步连接池模式吗?谢谢

inhere commented 5 years ago

如何开启自定义服务器? 给个示例代码看看

kuoshijia commented 5 years ago

就是说使用swoft的Command来启动第三方vendor的RPC服务,onReceive回调后想要使用swoft的异步mysql/redis连接池、容器等功能。

是否只要继承Swoft\Bootstrap\Server\AbstractServer并注册onWorkerStart事件就可以了呢?

以下是微服务治理框架phptars/tars-server/src/core/Server.php启动的swoole http/tcp服务器代码(与AbstractServer差不多)

`<?php /**

namespace Tars\core;

use Monolog\Handler\StreamHandler; use Monolog\Logger; use Swoft\Rpc\Server\Rpc\RpcServer; use Tars\App; use Tars\Consts; use Tars\protocol\ProtocolFactory; use Tars\monitor\StatFServer; use Tars\monitor\PropertyFServer; use Tars\registry\QueryFWrapper; use Tars\registry\RouteTable; use Tars\report\ServerFWrapper; use Tars\config\ConfigWrapper; use Tars\monitor\cache\SwooleTableStoreCache;

class Server extends RpcServer

{ protected $tarsConfig; private $tarsServerConfig; private $tarsClientConfig;

protected $sw;
protected $masterPidFile;
protected $managerPidFile;

protected $application;
protected $serverName = '';
protected $protocolName = 'tars';

protected $workerNum = 4;

protected $setting;

protected $servicesInfo;
protected static $paramInfos;
protected $namespaceName;
protected $executeClass;

protected static $impl;
protected $protocol;
protected $timers;

protected $portObjNameMap = [];
protected $adapters = [];
protected $timerObjName = null;

public function __construct($conf)
{
    $this->tarsServerConfig = $conf['tars']['application']['server'];
    $this->tarsClientConfig = $conf['tars']['application']['client'];

    $this->servicesInfo = $this->tarsServerConfig['servicesInfo'];

    $this->tarsConfig = $conf;
    $this->application = $this->tarsServerConfig['app'];
    $this->serverName = $this->tarsServerConfig['server'];

    $this->setting = $this->tarsServerConfig['setting'];

    if (isset($this->tarsServerConfig['protocolName'])) {
        $this->protocolName = $this->tarsServerConfig['protocolName'];
    }

    $this->workerNum = $this->setting['worker_num'];
    $this->adapters = $this->tarsServerConfig['adapters'];
}

public function start()
{
    $interval = $this->tarsClientConfig['report-interval'];
    $statServantName = $this->tarsClientConfig['stat'];
    $locator = $this->tarsClientConfig['locator'];
    $moduleName = $this->application . '.' . $this->serverName;

    // 日志组件初始化 根据平台配置的level来
    $logLevel = $this->tarsServerConfig['loglevel'];

    $logger = new Logger("tars_logger");

    $levelMap = [
        'DEBUG' => Logger::DEBUG,
        'INFO' => Logger::INFO,
        'NOTICE' => Logger::NOTICE,
        'WARNING' => Logger::WARNING,
        'ERROR' => Logger::ERROR,
        'CRITICAL' => Logger::CRITICAL,
    ];

    $levelNameMap = [
        'DEBUG' => 'log_debug.log',
        'INFO' => 'log_info.log',
        'NOTICE' => 'log_notice.log',
        'WARNING' => 'log_warning.log',
        'ERROR' => 'log_error.log',
        'CRITICAL' => 'log_critical.log',
    ];
    $loggerLevel = $levelMap[$logLevel];
    $loggerName = $levelNameMap[$logLevel];

    $outStreamHandler = new StreamHandler(
        $this->setting['log_file'], $loggerLevel
    );

    $levelStreamHandler = new StreamHandler(
        $this->tarsServerConfig['logpath'] . $this->tarsServerConfig['app'] . '/' .
        $this->tarsServerConfig['server'] . '/' . $loggerName, $loggerLevel
    );

    $logger->pushHandler($outStreamHandler);
    $logger->pushHandler($levelStreamHandler);

    $logger->info("stat/property/keepalive/config/logger service init start...\n");
    // 初始化被调上报
    $statF = new StatFServer($locator, Consts::SWOOLE_SYNC_MODE, $statServantName, $moduleName, $interval);

    $monitorStoreClassName =
        isset($this->servicesInfo['monitorStoreConf']['className']) ?
            $this->servicesInfo['monitorStoreConf']['className'] :
            SwooleTableStoreCache::class;

    $monitorStoreConfig = isset($this->servicesInfo['monitorStoreConf']['config'])
        ? $this->servicesInfo['monitorStoreConf']['config'] : [];

    $registryStoreClassName = isset($this->servicesInfo['registryStoreConf']['className']) ? $this->servicesInfo['registryStoreConf']['className'] : RouteTable::class;
    $registryStoreConfig = isset($this->servicesInfo['registryStoreConf']['config']) ? $this->servicesInfo['registryStoreConf']['config'] : [];

    $monitorStoreCache = new $monitorStoreClassName($monitorStoreConfig);
    $statF->initStoreCache($monitorStoreCache);

    $registryStoreCache = new $registryStoreClassName($registryStoreConfig);
    QueryFWrapper::initStoreCache($registryStoreCache);

    //初始化特性上报
    $propertyF = new PropertyFServer($locator, Consts::SWOOLE_SYNC_MODE,
        $moduleName);

    // 初始化服务保活
    // 解析出node上报的配置 tars.tarsnode.ServerObj@tcp -h 127.0.0.1 -p 2345 -t 10000
    $result = \Tars\Utils::parseNodeInfo($this->tarsServerConfig['node']);
    $objName = $result['objName'];
    $host = $result['host'];
    $port = $result['port'];
    $serverF = new ServerFWrapper($host, $port, $objName);

    // 配置拉取初始化
    $configF = new ConfigWrapper($this->tarsClientConfig);

    // 初始化
    App::setTarsConfig($this->tarsConfig);
    App::setStatF($statF);
    App::setPropertyF($propertyF);
    App::setServerF($serverF);
    App::setConfigF($configF);
    App::setLogger($logger);

    $logger->info("stat/property/keepalive/config/logger service init finish...\n");

    foreach ($this->adapters as $key => $adapter) {
        $serviceInfo = $this->servicesInfo[$adapter['objName']];
        $ip = $adapter['listen']['sIp'];
        $port = $adapter['listen']['iPort'];
        $objName = $adapter['objName'];
        if (isset($serviceInfo['isTimer']) && $serviceInfo['isTimer']) {
            if ($this->timerObjName == null) {
                $this->timerObjName = $objName;
            } else {
                App::getLogger()->error(__METHOD__ . " only support one timer obj, check services.php");
            }
        }

        if ($key == 0) {
            switch ($serviceInfo['serverType']) {
                case 'http' :
                    $this->sw = new \swoole_http_server($ip, $port, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
                    $this->sw->on('Request', array($this, 'onRequest'));
                    $logger->info("$objName Server type http...\n");
                    break;
                case 'websocket' :
                    $this->sw = new \swoole_websocket_server($ip, $port, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
                    $this->sw->on('Request', array($this, 'onRequest'));
                    $this->sw->on('Message', array($this, 'onMessage'));
                    $logger->info("$objName Server type webSocket...\n");
                    break;
                case 'udp' :
                    $this->sw = new \swoole_server($ip, $port, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);
                    $logger->info("$objName Server type udp...\n");
                    break;
                default : //tcp
                    $this->sw = new \swoole_server($ip, $port, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
                    $logger->info("$objName Server type tcp...\n");
                    break;
            }
        } else {
            switch ($serviceInfo['serverType']) {
                case 'http' :
                    $portObj = $this->sw->addlistener($ip, $port, SWOOLE_SOCK_TCP);
                    $portObj->set(['open_websocket_protocol' => false, 'open_http_protocol' => true,]);
                    $portObj->on('Request', array($this, 'onRequest'));
                    $logger->info("$objName Server type http...\n");
                    break;
                case 'websocket' :
                    $portObj = $this->sw->addlistener($ip, $port, SWOOLE_SOCK_TCP);
                    $portObj->set(['open_websocket_protocol' => true, 'open_http_protocol' => false,]);
                    $portObj->on('Request', array($this, 'onRequest'));
                    $portObj->on('Message', array($this, 'onMessage'));
                    $logger->info("$objName Server type webSocket...\n");
                    break;
                case 'udp' :
                    $portObj = $this->sw->addlistener($ip, $port, SWOOLE_SOCK_UDP);
                    $portObj->set(['open_websocket_protocol' => false, 'open_http_protocol' => false,]);
                    $logger->info("$objName Server type udp...\n");
                    break;
                default : //tcp
                    $portObj = $this->sw->addlistener($ip, $port, SWOOLE_SOCK_TCP);
                    $portObj->set(['open_websocket_protocol' => false, 'open_http_protocol' => false,]);
                    $logger->info("$objName Server type tcp...\n");
                    break;
            }
        }

        $this->portObjNameMap[$port] = $objName;
    }

    // 判断是否是timer服务
    if ($this->timerObjName) {
        $logger->info("Server type timer...\n");

        $timerDir = $this->tarsServerConfig['basepath'] . 'src/timer/';

        if (is_dir($timerDir)) {
            $files = scandir($timerDir);
            foreach ($files as $f) {
                $fileName = $timerDir . $f;
                if (is_file($fileName) && strrchr($fileName, '.php') == '.php') {
                    $this->timers[] = $fileName;
                }
            }
        } else {
            $logger->error(__METHOD__ . ' Timer directory is missing\n');
        }
    }

    $this->sw->set($this->setting);

    $this->sw->on('Start', array($this, 'onMasterStart'));
    $this->sw->on('ManagerStart', array($this, 'onManagerStart'));
    $this->sw->on('WorkerStart', array($this, 'onWorkerStart'));
    $this->sw->on('Connect', array($this, 'onConnect'));
    $this->sw->on('Receive', array($this, 'onReceive'));
    $this->sw->on('Close', array($this, 'onClose'));
    $this->sw->on('WorkerStop', array($this, 'onWorkerStop'));

    $this->sw->on('Task', array($this, 'onTask'));
    $this->sw->on('Finish', array($this, 'onFinish'));
    App::setSwooleInstance($this->sw);

    $this->masterPidFile = $this->tarsServerConfig['datapath'] . '/master.pid';
    $this->managerPidFile = $this->tarsServerConfig['datapath'] . '/manager.pid';

    $this->protocol = ProtocolFactory::getProtocol($this->protocolName);

    require_once $this->tarsServerConfig['entrance'];

    $this->sw->start();
}

public function stop()
{
}

public function restart()
{
}

public function reload()
{
}

public function onConnect($server, $fd, $fromId)
{
}

public function onFinish($server, $taskId, $data)
{
}

public function onClose($server, $fd, $fromId)
{
}

public function onWorkerStop($server, $workerId)
{
}

public function onTimer($server, $interval)
{
}

public function onMasterStart($server)
{
    $this->_setProcessName($this->application . '.'
        . $this->serverName . ': master process');
    file_put_contents($this->masterPidFile, $server->master_pid);
    file_put_contents($this->managerPidFile, $server->manager_pid);

    // 初始化的一次上报
    TarsPlatform::keepaliveInit($this->tarsConfig, $server->master_pid);

    //拉取配置
    if (!empty($this->servicesInfo) &&
        isset($this->servicesInfo['saveTarsConfigFileDir']) &&
        isset($this->servicesInfo['saveTarsConfigFileName'])) {
        TarsPlatform::loadTarsConfig($this->tarsConfig,
            $this->servicesInfo['saveTarsConfigFileDir'],
            $this->servicesInfo['saveTarsConfigFileName']);
    }

}

public function onManagerStart()
{
    // rename manager process
    $this->_setProcessName($this->application . '.' . $this->serverName . ': manager process');
}

public function onWorkerStart($server, $workerId)
{
    foreach ($this->adapters as $adapter) {
        $objName = $adapter['objName'];

        switch ($this->servicesInfo[$objName]['serverType']) {
            case 'tcp' :
            case 'udp' :
                $className = $this->servicesInfo[$objName]['home-class'];
                self::$impl[$objName] = new $className();
                $interface = new \ReflectionClass($this->servicesInfo[$objName]['home-api']);
                $methods = $interface->getMethods();

                foreach ($methods as $method) {
                    $docBlock = $method->getDocComment();
                    // 对于注释也应该有自己的定义和解析的方式
                    self::$paramInfos[$objName][$method->name] = $this->protocol->parseAnnotation($docBlock);
                }
                break;
            case 'websocket' :
                $this->namespaceName[$objName] = $this->servicesInfo[$objName]['namespaceName'];
                $this->executeClass[$objName] = $this->servicesInfo[$objName]['home-class'];
                break;
            default : //http
                $this->namespaceName[$objName] = $this->servicesInfo[$objName]['namespaceName'];
                break;
        }
    }

    if ($workerId == 0) {
        // 将定时上报的任务投递到task worker 0,只需要投递一次
        $this->sw->task(
            [
                'application' => $this->application,
                'serverName' => $this->serverName,
                'masterPid' => $server->master_pid,
                'adapters' => array_column($this->tarsServerConfig['adapters'], 'adapterName'),
                'client' => $this->tarsClientConfig
            ], 0);
    }

    // task worker
    if ($workerId >= $this->workerNum) {
        $this->_setProcessName($this->application . '.' . $this->serverName . ': task worker process');
    } else {
        $this->_setProcessName($this->application . '.' . $this->serverName . ': event worker process');

        // 定时timer执行逻辑
        if (isset($this->timers[$workerId])) {
            $runnable = $this->timers[$workerId];
            require_once $runnable;
            $className = $this->namespaceName[$this->timerObjName] . 'timer\\' . basename($runnable, '.php');

            $obj = new $className();
            if (method_exists($obj, 'execute')) {
                swoole_timer_tick($obj->interval, function () use ($workerId, $runnable, $obj) {
                    try {
                        $funcName = 'execute';
                        $obj->$funcName();
                    } catch (\Exception $e) {
                        App::getLogger()->error(__METHOD__ . " Error in runnable: $runnable, worker id: $workerId, e: " . print_r($e,
                                true));
                    }
                });
            }
        }
    }
}

public function onTask($server, $taskId, $fromId, $data)
{
    switch ($taskId) {
        // 进行定时上报
        case 0:
            {
                $serverName = $data['serverName'];
                $application = $data['application'];

                \swoole_timer_tick(10000, function () use ($data, $serverName, $application) {

                    //获取当前存活的worker数目
                    $processName = $application . '.' . $serverName;
                    $cmd = "ps wwaux | grep '" . $processName . "' | grep 'event worker process' | grep -v grep  | awk '{ print $2}'";
                    exec($cmd, $ret);
                    $workerNum = count($ret);

                    if ($workerNum >= 1) {
                        TarsPlatform::keepaliveReport($data);
                    } //worker全挂,不上报存活 等tars重启
                    else {
                        App::getLogger()->error(__METHOD__ . " All workers are not alive any more.");
                    }
                });

                //主调定时上报
                $locator = $data['client']['locator'];
                $socketMode = Consts::SWOOLE_SYNC_MODE;
                $statServantName = $data['client']['stat'];
                $reportInterval = $data['client']['report-interval'];

                \swoole_timer_tick($reportInterval,
                    function () use ($locator, $socketMode, $statServantName, $serverName, $reportInterval) {
                        try {
                            $statF = App::getStatF();
                            $statF->sendStat();
                        } catch (\Exception $e) {
                            App::getLogger()->error((string)$e);
                        }
                    });

                // 基础特性上报
                \swoole_timer_tick($reportInterval,
                    function () use ($locator, $application, $serverName) {
                        try {
                            TarsPlatform::basePropertyMonitor($serverName);
                        } catch (\Exception $exception) {
                            App::getLogger()->error((string)$exception);
                        }
                    });
                break;
            }
        default:
            break;
    }
}

// 这里应该找到对应的解码协议类型,执行解码,并在收到逻辑处理回复后,进行编码和发送数据
public function onReceive($server, $fd, $fromId, $data)
{
    $resp = new Response();
    $resp->fd = $fd;
    $resp->fromFd = $fromId;
    $resp->server = $server;

    // 处理管理端口的特殊逻辑
    $unpackResult = \TUPAPI::decodeReqPacket($data);
    $sServantName = $unpackResult['sServantName'];
    $sFuncName = $unpackResult['sFuncName'];

    $objName = explode('.', $sServantName)[2];

    if (!isset(self::$paramInfos[$objName]) || !isset(self::$impl[$objName])) {
        App::getLogger()->error(__METHOD__ . " objName $objName not found.");
        $resp->send('');
        //TODO 这里好像可以直接返回一个taf error code 提示obj 不存在的
        return;
    }

    $req = new Request();
    $req->reqBuf = $data;
    $req->paramInfos = self::$paramInfos[$objName];
    $req->impl = self::$impl[$objName];
    // 把全局对象带入到请求中,在多个worker之间共享
    $req->server = $this->sw;

    // 处理管理端口相关的逻辑
    if ($sServantName === 'AdminObj') {
        TarsPlatform::processAdmin($this->tarsConfig, $unpackResult, $sFuncName, $resp, $this->sw->master_pid);
    }

    $event = new Event();
    $event->setProtocol(ProtocolFactory::getProtocol($this->servicesInfo[$objName]['protocolName']));
    $event->setBasePath($this->tarsServerConfig['basepath']);
    $event->setTarsConfig($this->tarsConfig);

    // 预先对impl和paramInfos进行处理,这样可以速度更快
    $event->onReceive($req, $resp);
}

/**
 * @param $request
 * @param $response
 * 针对http请求的响应
 */
public function onRequest($request, $response)
{
    $req = new Request();
    $req->data = get_object_vars($request);
    if (isset($req->data['zcookie'])) {
        $req->data['cookie'] = $req->data['zcookie'];
        unset($req->data['zcookie']);
    }
    if (empty($req->data['post'])) {
        $req->data['post'] = $request->rawContent();
    }

    $port = $req->data['server']['server_port'];
    if (!isset($this->portObjNameMap[$port])) {
        App::getLogger()->error(__METHOD__ . " failed. obj name with port $port not found ");
        return;
    }
    $objName = $this->portObjNameMap[$port];

    $req->servType = $this->servicesInfo[$objName]['serverType'];
    $req->namespaceName = $this->namespaceName[$objName];

    $req->server = $this->sw;

    $resp = new Response();
    $resp->servType = $this->servicesInfo[$objName]['serverType'];
    $resp->resource = $response;

    $event = new Event();
    $event->setProtocol(ProtocolFactory::getProtocol($this->servicesInfo[$objName]['protocolName']));
    $event->onRequest($req, $resp);

}

/**
 * @param $server
 * @param $frame
 * 增加websocket的回调
 */
public function onMessage($server, $frame)
{
    $info = $server->connection_info($frame->fd);
    $port = $info['server_port'];
    if (!isset($this->portObjNameMap[$port])) {
        App::getLogger()->error(__METHOD__ . " failed. obj name with port $port not found ");
        return;
    }
    $objName = $this->portObjNameMap[$port];

    $className = $this->executeClass[$objName];

    $class = new $className();
    $fun = "onMessage";
    $class->$fun($server, $frame);
}

/**
 * @param $name
 * 设置启动的进程的名称
 */
private function _setProcessName($name)
{
    if (function_exists('cli_set_process_title')) {
        cli_set_process_title($name);
    } elseif (function_exists('swoole_set_process_name')) {
        swoole_set_process_name($name);
    } else {
        App::getLogger()->error(__METHOD__ . ' failed. require cli_set_process_title or swoole_set_process_name.');
    }
}

} `

kuoshijia commented 5 years ago

是不是说我只要在onWorkerStart里调用一下BeanFactory::reload()就可以了?

inhere commented 5 years ago

1.0 的吗?

kuoshijia commented 5 years ago

1.0 的吗?

对,1.10版本

inhere commented 5 years ago

%>_<% 1.0 的就有点复杂了,我也不太确定。 建议使用 2.0 的,在协程环境里 swoft 功能都可以使用。

kuoshijia commented 5 years ago

本来是想要使用2.0的,不过TarsPHP团队反映swoft2 删了定时任务注解 tars依赖这个做keepalive 目前进度暂停 恩,既然在协程环境里 swoft 功能都可以使用就好。我之前是因为不确定swoft连接池是什么时候可以开始使用,查了网上说是最早在OnWorkerStart时调用BeanFactory::Reload()时实例化。 然而试验了一下,在没有reload之前也是可以调用实体查询数据库的,只是注解不能使用

总之,感谢如此迅速的回复~

------------------ 原始邮件 ------------------ 发件人: "Inhere"notifications@github.com; 发送时间: 2019年7月10日(星期三) 下午4:32 收件人: "swoft-cloud/swoft"swoft@noreply.github.com; 抄送: "MwaS"emailjia@qq.com;"Comment"comment@noreply.github.com; 主题: Re: [swoft-cloud/swoft] 在命令行中开启自定义swoole服务器,会切换到异步连接池模式吗? (#795)

%>_<% 1.0 的就有点复杂了,我也不太确定。 建议使用 2.0 的,在协程环境里 swoft 功能都可以使用。

— You are receiving this because you commented. Reply to this email directly, view it on GitHub, or mute the thread.

stelin commented 5 years ago

@kuoshijia 建议用2.0,启动一个定时器就好了