amphp / parallel

An advanced parallelization library for PHP, enabling efficient multitasking, optimizing resource use, and application responsiveness through multiple CPU threads.
MIT License

Continue tasks after exception. #144

Closed Ghostff closed 2 years ago

Ghostff commented 2 years ago

Just wondering if this is the intended behavior: when I enqueue two methods and one of them throws an exception, it kills all the enqueued tasks that haven't been executed yet.

use Amp\Parallel\Worker\CallableTask;
use function Amp\Parallel\Worker\pool;

class Foo {
    public function a () {
        throw new \Exception('sdd');
    }

    public function b () {
        sleep(10); // when this is removed, file is created.
        touch(__DIR__ . '/b');
    }
}
$foo = new Foo();

$promises = [
    pool()->enqueue(new CallableTask([$foo, 'a'], [])),
    pool()->enqueue(new CallableTask([$foo, 'b'], [])),
];

$responses = \Amp\Promise\wait(\Amp\Promise\all($promises));

Is there a way to make sure that all the tasks are executed without wrapping the method implementation in a try/catch block?

hustlahusky commented 2 years ago

Hello. Try \Amp\Promise\any instead of \Amp\Promise\all. Like this:

[$exceptions, $responses] = yield \Amp\Promise\any($promises);

Ghostff commented 2 years ago

[$exceptions, $responses] = yield \Amp\Promise\any($promises);

With any, touch(__DIR__ . '/b'); is not evaluated when the exception in a() is removed.

hustlahusky commented 2 years ago

I've just checked your example, and it works as intended both with and without the exception in Foo::a().

The script:

<?php

declare(strict_types=1);

namespace App;

use Amp\Parallel\Worker;
use function Amp\call;
use function Amp\Promise\any;
use function Amp\Promise\wait;

require __DIR__ . '/vendor/autoload.php';

Worker\factory(new Worker\BootstrapWorkerFactory(__FILE__));

class Foo
{
    public function a(): void
    {
        //throw new \RuntimeException('error');
    }

    public function b(): string
    {
        \sleep(10);
        \touch(__DIR__ . '/b');
        return 'foobar';
    }
}

if (\defined('AMP_WORKER')) {
    return;
}

[$exceptions, $values] = wait(call(static function () {
    $foo = new Foo();

    return any([
        Worker\pool()->enqueue(new Worker\CallableTask([$foo, 'a'], [])),
        Worker\pool()->enqueue(new Worker\CallableTask([$foo, 'b'], [])),
    ]);
}));

\var_dump($exceptions[0]?->getMessage());
\var_dump($values[1]);

Script output with exception:

warp@6bddf6229a2c:~$ php -f ._script.php 
string(169) "Uncaught RuntimeException in worker with message "error" and code "0"; use Amp\Parallel\Worker\TaskFailureException::getOriginalTrace() for the stack trace in the worker"
string(6) "foobar"
warp@6bddf6229a2c:~$ ls -la b
-rw-r--r--    1 warp     warp             0 Jun  4 09:46 b

Script output without exception:

warp@6bddf6229a2c:~$ php -f ._script.php 

Warning: Undefined array key 0 in /home/warp/._script.php on line 44
NULL
string(6) "foobar"
warp@6bddf6229a2c:~$ ls -la b
-rw-r--r--    1 warp     warp             0 Jun  4 09:47 b

The file was deleted between the runs, and as you can see, the timestamps are different.
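The "Undefined array key 0" warning in the second run appears to come from reading $exceptions[0] when no task failed. Below is a minimal sketch in plain PHP of one way to read the result without that warning, assuming (as the output above suggests) that both arrays are keyed by each task's position in the input. settleEach is a hypothetical stand-in for the worker pool and any(); it is not part of amphp and runs nothing in parallel.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of amphp): runs each callable in turn and
 * returns [exceptions, values], both keyed by the task's original index.
 * It models the result shape only; nothing runs in parallel here.
 */
function settleEach(array $tasks): array
{
    $exceptions = [];
    $values = [];

    foreach ($tasks as $key => $task) {
        try {
            $values[$key] = $task();
        } catch (\Throwable $e) {
            // A failure is recorded and the remaining tasks still run.
            $exceptions[$key] = $e;
        }
    }

    return [$exceptions, $values];
}

[$exceptions, $values] = settleEach([
    static function (): void {
        throw new \RuntimeException('error');
    },
    static fn (): string => 'foobar',
]);

// Null-coalescing avoids "Undefined array key" when a task did not fail.
$firstError = ($exceptions[0] ?? null)?->getMessage();
$secondError = ($exceptions[1] ?? null)?->getMessage();

\var_dump($firstError);  // message of the first task's exception
\var_dump($secondError); // null, because the second task succeeded
\var_dump($values[1]);
```

The same ($exceptions[$i] ?? null)?->getMessage() pattern could replace the direct $exceptions[0] access in the script above, so that a run with no failures prints NULL without emitting a warning.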