walkor / workerman

An asynchronous event driven PHP socket framework. Supports HTTP, Websocket, SSL and other custom protocols.
http://www.workerman.net
MIT License
11.03k stars 2.25k forks source link

change worker after onMessage event #1001

Closed xenion54 closed 5 months ago

xenion54 commented 5 months ago

@walkor Hi, is there a way to change worker process for connection? As i understand, now connection linked to one of workers and its done on connection established. I need change worker on the fly, is it technicaly possible? i will realy apprechiate you for answer.

walkor commented 5 months ago

In Linux systems, it is possible to pass descriptors between processes. In Workerman, the code looks something like this.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Connection\TcpConnection;
use Workerman\Events\EventInterface;
use Workerman\Worker;

$worker = new Worker('websocket://0.0.0.0:1234');
$count = 4;
$worker->count = $count;
$fd_array = [];
for ($i = 0; $i < $count; $i++) {
    socket_create_pair(AF_UNIX, SOCK_DGRAM, 0, $fds);
    $fd_array[] = $fds;
}

$worker->onWorkerStart = function ($worker) use ($fd_array) {
    $id = $worker->id;
    $stream = socket_export_stream($fd_array[$id][0]);
    Worker::$globalEvent->add($stream, EventInterface::EV_READ, function () use ($stream, $worker) {
        echo "Receive connection, current_pid=" . posix_getpid() . " current_worker->id=$worker->id\n";
        $socket = socket_import_stream($stream);
        $data = [
            'controllen' => socket_cmsg_space(SOL_SOCKET, SCM_RIGHTS, 1)
        ];
        socket_recvmsg($socket, $data, 0);
        $socket = socket_export_stream($data['control'][0]['data'][0]);
        createConnectionForWorker($worker, $socket);
    });
};

$worker->onConnect = function (TcpConnection $connection) use ($fd_array, $worker, $count){
    $id = $worker->id;
    $toId = ($id + 1) % $count;
    echo "Send connection From pid=" . posix_getpid() . " worker->id=$id To worker->id=$toId\n";
    socket_sendmsg($fd_array[$toId][1], [
        'control' => [[
            'level' => SOL_SOCKET,
            'type'  => SCM_RIGHTS,
            'data'  => [$connection->getSocket()]
        ]]
    ], 0);
    $connection->close();
};

$worker->onMessage = function (TcpConnection $connection, $request) use ($worker) {
    echo "onMessage current_pid=" . posix_getpid() . " current_worker->id=$worker->id\n";
    $connection->send('ok');
};

function createConnectionForWorker($worker, $socket)
{
    $connection = new TcpConnection($socket);
    $worker->connections[$connection->id] = $connection;
    $connection->worker = $worker;
    $connection->protocol = $worker->protocol;
    $connection->transport = $worker->transport;
    $connection->onMessage = $worker->onMessage;
    $connection->onClose = $worker->onClose;
    $connection->onError = $worker->onError;
}

Worker::runAll();
xenion54 commented 5 months ago

Hi @walkor. Thank you so much. It looks hard, but its rly work. I m doing selfmade proxy, and when 2 connections with needed identifiers are connected, i need to put to one process, cause i think that sending message between each other will be faster in same process, than use Channel Class evry time when message received. If i wrong way, pls say what you think about it. Anyway, again, thank so much for answer and for nice library.

In Linux systems, it is possible to pass descriptors between processes. In Workerman, the code looks something like this.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Connection\TcpConnection;
use Workerman\Events\EventInterface;
use Workerman\Worker;

$worker = new Worker('websocket://0.0.0.0:1234');
$count = 4;
$worker->count = $count;
$fd_array = [];
for ($i = 0; $i < $count; $i++) {
    socket_create_pair(AF_UNIX, SOCK_DGRAM, 0, $fds);
    $fd_array[] = $fds;
}

$worker->onWorkerStart = function ($worker) use ($fd_array) {
    $id = $worker->id;
    $stream = socket_export_stream($fd_array[$id][0]);
    Worker::$globalEvent->add($stream, EventInterface::EV_READ, function () use ($stream, $worker) {
        echo "Receive connection, current_pid=" . posix_getpid() . " current_worker->id=$worker->id\n";
        $socket = socket_import_stream($stream);
        $data = [
            'controllen' => socket_cmsg_space(SOL_SOCKET, SCM_RIGHTS, 1)
        ];
        socket_recvmsg($socket, $data, 0);
        $socket = socket_export_stream($data['control'][0]['data'][0]);
        createConnectionForWorker($worker, $socket);
    });
};

$worker->onConnect = function (TcpConnection $connection) use ($fd_array, $worker, $count){
    $id = $worker->id;
    $toId = ($id + 1) % $count;
    echo "Send connection From pid=" . posix_getpid() . " worker->id=$id To worker->id=$toId\n";
    socket_sendmsg($fd_array[$toId][1], [
        'control' => [[
            'level' => SOL_SOCKET,
            'type'  => SCM_RIGHTS,
            'data'  => [$connection->getSocket()]
        ]]
    ], 0);
    $connection->close();
};

$worker->onMessage = function (TcpConnection $connection, $request) use ($worker) {
    echo "onMessage current_pid=" . posix_getpid() . " current_worker->id=$worker->id\n";
    $connection->send('ok');
};

function createConnectionForWorker($worker, $socket)
{
    $connection = new TcpConnection($socket);
    $worker->connections[$connection->id] = $connection;
    $connection->worker = $worker;
    $connection->protocol = $worker->protocol;
    $connection->transport = $worker->transport;
    $connection->onMessage = $worker->onMessage;
    $connection->onClose = $worker->onClose;
    $connection->onError = $worker->onError;
}

Worker::runAll();

Hi @walkor. Thank you so much. It looks hard, but its rly work. I m doing selfmade proxy, and when 2 connections with needed identifiers are connected, i need to put to one process, cause i think that sending message between each other will be faster in same process, than use Channel Class evry time when message received. If i wrong way, pls say what you think about it. Anyway, again, thank so much for answer and for nice library.

walkor commented 5 months ago

i think that sending message between each other will be faster in same process, than use Channel Class evry time when message received

Yes