openswoole / ext-openswoole

Programmatic server for PHP with async IO, coroutines and fibers
https://openswoole.com
Apache License 2.0

Executing multiple processes through scheduler with blocking I/O #32

pthreat closed this issue 3 years ago

pthreat commented 3 years ago

Please answer these questions before submitting your issue. Thanks!

  1. What did you do? If possible, provide a simple script for reproducing the error.
$run = new Coroutine\Scheduler();

$channel = $this->conn->channel();
$tokenService = $this->tokenService;
$connAssignService = $this->serverAssignmentService;
$appManager = $this->applicationManager;

foreach($this->queues->toArray() as $queue){
    $run->add(static function() use ($queue, $channel, $tokenService, $connAssignService, $appManager){
        echo "1\n";
        $channel->queue_declare($queue->getName(), ...$queue->getOptions());

        $channel->basic_consume(
            $queue->getName(),
            '',
            false,
            true,
            false,
            false,
            /**
             * @param AMQPMessage $msg
             */
            static function($msg) use ($queue, $tokenService, $connAssignService){
                $msg = QueueMessage::fromAMQPMessage($msg);
                dump($msg->getEvent());
                /**
                 * Find the server where this message is coming from
                 */
                $server = $connAssignService->findServerById($msg->getServer());

                if(null === $server){
                    dump("Unknown server {$msg->getServer()}");
                    return;
                }

                /**
                 * Check if client is authenticated and token has not expired
                 */
                $token = $tokenService->findByToken($msg->getToken(), false);

                if(null === $token){
                    dump("No records found for token (or token expired): {$msg->getToken()}");
                    return;
                }

                /**
                 * Refresh token expiry time only if token is not a permanent token
                 */
                if(!$token->isPermaToken()) {
                    $tokenService->refresh($token);
                }

                /**
                 * If token is not found within server connections, create it
                 */
                if(!$connAssignService->findByToken($token)){

                    $connAssignService->create($server, $token, $msg->getConnectionID());

                }else{
                    /**
                     * If token IS found, update connection ID as it may change on reconnects
                     */
                    $connAssignService->updateConnectionId($token, $msg->getConnectionID());
                }

                $queue->run(
                    $msg,
                    $token, //Pass current token
                    $tokenService->findByUser($token->getUser()) //Pass along all available tokens
                );
            }
        );

        while ($channel->is_open()){
            $channel->wait();
        }
    });
}
$run->start();
  2. What did you expect to see?

I expected all lambdas to be executed, but only the first one runs and the other one never does.

  3. What did you see instead?

  4. What version of Open Swoole are you using (show your php --ri openswoole)?

Version => 4.7.1

  5. What is your machine environment (show your uname -a & php -v & gcc -v)?

Linux 998e65b5fd02 4.19.0-18-amd64 #1 SMP Debian 4.19.208-1 (2021-09-29) x86_64 GNU/Linux

Using built-in specs.
COLLECT_GCC=gcc
COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/10/lto-wrapper
OFFLOAD_TARGET_NAMES=nvptx-none:amdgcn-amdhsa:hsa
OFFLOAD_TARGET_DEFAULT=1
Target: x86_64-linux-gnu
Configured with: ../src/configure -v --with-pkgversion='Debian 10.2.1-6' --with-bugurl=file:///usr/share/doc/gcc-10/README.Bugs --enable-languages=c,ada,c++,go,brig,d,fortran,objc,obj-c++,m2 --prefix=/usr --with-gcc-major-version-only --program-suffix=-10 --program-prefix=x86_64-linux-gnu- --enable-shared --enable-linker-build-id --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --libdir=/usr/lib --enable-nls --enable-bootstrap --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --with-default-libstdcxx-abi=new --enable-gnu-unique-object --disable-vtable-verify --enable-plugin --enable-default-pie --with-system-zlib --enable-libphobos-checking=release --with-target-system-zlib=auto --enable-objc-gc=auto --enable-multiarch --disable-werror --with-arch-32=i686 --with-abi=m64 --with-multilib-list=m32,m64,mx32 --enable-multilib --with-tune=generic --enable-offload-targets=nvptx-none=/build/gcc-10-Km9U7s/gcc-10-10.2.1/debian/tmp-nvptx/usr,amdgcn-amdhsa=/build/gcc-10-Km9U7s/gcc-10-10.2.1/debian/tmp-gcn/usr,hsa --without-cuda-driver --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu --with-build-config=bootstrap-lto-lean --enable-link-mutex
Thread model: posix
Supported LTO compression algorithms: zlib zstd
gcc version 10.2.1 20210110 (Debian 10.2.1-6)

PHP 7.3.31 (cli) (built: Sep 28 2021 16:33:55) ( NTS )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.3.31, Copyright (c) 1998-2018 Zend Technologies

doubaokun commented 3 years ago

new Coroutine\Scheduler() is an internal API; using it in the application layer is not recommended.

For this kind of application logic, the recommended approach is to use multiple processes. You can find more information about process pools at https://www.swoole.co.uk/docs/modules/swoole-process-pool. Put your logic inside the $pool->on("WorkerStart", ...) callback.

pthreat commented 3 years ago

@doubaokun How would I attach multiple blocking lambdas to a single "WorkerStart" handler?

This doesn't work:

<?php

$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);

$pool->on("WorkerStart", function ($pool, $workerId) {
    while(true){
        echo "Hello\n";
        sleep(2);
    }
});

$pool->on("WorkerStart", function ($pool, $workerId) {
    while(true){
        echo "World\n";
        sleep(3);
    }
});

$pool->start();
doubaokun commented 3 years ago

Only one $pool->on("WorkerStart", ...) registration is required. The pool starts as many workers as you configured (10 here), and each of them runs that callback. Print $workerId inside the callback to see this.
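
A minimal sketch of that behaviour, assuming the openswoole (or swoole) extension is loaded. The describeWorker() helper and the --start flag are made up for this illustration; only Swoole\Process\Pool, on("WorkerStart", ...) and start() come from the thread.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: formats the line each worker prints.
function describeWorker(int $workerId, int $pid): string
{
    return sprintf('worker %d running in process %d', $workerId, $pid);
}

// Launch only on request (php workers.php --start) and only if the extension is loaded.
if (in_array('--start', $argv ?? [], true) && class_exists(\Swoole\Process\Pool::class)) {
    $pool = new \Swoole\Process\Pool(10);

    // Registered once; the pool invokes it in each of the 10 worker processes.
    $pool->on('WorkerStart', static function ($pool, $workerId) {
        echo describeWorker($workerId, getmypid()), "\n";
        while (true) {
            sleep(5); // keep the worker process alive
        }
    });

    $pool->start();
}
```

Started with --start, this should print one line per worker, each with a different worker ID and process ID.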

pthreat commented 3 years ago

I see, this is not what I need. I need something more along the lines of creating a series of processes and being able to manage them. I was thinking $pool would have something like ->addProcess(new Process() ...) and then with that I could do ->start() and all of the processes would start.

I guess what I'm trying to say is that I want to be able to fork and monitor.

doubaokun commented 3 years ago

You can pass in an array of callback functions indexed by ID. Inside the WorkerStart callback, match $workerId against those IDs so that each worker runs exactly one of the callbacks.

pthreat commented 3 years ago

If you could provide a bare bones example I would appreciate it :)

doubaokun commented 3 years ago

Example is added: https://github.com/openswoole/swoole-src/commit/7fbf538b1c87517c90282fa359e2050db78927f3
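
The linked commit is not reproduced here. As a rough sketch of the approach described above (not the maintainer's example), the "Hello"/"World" loops from earlier in the thread could be assigned one per worker like this. The $tasks array, the taskForWorker() helper and the --start flag are assumptions made for this illustration, and the sketch relies on worker IDs starting at 0.

```php
<?php
declare(strict_types=1);

/**
 * Return the callback assigned to a worker, or null if none is registered.
 * $tasks is a hypothetical array of callables indexed by worker ID (0-based).
 */
function taskForWorker(array $tasks, int $workerId): ?callable
{
    return $tasks[$workerId] ?? null;
}

// Hypothetical blocking tasks, one per worker process.
$tasks = [
    0 => static function (int $workerId): void {
        while (true) {
            echo "[worker {$workerId}] Hello\n";
            sleep(2);
        }
    },
    1 => static function (int $workerId): void {
        while (true) {
            echo "[worker {$workerId}] World\n";
            sleep(3);
        }
    },
];

// Launch only on request (php pool.php --start) and only if the extension is loaded.
if (in_array('--start', $argv ?? [], true) && class_exists(\Swoole\Process\Pool::class)) {
    // One worker per task, so every worker ID has exactly one callback.
    $pool = new \Swoole\Process\Pool(count($tasks));

    // A single WorkerStart handler dispatches on the worker ID.
    $pool->on('WorkerStart', static function ($pool, $workerId) use ($tasks) {
        $task = taskForWorker($tasks, $workerId);
        if ($task !== null) {
            $task($workerId);
        }
    });

    $pool->start();
}
```

Because each task runs in its own process, a blocking call such as sleep() or a blocking queue consumer in one task does not hold up the others.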