swoole / swoole-src

🚀 Coroutine-based concurrency library for PHP
https://www.swoole.com
Apache License 2.0

[Question] After the removal of `\Swoole\Channel`, is there any shared-memory queue to use? #2527

Closed ihipop closed 5 years ago

ihipop commented 5 years ago

I know Swoole focuses on coroutines, but we still need processes to make use of all CPU cores.

Swoole used to provide \Swoole\Channel as a shared-memory queue, but it was removed in 4.3.0+.

I want to write a process pool.

\Swoole\Coroutine::create(function () use ($swooleChannel) {
    $socket = $this->exportSocket();
    while (true) {
        $swooleChannel->push(posix_getpid()); // Notify the master that I'm idle
        $this->handle($socket->recv());       // Wait for the master to send a task
    }
});

To let the master process know that a worker is idle, I used to push the worker's process ID onto \Swoole\Channel. The master would call $swooleChannel->pop() to learn which worker was idle and then write the task data to that worker's socket.

But now, \Swoole\Channel is removed.

twose commented 5 years ago

Create a socket yourself.

ihipop commented 5 years ago

@twose I know, but the master process doesn't know which child process is idle, and a task must be dispatched to an idle child. I tried using \Swoole\Event::add() to wait until the child process's socket becomes writable, but that failed.

<?php
$process = new \Swoole\Process(function (\Swoole\Process $workerProcess) {
    \Swoole\Coroutine::create(function () use ($workerProcess) {
        $socket = $workerProcess->exportSocket();
        while (true) {
            var_export($socket->recv());
        }
    });
}, false, SOCK_DGRAM, true);
$pid = $process->start();
var_dump(\Swoole\Event::add($process->exportSocket(), function () {}, function ($socket) {
    var_export($socket);
    \Swoole\Event::del($socket);
    var_export($socket->send("hello proc1\n"));
}));

ihipop commented 5 years ago

This is another version I've tried, but an SplQueue cannot be shared across processes.

<?php
$queue            = new SplQueue();
$processContainer = [];
for ($i = 0; $i < 2; $i++) {
    $process = new \Swoole\Process(function (\Swoole\Process $workerProcess) use ($queue) {
        \Swoole\Coroutine::create(function () use ($workerProcess, $queue) {
            while (true) {
                $socket = $workerProcess->exportSocket();
                $queue->enqueue($socket); // Tell the master that I'm idle (only changes this process's copy of $queue)
                echo date('Y-m-d H:i:s') . ' ' . posix_getpid() . ': ' . $socket->recv() . PHP_EOL;
                sleep(1);
            }
        });
    }, false, SOCK_DGRAM, true);
    $pid     = $process->start();

    $queue->enqueue($process->exportSocket());
    $processContainer[] = $process;
}

$totalSockets = $queue->count();
for ($i = 0; $i <= $totalSockets + 1; $i++) {
    $socket = $queue->dequeue();
    var_export($socket);
    $socket->send('xxxxxxxx_' . $i);
    sleep(5);
}
echo '$queue Empty' . PHP_EOL;
//$processContainer[0]->exportSocket()->send('xxxxxxxx_'.'999');

\Swoole\Process::wait();
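Why the SplQueue version cannot work can be shown without Swoole at all. The following is a minimal sketch, assuming ext-pcntl is available; the function name `parentCountAfterChildEnqueue()` is made up for illustration. A forked child receives its own copy of the parent's memory, so anything the child enqueues never becomes visible to the parent.

```php
<?php
// Sketch, assuming ext-pcntl; parentCountAfterChildEnqueue() is a hypothetical name.
function parentCountAfterChildEnqueue(): int
{
    $queue = new SplQueue();
    $queue->enqueue('from-parent');

    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork failed');
    }
    if ($pid === 0) {
        // Child: this enqueue only touches the child's private copy of $queue.
        $queue->enqueue('from-child');
        exit(0);
    }

    pcntl_waitpid($pid, $status); // wait until the child has exited
    return $queue->count();       // still 1 in the parent
}

echo 'parent sees ', parentCountAfterChildEnqueue(), " item(s)\n";
```

The same reasoning applies to the worker closures above: each worker mutates its own copy of `$queue`, so the master only ever sees the sockets it enqueued itself.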

ihipop commented 5 years ago

I previously wrote another version that works, but it is weird to use a subprocess as a queue, and it blocks.

<?php
//$queue            = new SplQueue();
$queue = (new \Swoole\Process(function () {
}));
$queue->useQueue(crc32('xxxxxxfg'),2);

$processContainer = [];
for ($i = 0; $i < 2; $i++) {
    $process = new \Swoole\Process(function (\Swoole\Process $workerProcess) use ($queue) {
        \Swoole\Coroutine::create(function () use ($workerProcess, $queue) {
            while (true) {
                $socket = $workerProcess->exportSocket();
                $pid    = posix_getpid();
                echo 'Idle:' . $pid . PHP_EOL;
                $queue->push($pid); // Tell the master that I'm idle
                echo date('Y-m-d H:i:s') . ' @' . $pid . ': ' . $socket->recv() . PHP_EOL;
                sleep(5);
            }
        });
        \Swoole\Coroutine::create(function (){
            while (true){
                echo posix_getpid() . ': Coroutine Alive Not Block By Sleep :)' . PHP_EOL;
                \co::sleep(1);
            }
        });
        \Swoole\Timer::tick(1*1000,function (){
            echo posix_getpid() . ': Coroutine Alive Not Block  By Time Ticker :))' . PHP_EOL;
        });
    }, false, SOCK_DGRAM, true);
    $pid     = $process->start();

    $queue->push($pid);
    $processContainer[$pid] = $process;
}
//var_export($processContainer);
for ($i = 0; $i <= PHP_INT_MAX; $i++) {
    $pid = $queue->pop(); // Blocks the master until a worker reports idle
    if (isset($processContainer[$pid])) {
        $socket = $processContainer[$pid]->exportSocket();
        $socket->send('xxxxxdddddddxxx_' . $i);
    } else {
        echo 'SKIP: ' . $pid . PHP_EOL;
    }
}
echo 'Done' . PHP_EOL;
//$processContainer[0]->exportSocket()->send('xxxxxxxx_'.'999');

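\Swoole\Process::signal(SIGCHLD, function ($sig) {
    // TODO: remove this handler when quitting
    // The argument must be false (non-blocking mode)
    while ($ret = \Swoole\Process::wait(false)) {
        var_export($ret);
        echo "PID={$ret['pid']}\n";
        // TODO: restart the exited worker here. The original called
        // $this->rebootProcess($ret), but $this is not available in a top-level script.
    }
});

ihipop commented 5 years ago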

Finally, after a dream, I found the right way: fully coroutine-safe, with no blocking in the communication between processes.

<?php
Swoole\Runtime::enableCoroutine();
$processContainer = [];
for ($i = 0; $i < 2; $i++) {
    $process                = new \Swoole\Process(function (\Swoole\Process $workerProcess) {
        \Swoole\Coroutine::create(function () use ($workerProcess) {
            while (true) {
                $socket = $workerProcess->exportSocket();
                $pid    = posix_getpid();
                echo 'Idle:' . $pid . PHP_EOL;
                $socket->send(serialize(['type' => 'idle'])); // Tell the master that I'm idle
                $receive     = $socket->recv();
                $receivedLen = strlen($receive);
                $task        = unserialize($receive);
                printf('%s @%s: Received %s,Type %s' . PHP_EOL, date('Y-m-d H:i:s'), $pid, $receivedLen, gettype($task));
                \co::sleep(5);
                $socket->send(serialize(['type' => 'return', 'payload' => $receive])); // Return the result to the master
            }
        });
        \Swoole\Coroutine::create(function () {
            while (true){
                echo posix_getpid() . ': Coroutine Alive Not Block By Sleep :)' . PHP_EOL;
                \co::sleep(1);
            }
        });
        \Swoole\Timer::tick(1*1000,function (){
                echo posix_getpid() . ': Coroutine Alive Not Block  By Time Ticker :))' . PHP_EOL;
        });
    }, false, SOCK_DGRAM, true);
    $pid                    = $process->start();
    $processContainer[$pid] = $process;
}
foreach ($processContainer as $pid => $process) {
    Swoole\Coroutine::create(function () use ($process) {
        $socket = $process->exportSocket();
        while (true) {
            $retOrgi = $socket->recv();
            $ret     = unserialize($retOrgi);
            // `??` binds more loosely than `===`, so resolve the type first, then compare.
            $type    = is_array($ret) ? ($ret['type'] ?? '') : '';
            if ('idle' === $type) {
                $t = str_repeat('t', 65535 - 64 - 1);
                echo '-------' . strlen($t) . '-------' . PHP_EOL;
                $send = $socket->send(serialize($t)); // MAX is 65481 bytes?
                echo 'SENT: ' . $send . PHP_EOL;
            } elseif ('return' === $type) {
                // echo 'Got Return:' . var_export($ret['payload'], true) . PHP_EOL;
                echo 'Got Return' . PHP_EOL;
            } else {
                echo 'illegal data:' . PHP_EOL;
            }
        }
    });
}

echo 'Done' . PHP_EOL;
//$processContainer[0]->exportSocket()->send('xxxxxxxx_'.'999');

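\Swoole\Process::signal(SIGCHLD, function ($sig) {
    // TODO: remove this handler when quitting
    // The argument must be false (non-blocking mode)
    while ($ret = \Swoole\Process::wait(false)) {
        var_export($ret);
        echo "PID={$ret['pid']}\n";
        // TODO: restart the exited worker here. The original called
        // $this->rebootProcess($ret), but $this is not available in a top-level script.
    }
});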

https://gist.github.com/ihipop/7c20254b6700d85f64ed9dc5ec1a22fa
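
For readers who want to see the idle / task / return message flow on its own, here is a rough sketch in plain PHP. It is not the Swoole API and not the code from the gist: it assumes ext-pcntl, uses blocking stream sockets with newline-delimited JSON instead of SOCK_DGRAM and serialize(), and the names `spawnWorker()` and `runTasks()` are hypothetical. Each worker announces that it is idle over its own socket, and the master uses stream_select() to hand the next task to whichever worker reports idle first.

```php
<?php
// Sketch only: plain PHP with ext-pcntl, blocking I/O, newline-delimited JSON.
// spawnWorker() and runTasks() are hypothetical names, not part of Swoole.

/** Fork one worker; returns [pid, master-side socket]. */
function spawnWorker(array $inherited): array
{
    $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    $pid  = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork failed');
    }
    if ($pid === 0) {
        // Child: drop the master-side ends it inherited and keep only its own end.
        foreach ($inherited as $s) {
            fclose($s);
        }
        fclose($pair[0]);
        $sock = $pair[1];
        fwrite($sock, json_encode(['type' => 'idle']) . "\n");
        while (($line = fgets($sock)) !== false) {
            $task   = json_decode($line, true);
            $result = strtoupper($task['payload']); // stand-in for real work
            fwrite($sock, json_encode(['type' => 'return', 'payload' => $result]) . "\n");
            fwrite($sock, json_encode(['type' => 'idle']) . "\n");
        }
        exit(0); // the master closed its end
    }
    fclose($pair[1]);
    return [$pid, $pair[0]];
}

/** Hand each task to whichever worker reports idle first; returns sorted results. */
function runTasks(array $tasks, int $workerCount = 2): array
{
    $socks = [];
    for ($i = 0; $i < $workerCount; $i++) {
        [$pid, $sock] = spawnWorker($socks);
        $socks[$pid]  = $sock;
    }
    $expected = count($tasks);
    $results  = [];
    while (count($results) < $expected) {
        $read  = array_values($socks);
        $write = $except = null;
        if (stream_select($read, $write, $except, null) === false) {
            throw new RuntimeException('stream_select failed');
        }
        foreach ($read as $sock) {
            $line = fgets($sock);
            if ($line === false) {
                throw new RuntimeException('a worker closed its socket unexpectedly');
            }
            $msg  = json_decode($line, true);
            $type = $msg['type'] ?? '';
            if ($type === 'return') {
                $results[] = $msg['payload'];
            } elseif ($type === 'idle' && $tasks !== []) {
                fwrite($sock, json_encode(['payload' => array_shift($tasks)]) . "\n");
            }
        }
    }
    foreach ($socks as $sock) {
        fclose($sock); // workers see EOF and exit
    }
    foreach (array_keys($socks) as $pid) {
        pcntl_waitpid($pid, $status);
    }
    sort($results);
    return $results;
}

print_r(runTasks(['a', 'b', 'c']));
```

The point of the design is that no shared queue is needed: the per-worker socket carries both the idle notification and the task, so the master always knows which worker to write to. In the Swoole version above, the coroutine-per-worker loop in the master plays the role that stream_select() plays here.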