amphp / parallel

An advanced parallelization library for PHP, enabling efficient multitasking, better resource use, and application responsiveness through multiple CPU threads.
MIT License
779 stars, 64 forks

Executing a total of "n" tasks with "m" tasks running in parallel at a time #151

Closed: jigarius closed this issue 2 years ago

jigarius commented 2 years ago

Context

Amp\Parallel\Worker\WorkerException: The worker crashed
#0 [internal function]: Amp\Parallel\Worker\TaskWorker->Amp\Parallel\Worker\{closure}()
#1 /app/vendor/amphp/amp/lib/Coroutine.php(115): Generator->throw(Object(TypeError))
#2 /app/vendor/amphp/amp/lib/Internal/ResolutionQueue.php(70): Amp\Coroutine->Amp\{closure}(Object(TypeError), NULL)
#3 /app/vendor/amphp/amp/lib/Failure.php(33): Amp\Internal\ResolutionQueue->__invoke(Object(TypeError), NULL)
#4 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Amp\Internal\ResolutionQueue))
#5 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Coroutine->resolve(Object(Amp\Failure))
#6 /app/vendor/amphp/amp/lib/Coroutine.php(137): Amp\Coroutine->fail(Object(Amp\Parallel\Context\ContextException))
#7 /app/vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\Parallel\Context\ContextException), NULL)
#8 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure))
#9 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Coroutine->resolve(Object(Amp\Failure))
#10 /app/vendor/amphp/amp/lib/Coroutine.php(137): Amp\Coroutine->fail(Object(Amp\Parallel\Context\ContextException))
#11 /app/vendor/amphp/amp/lib/Failure.php(33): Amp\Coroutine->Amp\{closure}(Object(Amp\Parallel\Sync\ChannelException), NULL)
#12 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(143): Amp\Failure->onResolve(Object(Closure))
#13 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(177): Amp\Coroutine->resolve(Object(Amp\Failure))
#14 /app/vendor/amphp/amp/lib/Coroutine.php(137): Amp\Coroutine->fail(Object(Amp\Parallel\Sync\ChannelException))
#15 /app/vendor/amphp/amp/lib/Internal/Placeholder.php(149): Amp\Coroutine->Amp\{closure}(NULL, NULL)
#16 /app/vendor/amphp/amp/lib/Deferred.php(53): Amp\Promise@anonymous->resolve(NULL)
#17 /app/vendor/amphp/byte-stream/lib/ResourceInputStream.php(198): Amp\Deferred->resolve()
#18 /app/vendor/amphp/byte-stream/lib/ResourceInputStream.php(182): Amp\ByteStream\ResourceInputStream->free()
#19 /app/vendor/amphp/parallel/lib/Sync/ChannelledSocket.php(68): Amp\ByteStream\ResourceInputStream->close()
#20 /app/vendor/amphp/parallel/lib/Context/Process.php(398): Amp\Parallel\Sync\ChannelledSocket->close()
#21 /app/vendor/amphp/parallel/lib/Worker/Internal/WorkerProcess.php(57): Amp\Parallel\Context\Process->kill()
#22 /app/vendor/amphp/parallel/lib/Worker/TaskWorker.php(64): Amp\Parallel\Worker\Internal\WorkerProcess->kill()
#23 [internal function]: Amp\Parallel\Worker\TaskWorker::Amp\Parallel\Worker\{closure}()
#24 {main}

Questions

References

jigarius commented 2 years ago

I think I found a good read on another issue. I'll try out that solution and reopen this ticket if need be.

kelunik commented 2 years ago

If all your task does is run another child process, I'd recommend taking a look at https://github.com/amphp/process instead of using amphp/parallel.

jigarius commented 2 years ago

@kelunik thanks a tonne for the suggestion. I read the amphp/process docs, and it seems to meet most of my requirements.

The only thing I can't find at the moment is whether that library lets me control how many processes run at a time. I want my users to be able to choose the maximum number of child processes that run concurrently, a number between 1 and 16. Will that be possible with that library? From what I've read so far, it seems I might have to implement that on my own 🤔

Thanks again for the help.
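Since the limit is meant to be user-chosen (1 to 16), it helps to validate it before using it as the concurrency limit. A minimal sketch using PHP's built-in `filter_var()` with `FILTER_VALIDATE_INT` and its `min_range`/`max_range` options; `parseMaxParallel()` is a hypothetical helper name, and reading the value from a CLI option is an assumption.

```php
<?php
// Hypothetical helper: turn a user-supplied string (e.g. a CLI option value)
// into a concurrency limit, accepting only integers in [$min, $max].
function parseMaxParallel(string $input, int $min = 1, int $max = 16): int
{
    $value = filter_var($input, FILTER_VALIDATE_INT, [
        'options' => ['min_range' => $min, 'max_range' => $max],
    ]);
    if ($value === false) {
        // filter_var() returns false for non-integers and out-of-range values.
        throw new InvalidArgumentException(
            sprintf('Expected an integer between %d and %d, got "%s".', $min, $max, $input)
        );
    }
    return $value;
}
```

Rejecting bad input up front keeps the error close to where the user typed it, instead of surfacing later as a zero-sized or oversized pool.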

kelunik commented 2 years ago

Yes, there are multiple ways to do that. Which version do you plan to use: the fiber-based AMPHP v3 (PHP 8.1+) or v2? What's the source of your processes? Do you have all the tasks in memory upfront, or do you pull them from somewhere?

jigarius commented 2 years ago

More context: jigarius/drall

jigarius commented 2 years ago

Last night I tried amphp/process, and it successfully ran 50 commands.

The only thing I can't figure out is whether I can control how many commands run in parallel, e.g. at most 8 commands at a time, with the remaining commands waiting until one of those 8 finishes.

If I can figure that out, I'll implement my feature using amphp/process. Any help would be appreciated. With that info, I'll be able to submit a code example to this repo to help others facing similar issues.

Here's what the work-in-progress code looks like.

kelunik commented 2 years ago

@jigarius Here's an example:

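#!/usr/bin/env php
<?php
// Assumes this script lives one directory below the project root (e.g. bin/).
require \dirname(__DIR__).'/vendor/autoload.php';

use Amp\Loop;
use Amp\Process\Process;
use Amp\Sync\LocalSemaphore;
use function Amp\Iterator\fromIterable;

Loop::run(function () {
    // Ten tasks in total ("n" = 10).
    $values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // The semaphore lets at most 3 callbacks run at once ("m" = 3);
    // the remaining values wait until a running callback finishes.
    yield Amp\Sync\ConcurrentIterator\each(fromIterable($values), new LocalSemaphore(3), function () {
        // Start a child process and wait for it to exit.
        $process = new Process('sleep 1');
        yield $process->start();
        yield $process->join();
    });
});
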
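For readers who cannot pull in the amphp packages, the same "n tasks, at most m at a time" pattern can be sketched with PHP's built-in `proc_open()` and `proc_get_status()`. This is a swapped-in technique, not the amphp approach: it polls children in a loop instead of using an event loop and a semaphore. `runBounded()` is a hypothetical helper name, and the sketch assumes a POSIX system (string commands run through `/bin/sh -c`, stdin comes from `/dev/null`) and the PHP CLI, where the `STDOUT` and `STDERR` constants exist.

```php
<?php
// Sketch: run every command in $commands with at most $maxParallel child
// processes alive at once, polling with proc_get_status() (no amphp needed).
// runBounded() is a hypothetical helper; assumes POSIX and the PHP CLI.
function runBounded(iterable $commands, int $maxParallel, int $pollMicros = 10000): array
{
    if ($maxParallel < 1) {
        throw new InvalidArgumentException('$maxParallel must be at least 1');
    }

    // Children read stdin from /dev/null and write to this script's stdout/stderr.
    $spec = [0 => ['file', '/dev/null', 'r'], 1 => STDOUT, 2 => STDERR];
    $running = [];   // input key => process resource
    $exitCodes = []; // input key => exit code, in completion order

    // Close every child that has exited and record its exit code.
    $reap = function () use (&$running, &$exitCodes): void {
        foreach ($running as $key => $proc) {
            $status = proc_get_status($proc);
            if (!$status['running']) {
                // 'exitcode' is only reliable on the first call after the child exits.
                $exitCodes[$key] = $status['exitcode'];
                proc_close($proc);
                unset($running[$key]);
            }
        }
    };

    foreach ($commands as $key => $command) {
        // Block until a slot is free, then start the next command.
        while (count($running) >= $maxParallel) {
            $reap();
            if (count($running) >= $maxParallel) {
                usleep($pollMicros);
            }
        }
        // String commands are run through /bin/sh -c.
        $proc = proc_open($command, $spec, $pipes);
        if ($proc === false) {
            throw new RuntimeException("Could not start command: $command");
        }
        $running[$key] = $proc;
    }

    // Wait for the remaining children to finish.
    while ($running !== []) {
        $reap();
        if ($running !== []) {
            usleep($pollMicros);
        }
    }

    return $exitCodes;
}
```

For example, `runBounded(array_fill(0, 10, 'sleep 1'), 3)` should take roughly four seconds (batches of 3, 3, 3 and 1). Because `$commands` can be any iterable, a generator that pulls tasks from elsewhere is consumed one command at a time. Exit codes are keyed like the input but appear in completion order. Polling is simpler than an event loop but wakes up every `$pollMicros` microseconds while waiting.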
jigarius commented 2 years ago

Thanks a lot! This seems to solve my issue.