WyriHaximus / reactphp-child-process-pool

MIT License
40 stars 9 forks source link

Message not sent to parent before closing child #38

Closed FGIKCM closed 5 years ago

FGIKCM commented 5 years ago

On my case, messages are not sent from child to parent. As if they were ignored, they are not sent, even at the end, when the connection close.

I understand that if my code is blocking, the message can't be send immediately, and must wait for $loop to do an iteration. But it should send the message-s, at least at the end, before shutting down. (I think it is the case in your code. So I must miss something stupid :-/ )

For example, this does not trigger a message interception on my parent:

final class ProcessJob implements ChildInterface {
    public static function create(Messenger $messenger, LoopInterface $loop) {
        $messenger->registerRpc('process', function (Payload $payload) use ($messenger, $loop) {
            $messenger->message(MessagesFactory::message(["Test"]));

            $stopAt = time() + mt_rand(1, 2);
            do {
                // Do nothing
            } while ($stopAt >= time());

            return React\Promise\resolve(['status' => 'ok']);
        });
    }
}

On my parent, I intercept messages like this:

$this->pool->on('message', function (Payload $payload) {
    echo "PARENT: message\n";
    var_dump($payload);
});

Am I missing something?

Here is the full example I use for my tests.

go.php (launcher)

<?php
error_reporting(E_ALL);
require_once __DIR__.'/../../../../vendor/autoload.php';
require_once 'ping.php';

$loop = \React\EventLoop\Factory::create();
$ping = new Ping($loop);
$ping->go();
$loop->run();
echo "Bye, bye\n";

ping.php (the parent)

<?php
declare(strict_types=1);
require_once 'pong.php';

use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessagesFactory;
use WyriHaximus\React\ChildProcess\Pool\PoolInterface;
use WyriHaximus\React\ChildProcess\Pool\Factory\Flexible;
use WyriHaximus\React\ChildProcess\Pool\Options;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;

class Ping {
    /** @var LoopInterface      React Loop */
    protected $loop;

    /** @var  PoolInterface     Pool of process */
    protected $pool;

    /** @var TimerInterface     Periodic timer (proccess things each tick) */
    protected $timer;

    /** @var int                Just to limit number of operations in this example */
    protected $item;

    /**
     * @param LoopInterface $loop       React loop
     */
    public function __construct(LoopInterface $loop) {
        $this->loop = $loop;
        $this->pool = null;
        $this->timer = null;
        $this->item = 0;
    }

    /**
     * Doing something in sub-processes
     */
    public function go(): void {
        if ($this->pool !== null)
            throw new \Exception('Only one instance, please');

        // Create a pool
        $options = [
            Options::MIN_SIZE => 0,
            Options::MAX_SIZE => 2,
            Options::TTL      => 0,
        ];
        Flexible::createFromClass(ProcessJob::class, $this->loop, $options)
            ->then(
                function (PoolInterface $pool) {
                    $this->pool = $pool;
                    echo "PARENT: ready\n";

                    // Handle error reported by the child
                    $this->pool->on('error', function (\Error $e) {
                        echo "PARENT: error\n";
                        var_dump($e);
                    });

                    // Handle message reported by the child
                    $this->pool->on('message', function (Payload $payload) {
                        echo "PARENT: message\n";
                        var_dump($payload);
                    });

                    //Do things periodically
                    $this->timer = $this->loop->addPeriodicTimer(0.5, function() {
                        // Limit things to do (for this example)
                        $item = ++$this->item;
                        if ($item > 3) {
                            $this->loop->cancelTimer($this->timer);
                            $this->pool->terminate();
                            return;
                        }

                        // Process something long in sub-process
                        echo "PARENT: rpc(process:$item)\n";
                        $this->pool->rpc(
                            MessagesFactory::rpc('process', [
                                'job'   => ['foo' => 'bar'],    #serialize($job),
                                'item'  => $item
                            ])
                        )->then(
                            function (Payload $payload) use ($item) {
                                echo "PARENT: then(process:$item). Status = ".$payload['status']."\n";
                            }
                        );
                    });
                }
            );
    }
}

pong.php (the child)

<?php
declare(strict_types=1);
error_reporting(E_ALL);

use React\EventLoop\LoopInterface;
use WyriHaximus\React\ChildProcess\Messenger\ChildInterface;
use WyriHaximus\React\ChildProcess\Messenger\Messenger;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessagesFactory;

final class ProcessJob implements ChildInterface {

    public static function create(Messenger $messenger, LoopInterface $loop) {

        $messenger->registerRpc('process', function (Payload $payload) use ($messenger, $loop) {
            $item = $payload['item'];
            echo "CHILD: rpc:process:".$payload['item']."\n";
            $messenger->message(MessagesFactory::message(["Test (".$payload['item'].")"]));

            $stopAt = time() + mt_rand(1, 2);
            do {
                // Do nothing
            } while ($stopAt >= time());

            return React\Promise\resolve(['status' => 'ok']);
        });
    }
}

Result:

PARENT: ready
PARENT: rpc(process:1)
PARENT: rpc(process:2)
PARENT: rpc(process:3)
CHILD: rpc:process:1
CHILD: rpc:process:2
PARENT: then(process:1). Status = ok
PARENT: then(process:2). Status = ok
CHILD: rpc:process:3
PARENT: then(process:3). Status = ok
Bye, bye

For this example, I hacked your child-process.php (booo) to load pong.php:

require_once '../../../../servers/LPDSpooler/_test_/child-process/pong.php');
WyriHaximus commented 5 years ago

Hey there, the expected result you list won't exactly be displayed like that because the parent doesn't forward the STDOUT from the client to it's own STDOUT so CHILD: echo's shouldn't be visible. Also assuming you use composers autoloading you shouldn't have to include the classes your self, as discussed here, composer will take care of that. I'll throw a gist together later today for this example and fix any issues coming up for you.

FGIKCM commented 5 years ago

I use Eclipse as IDE, and launch the script as a "PHP client application": this echoes every processes output in it's console, so I catch everything (parent & child), it's pretty handy. And I agree for Composer, I fixed it in my project. It is just a dirty way to give you a working example :)

I just forgot to say that I develop on a Windows machine. But I just tested on a Debian, the result is the same: I don't have message sent. And the client output are hidden as you said. I even disabled them to be sure they won't interfere with the process, with no luck.

WyriHaximus commented 5 years ago

Well if Eclipse starts both processes that would example as the parent is supposed to start the child because it passes information to the child how to connect back to the parent for communication. If eclipse starts both it has no way to knowing where to connect to

FGIKCM commented 5 years ago

After some heavy debug, it looks like that:

(When adding debug code in /vendor/evenement/evenement/src/Evenement/EventEmitterTrait.php, I see that I have no listener for 'message' event)

But I asked to register for it:

Flexible::createFromClass(ProcessJob::class, $this->loop, $options)
    ->then(
        function (PoolInterface $pool) {
            $pool->on('message', function (Payload $payload) {
                echo "PARENT: message\n";
                var_dump($payload);
            });
    );

Must I really use $pool variable? Or must I use some Messenger object or something like that?

EDIT More info:

WyriHaximus commented 5 years ago

That is the way you listen for messages. But are you sure the client starts the child and not Eclipse?

FGIKCM commented 5 years ago

Yes, I tried in command line too, and the behavior is the same. To debug, I wrote events on disk, and saw that messages are really sent from child to parent. But as I said, the $pool (in fact: the Trait associated to the $pool) has no listeners associated to the message event. So the on(message) is not triggered on the parent side.

By the way, I am on PHP 7.3. I don't now if it makes any difference, but who knows

WyriHaximus commented 5 years ago

PHP version shouldn't affect this. Would have to look into it but it could be that the pool isn't forwarding the messages coming in from the messenger :zipper_mouth_face:

FGIKCM commented 5 years ago

Any news on this issue? (I didn't deploy my project, waiting nights and days in front of my computer, hopping you will find the solution :D)

I tried to investigate the code by myself, but my skills are not sufficient to set up the message forwarding from the pool :'(

WyriHaximus commented 5 years ago

Hey this totally slipped my mind, will have a look at it during my commute home today. As I understand it correctly you want to send something from a child process in the pool back to the parent? Does RPC fit your requirements instead?

FGIKCM commented 5 years ago

Yes, that's it. As the child takes some time to finish, I wanted to send back some messages at the end of each step (instead of sending the whole log at the end in the Payload)

WyriHaximus commented 5 years ago

Ok found the missing links and working on a solution

WyriHaximus commented 5 years ago

Related PR is up, once that one is merged and tagged I'll open the one for this repo: https://github.com/WyriHaximus/reactphp-child-process-messenger/pull/35

FGIKCM commented 5 years ago

Great, thanks!!

WyriHaximus commented 5 years ago

40 is up :tada:

WyriHaximus commented 5 years ago

1.6.0 has been released. Can you confirm for me that it works for you?

FGIKCM commented 5 years ago

Yes! It works like a charm 👌 Thanks a lot!

WyriHaximus commented 5 years ago

Awesome! Glad to hear that