arthurkushman / php-wss

Web-socket server/client with multi-process and parse templates support on server and send/receive options on client
MIT License

Push notifications to client #15

Closed (by clarkk, 5 years ago)

clarkk commented 6 years ago

How can the server push notifications to the client without the client sending a message to the server first?

At some point something happens on the server, and the server needs to push a notification to the client.

Where can I put logic that checks for updates in the database?

It doesn't seem to be possible. As I see it, the server can only reply to messages from the client...?

clarkk commented 6 years ago

And is there a way to set a sleep/delay on each iteration of the loop where you put that logic, so it won't check the database 10 times per second? :)

arthurkushman commented 6 years ago

The best place to put logic that sends to the client before the client has sent any message is the onOpen method of your back-end ServerHandler implementation (the client must connect to the WebSocket server anyway). As for a sleep/delay, you can implement that throttling in your application, e.g. by storing a timestamp in a variable/property and checking whether the interval has elapsed.
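
A minimal sketch of the timestamp check described above. The class name IntervalGate and its methods are hypothetical (they are not part of php-wss); the idea is only to let a periodic check run at most once per interval without calling sleep() inside the event loop.

```php
<?php
// Hypothetical helper, not part of php-wss: answers "has the interval elapsed?"
final class IntervalGate
{
    private float $intervalSeconds;
    private float $lastRun;

    // $now can be injected for testing; it defaults to the current time.
    public function __construct(float $intervalSeconds, ?float $now = null)
    {
        $this->intervalSeconds = $intervalSeconds;
        $this->lastRun = $now ?? microtime(true);
    }

    // Returns true (and restarts the interval) once enough time has passed.
    public function isDue(?float $now = null): bool
    {
        $now = $now ?? microtime(true);
        if ($now - $this->lastRun < $this->intervalSeconds) {
            return false;
        }
        $this->lastRun = $now;
        return true;
    }
}

// Usage: allow the database check at most once every 5 seconds.
$gate = new IntervalGate(5.0);
if ($gate->isDue()) {
    // query the database and send() to the stored connections here
}
```

The gate only answers the question "is it time yet?", so it can be called from any handler method that runs often enough.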

arthurkushman commented 6 years ago

Don't forget about ping/pong if you need it.

clarkk commented 6 years ago

Another problem is accessing all clients. If you set MAX_CLIENTS_REMAINDER_FORK to a very low number and connect a lot of clients, you can't access $clients properly. You will only have access to the clients that were connected at the point of forking a new process.

At the point where you fork a new process, all variables and objects are copied. So you can no longer access the child's clients from the parent, and vice versa.

clarkk commented 6 years ago

After the point of forking, you will not be able to access or see new clients/connections across processes.
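
A small sketch that illustrates the copy-on-fork behaviour described above. It assumes the pcntl extension and a POSIX system; the function name demoForkCopy and the client labels are made up for this example.

```php
<?php
// Sketch: a child process changes only its own copy of $clients.
// Returns null when pcntl is unavailable or the fork fails.
function demoForkCopy(): ?array
{
    if (!function_exists('pcntl_fork')) {
        return null;
    }

    $clients = ['client-1']; // state that exists before the fork
    $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
    if ($pair === false) {
        return null;
    }

    $pid = pcntl_fork();
    if ($pid === -1) {
        return null; // fork failed
    }

    if ($pid === 0) {
        // Child: "accept" a new client, report its own count, then exit.
        fclose($pair[0]);
        $clients[] = 'client-2';
        fwrite($pair[1], (string) count($clients));
        fclose($pair[1]);
        exit(0);
    }

    // Parent: read the child's count; the parent's own array is unchanged.
    fclose($pair[1]);
    $childCount = (int) stream_get_contents($pair[0]);
    fclose($pair[0]);
    pcntl_waitpid($pid, $status);

    return ['parent' => count($clients), 'child' => $childCount];
}

$result = demoForkCopy();
if ($result !== null) {
    echo "parent sees {$result['parent']} client(s), child saw {$result['child']}\n";
}
```

The only way the parent learns about the child's state here is the explicit socket pair, which is the kind of channel (or shared memory, or an external store) that any cross-process client list would need.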

clarkk commented 6 years ago

https://www.ibm.com/developerworks/library/os-php-shared-memory/index.html

might be a solution

clarkk commented 6 years ago

This issue is similar to the one you just fixed today :)

But now the problem is that the object/variable is copied at each fork.

arthurkushman commented 6 years ago

Yes, the shmop functions are a good way to use shared memory.

arthurkushman commented 6 years ago

To let developers use sessions over socket client connections, I've made it possible to pass a clients_per_fork_limit option to the WebSocketServer class, e.g.:

        $websocketServer = new WebSocketServer(new ServerHandler(), [
            'host'                   => '0.0.0.0',
            'port'                   => 8000,
            'clients_per_fork_limit' => 10500,
        ]);

Thus, you don't need to worry about recently created connections ending up in different processes. In the future it would be better to add shmop functionality so that connections can be exchanged through shared memory, but for now this should be sufficient.

clarkk commented 6 years ago

And about the client push loop: wouldn't something like this be useful?

    public function run()
    {
        $errno = null;
        $errorMessage = '';

        $server = stream_socket_server("tcp://{$this->config['host']}:{$this->config['port']}", $errno, $errorMessage);
        if ($server === false) {
            die('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL);
        }

        $pid = pcntl_fork();
        if ($pid === -1) { // fork failed, no child was created
            die('Could not fork push loop process' . PHP_EOL);
        }

        @cli_set_process_title(self::PROC_TITLE);
        if ($pid > 0) { // run eventLoop in parent
            $this->eventLoop($server);
        } else { // run push_loop in child
            $this->handler->push_loop();
        }
    }

And then have your push_loop in the ServerHandler?

clarkk commented 6 years ago

Right now I use a Redis database server to handle all connections. It's a lot faster and simpler than shmop.

https://redis.io/commands

arthurkushman commented 6 years ago

That's good (I've been using Redis for 6 years now). While Redis is much more versatile in its options/functions, shmop is also written in C and ships with PHP as a bundled extension with its own API functions, and there were several tests where the shmop/msg functions outran Redis in speed.

arthurkushman commented 6 years ago

Letting users send pushes from a loop like the run()/push_loop sketch you proposed above is definitely a good idea.

I've read some articles about people using this feature in production, but as far as I have noticed it is a rare, high-load feature, so concrete examples would be perfect.

Also, as a technical spec: what parameters should be passed to the pushLoop method? ConnectionContract, as in the other methods? Would that be sufficient? Why is the child process the best place to run it: because it is lightweight? Will there be issues with killing this loop, e.g. with SIGKILL?

clarkk commented 6 years ago

Having messed around with websockets for the last couple of days, I think pcntl_fork() should NEVER be used. It just copies all variables at the point of the fork, and you will never have access to socket connections and other variables across the forked processes. You need to run EVERYTHING in the same process :)

If you want to make a high-performance websocket server, PHP is not the way to go. Parallel processing is not possible because you cannot share any resources (socket connections) across processes forked with pcntl_fork().

If you want a high-performance websocket server with multiprocessing, Go is the way to go. But that would also complicate things when your whole code base is written in PHP :)

But PHP has gained a lot of performance enhancements in the last couple of years, especially in PHP 7.3.

So the way I have implemented it is something like this:

In WebSocketServer.php

    private function looping($server)
    {
        while (true) {
            ...

            //new client
            if (in_array($server, $readSocks, false)) {
                $this->acceptNewClient($server, $readSocks);
            }

            //message from existing client
            $this->messagesWorker($readSocks);

            //push iteration
            $this->handler->push_iteration();
        }
    }
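
A self-contained sketch of the same single-process idea as the loop above, using only core stream functions. The function name runLoop and its parameters are illustrative, not php-wss API; the point is that the stream_select() timeout decides how often the push callback gets a turn when no client is sending anything.

```php
<?php
// Sketch: one process accepts clients, drains their input and runs a
// push callback on every pass. $timeoutMicro bounds each wait.
function runLoop($server, callable $onPush, int $timeoutMicro, int $maxIterations): int
{
    $clients = [];
    $accepted = 0;

    for ($i = 0; $i < $maxIterations; $i++) {
        $read = array_merge([$server], $clients);
        $write = null;
        $except = null;

        // Number of ready streams, 0 on timeout, false on error.
        if (stream_select($read, $write, $except, 0, $timeoutMicro) === false) {
            break;
        }

        foreach ($read as $stream) {
            if ($stream === $server) { // new client
                $conn = stream_socket_accept($server, 0);
                if ($conn !== false) {
                    $clients[] = $conn;
                    $accepted++;
                }
                continue;
            }
            $data = fread($stream, 8192); // message from an existing client
            if ($data === '' || $data === false) { // peer closed the connection
                fclose($stream);
                unset($clients[array_search($stream, $clients, true)]);
            }
        }

        // Runs on every pass, whether or not a client sent anything.
        $onPush($clients);
    }

    foreach ($clients as $conn) {
        fclose($conn);
    }
    return $accepted;
}

// Usage: bind to a free local port and run three short iterations.
$server = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
$pushes = 0;
runLoop($server, function (array $clients) use (&$pushes) { $pushes++; }, 200000, 3);
echo "push callback ran {$pushes} times\n";
```

Because everything runs in one process, the callback always sees the complete client list, which is the property the forked variant loses.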

And in ServerHandler (this part is not finished yet; error handling etc. is still missing)

class ServerHandler extends WebSocket {
    private $redis;
    private $hash_socket    = 'websocket:socket:';
    private $set_sockets    = 'websocket:socket:set';

    private $connections    = [];

    const REDIS_ENTRY_EXPIRE = 120;

    const PUSH_WAKEUP_TIMEOUT       = 20;
    const PUSH_ITERATION_TIMEOUT    = 5;

    const VERBOSE = true;

    private $time;
    private $is_wakeup      = true;

    public function __construct(){
        $this->time = time();

        try{
            $this->redis = new Redis;
            if(!$this->redis->connect('127.0.0.1')){
                echo "Failed!\n";
            }

            $this->redis->del($this->set_sockets);
        }
        catch(RedisException $e){
            echo $e->getMessage()."\n";
        }
    }

    public function onOpen(ConnectionContract $conn){
        $socket_id      = $conn->getUniqueSocketId();

        $this->set_socket($socket_id, [
            'pid'   => getmypid()
        ]);

        $this->connections[$this->hash_socket.$socket_id] = $conn;

        if(self::VERBOSE){
            echo 'Connection opened, total clients: '.$this->redis->sCard($this->set_sockets).'('.$socket_id.')'."\n";
        }
    }

    public function onMessage(ConnectionContract $conn, $msg){
        $socket_id      = $conn->getUniqueSocketId();
        $json           = json_decode($msg, true);

        //  Disconnect if PHPSESSID is not set
        if(empty($json['sid'])){
            $conn->close();

            return;
        }
        else{
            $this->get_session($socket_id, $json['sid']);
        }

        $this->set_socket($socket_id, [
            'pid'               => $this->get_socket_pid($socket_id),
            'sid'               => $json['sid'] ?? '',
            'cid'               => $_SESSION['cid'] ?? '',
            'bid'               => $_SESSION['bid'] ?? '',
            'uid'               => $_SESSION['uid'] ?? '',
            'time_user_offset'  => $_SESSION['time_user_offset'] ?? 0,
            'notifications'     => empty($json['notifications']) ? '{}' : json_encode($json['notifications']),
            'token'             => $json['token'] ?? ''
        ]);

        if(self::VERBOSE){
            echo 'Received message ('.$socket_id.'):  '.$msg."\n";
        }

        /*$keys = $this->redis->hGetAll($this->hash_socket.$socket_id);
        echo "\tpid:\t".$keys['pid']."\n";
        echo "\tsid:\t".$keys['sid']."\n";
        echo "\tcid:\t".$keys['cid']."\n";
        echo "\tbid:\t".$keys['bid']."\n";
        echo "\ttoken:\t".$keys['token']."\n";*/

        if(self::VERBOSE){
            echo "\n";
        }
    }

    public function onClose(ConnectionContract $conn){
        $socket_id      = $conn->getUniqueSocketId();

        if(self::VERBOSE){
            echo 'Connection close ('.$socket_id.')'."\n\n";
        }

        $this->unset_socket($socket_id);
        $conn->close();
        unset($this->connections[$this->hash_socket.$socket_id]);
    }

    public function onError(ConnectionContract $conn, WebSocketException $ex){
        if(self::VERBOSE){
            echo 'Error occurred: ' . $ex->printStack();
        }
    }

    public function onPing(ConnectionContract $conn, $msg){

    }

    public function onPong(ConnectionContract $conn, $msg){

    }

    public function push_iteration(){
        $send = [];

        $time_elapsed = time() - $this->time;

        //  Wait until sockets have reconnected
        if($this->is_wakeup){
            if($time_elapsed < self::PUSH_WAKEUP_TIMEOUT){
                return;
            }

            $this->is_wakeup = false;
        }
        elseif($time_elapsed < self::PUSH_ITERATION_TIMEOUT){
            return;
        }

        $this->time = time();

        if(self::VERBOSE){
            echo "Push loop: ".date('m-d-Y H:i:s', $this->time)."\n";
        }

        $datapool_push      = $this->get_datapool_push();
        $datapool_touched   = [];

        $sockets            = $this->get_sockets($datapool_push);

        foreach($datapool_push as $block_id => $entries){
            foreach($entries as $entry){
                $datapool_touched[$entry['id']] = true;

                foreach($sockets as $socket_key => $socket){
                    if($socket['datapool'] && $socket['bid'] == $entry['block_id'] && $socket['token'] != $entry['ws_token']){
                        $sockets[$socket_key]['send_datapool'][] = [
                            'tbl'           => $entry['tbl'],
                            'tbl_id'        => $entry['tbl_id'],
                            'is_deleted'    => $entry['is_deleted']
                        ];
                    }
                }
            }
        }

        require_once Ini::get('path/class').'/Datapool.php';

        foreach($sockets as $socket_key => $socket){
            Env::update_time($socket['time_user_offset']);

            Env::set('cid', $socket['cid']);
            Env::set('bid', $socket['bid']);
            Env::set('uid', $socket['uid']);

            $send[$socket_key] = [];

            if($socket['send_datapool']){
                $put_tables = [];

                foreach($socket['send_datapool'] as $entry){
                    if($entry['is_deleted']){
                        if(isset($send[$socket_key]['pool']['del'][$entry['tbl']])){
                            $send[$socket_key]['pool']['del'][$entry['tbl']][] = $entry['tbl_id'];
                        }
                        else{
                            $send[$socket_key]['pool']['del'][$entry['tbl']] = [$entry['tbl_id']];
                        }
                    }
                    else{
                        if(isset($put_tables[$entry['tbl']])){
                            $put_tables[$entry['tbl']][] = $entry['tbl_id'];
                        }
                        else{
                            $put_tables[$entry['tbl']] = [$entry['tbl_id']];
                        }
                    }
                }

                foreach($put_tables as $table => $ids){
                    foreach((new Datapool)->get($table, $ids) as $id => $data){
                        $send[$socket_key]['pool']['put'][$table][$id] = $data;
                    }
                }
            }

            if($socket['notifications']){
                require_once Ini::get('path/class').'/Notification.php';
                if($notifications = (new Notification)->get($socket['notifications'])){
                    $send[$socket_key]['notifications'] = $notifications;

                    $this->redis->hSet($socket_key, 'notifications', json_encode($notifications));
                }
            }
        }

        $this->purge_datapool($datapool_touched);

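        foreach($send as $socket_id => $socket_send){
            if($socket_send){
                if(self::VERBOSE){
                    echo "Push: $socket_id\n";
                    //print_r($socket_send);
                    //  'notifications' is only set when there is something to notify
                    if(!empty($socket_send['notifications'])){
                        echo "\tnotifications:\n";
                        foreach($socket_send['notifications'] as $k => $v){
                            echo "\t\t$k = $v\n";
                        }
                    }
                }

                //  Skip sockets that are still listed in Redis but no longer connected here
                $connection_key = $this->hash_socket.$socket_id;
                if(isset($this->connections[$connection_key])){
                    $this->connections[$connection_key]->send(json_encode($socket_send));
                }
            }
        }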

        $this->purge_closed_connections();

        if(self::VERBOSE){
            echo "\n";
        }
    }

    private function purge_closed_connections(){
        $sockets = $this->redis->sMembers($this->set_sockets);

        //  Purge set
        foreach($sockets as $key => $socket_key){
            if(!$this->redis->exists($socket_key)){
                $this->redis->sRem($this->set_sockets, $socket_key);
                unset($sockets[$key]);
            }
        }

        $diff = array_diff(array_keys($this->connections), $sockets);

        //  Purge connections
        foreach($diff as $socket_key){
            unset($this->connections[$socket_key]);
        }
    }

    private function purge_datapool(Array $datapool_touched){
        foreach(array_keys($datapool_touched) as $datapool_id){
            try{
                (new Data_delete)->exec('datapool_push', $datapool_id);
            }
            catch(User_error $e){}
        }
    }

    private function get_sockets(Array $datapool_push): Array{
        if(!$sockets = $this->redis->sMembers($this->set_sockets)){
            return [];
        }

        $list = [];
        foreach($sockets as $socket_key){
            $socket                     = $this->redis->hMGet($socket_key, ['cid','bid','uid','time_user_offset','notifications','token']);
            $socket['notifications']    = strlen($socket['notifications']) > 2 ? json_decode($socket['notifications'], true) : [];
            $socket['datapool']         = isset($datapool_push[$socket['bid']]);
            $socket['send_datapool']    = [];

            $list[(int)substr($socket_key, strrpos($socket_key, ':') + 1)] = $socket;
        }

        return $list;
    }

    private function get_datapool_push(): Array{
        $list = [];

        $Data = new Data_get;
        $Data->access_level(Data::LEVEL_SYSTEM);
        $result = $Data->exec('datapool_push', [
            'select' => [
                'id',
                'block_id',
                'ws_token',
                'tbl',
                'tbl_id',
                'is_deleted'
            ],
            'order' => [
                'time'
            ]
        ]);
        while($row = $result->fetch()){
            if(!isset($list[$row['block_id']])){
                $list[$row['block_id']] = [];
            }

            $list[$row['block_id']][] = $row;
        }

        return $list;
    }

    private function get_session(int $socket_id, string $session_id){
        session_decode($this->redis->get('PHPREDIS_SESSION:'.$session_id));
    }

    private function set_socket(int $socket_id, Array $values){
        $socket_key = $this->hash_socket.$socket_id;
        $this->redis->hMSet($socket_key, $values);
        $this->redis->expire($socket_key, self::REDIS_ENTRY_EXPIRE);

        $this->redis->sAdd($this->set_sockets, $socket_key);
    }

    private function get_socket_pid(int $socket_id): string{
        return $this->redis->hExists($this->hash_socket.$socket_id, 'pid') ? $this->redis->hGet($this->hash_socket.$socket_id, 'pid') : (string)getmypid();
    }

    private function get_socket_value(int $socket_id, string $key): string{
        return $this->redis->hGet($this->hash_socket.$socket_id, $key);
    }

    private function unset_socket(int $socket_id){
        $socket_key = $this->hash_socket.$socket_id;
        $this->redis->del($socket_key);
        $this->redis->sRem($this->set_sockets, $socket_key);
    }
}

clarkk commented 6 years ago

If you really want to use multi-processing with a websocket server written in PHP, you need to set up more than one PHP instance.

One instance runs and handles all connections, and one handles all the logic. The instances then communicate via a socket connection (now it's already complicated): the instance handling the logic fetches all current connections, computes the data to push, and sends back the data that needs to be pushed to each connection/client :)
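
A rough sketch of what the link between the two instances could look like, simulated here inside one process with stream_socket_pair(). The newline-delimited JSON format, the socket_id/payload field names and both function names are assumptions made for this example, not something php-wss defines.

```php
<?php
// Logic side: queue one push for one client as a single JSON line.
function sendPush($link, int $socketId, array $payload): void
{
    fwrite($link, json_encode(['socket_id' => $socketId, 'payload' => $payload]) . "\n");
}

// Connection side: read the queued lines and hand each one to the matching
// sender. $senders maps socket id => callable(string $json), standing in
// for $conn->send() on the real connection objects.
// $link is expected to be non-blocking so fgets() returns false when idle.
function dispatchPushes($link, array $senders): int
{
    $delivered = 0;
    while (($line = fgets($link)) !== false) {
        $msg = json_decode($line, true);
        $id = $msg['socket_id'] ?? null;
        if (is_int($id) && isset($senders[$id])) {
            $senders[$id](json_encode($msg['payload'] ?? null));
            $delivered++;
        }
    }
    return $delivered;
}

// Usage: both ends live in this process only to keep the demo runnable.
[$logicEnd, $connEnd] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
stream_set_blocking($connEnd, false);

$sent = [];
$senders = [7 => function (string $json) use (&$sent) { $sent[7][] = $json; }];

sendPush($logicEnd, 7, ['notifications' => ['inbox' => 2]]);
sendPush($logicEnd, 8, ['notifications' => ['inbox' => 1]]); // unknown client, skipped
$delivered = dispatchPushes($connEnd, $senders);
echo "delivered {$delivered} push(es)\n";
```

In a real setup the two ends would be separate PHP processes connected over a local TCP or Unix socket, and the connection instance would call dispatchPushes() from its main loop.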

arthurkushman commented 6 years ago

I've already used Node.js (socket.io) and Go (gorilla/websocket), but in real life there are cases where you need to get up and running with PHP, e.g. when LNMP is the only supported stack.

clarkk commented 6 years ago

I have no earlier experience with websockets, so I can't comment on that.

I have worked with both Node.js and Go, and IMO Node.js is faster for putting some code together, but it is not suitable for big projects.

Go has goroutines/channels and other nice stuff, but it takes more time to do the same thing. It's more like coding in C/C++.

clarkk commented 6 years ago

The best way to handle the PHP instances is via cron jobs. You can set up a cron job to execute every minute, so if the websocket server hits the stream selection timeout it will be restarted shortly after. The same applies if the server hits a fatal error.

To do this I would prefer to keep the process name as php rather than php-wss :) So I would like an option to turn off the process renaming feature :)

Then you can monitor all cron jobs in the same manner instead of having different methods to keep track of each cron job.
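
One thing a once-a-minute cron job needs is a guard so that a second copy does not start while the server is still running. A minimal sketch with flock(), assuming a lock file in the system temp directory; the function name acquireInstanceLock and the file name are made up for this example.

```php
<?php
// Sketch: single-instance guard for a script that cron starts every minute.
// Returns the open lock handle, or null if another instance holds the lock.
function acquireInstanceLock(string $path)
{
    $handle = fopen($path, 'c'); // create if missing, never truncate
    if ($handle === false) {
        return null;
    }
    // LOCK_NB makes flock() return immediately instead of waiting.
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null;
    }
    return $handle; // keep it open for the lifetime of the process
}

$lock = acquireInstanceLock(sys_get_temp_dir() . '/websocket-server.lock');
if ($lock === null) {
    echo "server already running, nothing to do\n";
} else {
    echo "lock acquired, starting server\n";
    // start the websocket server here; the lock is released when the process ends
}
```

Since the operating system drops the lock when the process dies, a crash or fatal error frees it automatically and the next cron run can start the server again.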

arthurkushman commented 5 years ago

https://github.com/arthurkushman/php-wss/releases/tag/1.4.2