amphp / parallel-functions

Simplified parallel processing for PHP based on Amp.
https://amphp.org/parallel-functions
MIT License

How can I accelerate massive data collection using parallel-functions #20

Closed bergmab closed 4 years ago

bergmab commented 4 years ago

I started using parallel-functions in some CLI scripts that collect data from network devices via SNMP. With just a few thousand devices, that works very well. But in some situations I need to collect data from more than 200K devices. I have a Java script that can do that in less than 30 minutes. With parallel-functions, so far I can't do better than 2 hours. Of course, it's faster when I increase the Pool DEFAULT_MAX_SIZE, but it always ends in an error like this:

^ Amp\MultiReasonException^ {#1024465
  -reasons: array:240 [
    "f8eda5b75ce2" => Amp\Parallel\Context\ContextException^ {#1037279
      #message: "Starting the process failed"
      #code: 0
      #file: "./vendor/amphp/parallel/lib/Context/Process.php"
      #line: 202
      -previous: Amp\Parallel\Context\ContextException^ {#1039155
        #message: "Starting the process timed out"
        #code: 0
        #file: "./vendor/amphp/parallel/lib/Context/Internal/ProcessHub.php"
        #line: 127
        -previous: Amp\TimeoutException^ {#1037540 …5}
        trace: { …35}
      }

So I was wondering if there's a way to run more than 32 parallel processes simultaneously. Is parallel-functions the right package for my needs?

Note: I am using PHP 7.4 on a CentOS 7 server, and here's an example script:

use SNMP;
use Amp\Promise;
use function Amp\ParallelFunctions\parallelMap;

$hosts = [
    'host1'=>['ip'=>'1.1.1.1'], 
    'host2'=>['ip'=>'2.2.2.2'], 
    'host3'=>['ip'=>'3.3.3.3'],
];

$values = Promise\wait(parallelMap($hosts, function ($host) {
    $session = new SNMP(SNMP::VERSION_2c, $host['ip'], "community");
    $host['snmpMib2System'] = $session->walk("1.3.6.1.2.1.1");
    return $host;
}));

// Do some stuff on $values and store it to DB
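
With a host list in the hundreds of thousands, one option that might help is to split the list into batches and handle one batch at a time, so results can be stored before the next batch starts. The following is only a sketch under that assumption: `batchHosts()` is a hypothetical helper, not part of amphp, and the batch size of 2 is purely illustrative.

```php
<?php
// Hypothetical helper (not part of amphp): split a keyed host list into
// batches so that each batch can be processed and stored separately.
function batchHosts(array $hosts, int $batchSize): array
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }
    // The third argument preserves keys such as 'host1', 'host2', ...
    return array_chunk($hosts, $batchSize, true);
}

$hosts = [
    'host1' => ['ip' => '1.1.1.1'],
    'host2' => ['ip' => '2.2.2.2'],
    'host3' => ['ip' => '3.3.3.3'],
];

foreach (batchHosts($hosts, 2) as $batch) {
    // Each $batch could be passed to parallelMap() here, and its results
    // written to the DB before the next batch is started.
    echo implode(',', array_keys($batch)), PHP_EOL;
}
```

A trade-off of this approach is that workers may sit idle at the end of each batch while the slowest host in that batch finishes.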

kelunik commented 4 years ago

Do you have error reporting enabled? Does PHP emit any notices / warnings?

bergmab commented 4 years ago

Error reporting is enabled but PHP does not emit any notices / warnings. I use SNMPException to catch SNMP related "errors" like "No response from host", etc. Here's a more accurate example of what I use:

        $values = Promise\wait(parallelMap($hosts, function ($host) {
            $session = new SNMP(SNMP::VERSION_2c, $host['ip'], "community", 1000000, 1);
            $session->exceptions_enabled = SNMP::ERRNO_ANY;
            try {
                $host['snmpMib2System'] = $session->walk("1.3.6.1.2.1.1");
            } catch (SNMPException $e) {
                $host['error'] = $e->getMessage();
            } finally {
                $session->close();
            }
            return $host;
        }));

kelunik commented 4 years ago

I'm asking because, if I modify the example and run it with a high number of processes, I get errors like the ones below (running with arguments 128 128).

<?php

require __DIR__ . '/../vendor/autoload.php';

use function Amp\ParallelFunctions\parallelMap;
use function Amp\Promise\wait;

$pool = new Amp\Parallel\Worker\DefaultPool((int) $argv[1]);

$array = \array_fill(0, (int) $argv[2], 1);

// Parallel function execution is nice, but it's even better being able to use closures instead of having to write a
// function that has to be autoloadable.

try {
    \var_dump(wait(parallelMap($array, function ($time) {
        \sleep($time); // a blocking function call, might also do blocking I/O here

        return $time * $time;
    }, $pool)));
} catch (Amp\MultiReasonException $e) {
    // reset() takes its argument by reference, so it needs a variable.
    $reasons = $e->getReasons();
    throw \reset($reasons);
}
PHP Warning:  include(/home/kelunik/GitHub/amphp/parallel-functions/vendor/amphp/process/lib/ProcessException.php): failed to open stream: Too many open files in /home/kelunik/GitHub/amphp/parallel-functions/vendor/composer/ClassLoader.php on line 444
...
PHP Fatal error:  Uncaught Error: Class 'Amp\Failure' not found in /home/kelunik/GitHub/amphp/parallel-functions/vendor/amphp/amp/lib/Internal/Placeholder.php:177

bergmab commented 4 years ago

I reworked my code, and now I get errors similar to yours when using 128 as the pool maxSize:

use Amp\MultiReasonException;
use Amp\Parallel\Worker\DefaultPool;
use Amp\Promise;
use SNMP;
use SNMPException;
use function Amp\ParallelFunctions\parallelMap;

error_reporting(E_ALL);

$hosts = [
    'host1'=>['ip'=>'1.1.1.1'],
    'host2'=>['ip'=>'2.2.2.2'],
    'host3'=>['ip'=>'3.3.3.3'],
];

try {
    $pool = new DefaultPool(128);
    $values = Promise\wait(parallelMap($hosts, function ($host) {
        $session = new SNMP(SNMP::VERSION_2c, $host['ip'], "community", 1000000, 1);
        $session->exceptions_enabled = SNMP::ERRNO_ANY;
        try {
            $host['snmpMib2System'] = $session->walk("1.3.6.1.2.1.1");
        } catch (SNMPException $e) {
            $host['error'] = $e->getMessage();
        } finally {
            $session->close();
        }
        return $host;
    }, $pool));
} catch (MultiReasonException $e){
    print_r($e->getReasons());
}

// Do some stuff on $values and store it to DB

Here's the error I got: vendor/amphp/amp/lib/Failure.php): failed to open stream: Too many open files
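
A "Too many open files" error usually points at the per-process limit on open file descriptors. As a rough sketch, that limit can also be read from within PHP via `posix_getrlimit()`, assuming the posix extension is installed; `softOpenFilesLimit()` is a hypothetical helper name, and it returns null when the limit cannot be determined.

```php
<?php
// Hypothetical helper: returns the soft limit on open file descriptors,
// or null when it cannot be determined (for example, when the posix
// extension is not installed or the limit is reported as "unlimited").
function softOpenFilesLimit(): ?int
{
    if (!function_exists('posix_getrlimit')) {
        return null;
    }
    $limits = posix_getrlimit();
    if (!is_array($limits) || !isset($limits['soft openfiles'])) {
        return null;
    }
    $soft = $limits['soft openfiles'];
    // The value may be the string "unlimited" rather than an integer.
    return is_int($soft) ? $soft : null;
}

$limit = softOpenFilesLimit();
echo $limit === null
    ? "open files limit: unknown\n"
    : "open files limit: {$limit}\n";
```

Comparing this value against the intended pool size before starting the workers may give an earlier and clearer failure than the autoloader warning.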

So I checked the open files limit on CentOS using ulimit -n, and it showed 1024. I increased it to 4096 and ran the script again. Now I am getting a different error:

    [f8eda5a3e5b2] => Amp\Parallel\Context\ContextException Object
        (
            [message:protected] => Starting the process failed
            [string:Exception:private] => 
            [code:protected] => 0
            [file:protected] => /vendor/amphp/parallel/lib/Context/Process.php
            [line:protected] => 202

            [previous:Exception:private] => Amp\Parallel\Context\ContextException Object
                (
                    [message:protected] => Starting the process timed out
                    [string:Exception:private] => 
                    [code:protected] => 0
                    [file:protected] => /vendor/amphp/parallel/lib/Context/Internal/ProcessHub.php
                    [line:protected] => 127

                    [previous:Exception:private] => Amp\TimeoutException Object
                        (
                            [message:protected] => Operation timed out
                            [string:Exception:private] => 
                            [code:protected] => 0
                            [file:protected] => /vendor/amphp/amp/lib/functions.php
                            [line:protected] => 253

I ran the script a couple more times and also got this error: Loop exceptionally stopped without resolving the promise. In the stack trace I also saw this one: Call to undefined function posix_kill() at /vendor/amphp/process/lib/Internal/Posix/Runner.php:193

bergmab commented 4 years ago

Another thing I noticed: the script gets slower as it progresses. I'm not sure how that should be handled. Do you think it should be treated as a bug?

bergmab commented 4 years ago

As for the Call to undefined function posix_kill() error, this fixed it: sudo yum install php74-php-process.x86_64
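
A missing function like this can be caught before any workers are started. The sketch below assumes a simple up-front check is acceptable; `missingFunctions()` is a hypothetical helper, and the list of required functions is an assumption (only `posix_kill` is named in the stack trace above).

```php
<?php
// Hypothetical helper: returns the names of required functions that are
// not available in the current PHP installation.
function missingFunctions(array $required): array
{
    return array_values(array_filter($required, function (string $name): bool {
        return !function_exists($name);
    }));
}

// posix_kill is the function named in the stack trace; proc_open is an
// assumption about what process-based workers need.
$missing = missingFunctions(['posix_kill', 'proc_open']);
if ($missing !== []) {
    echo 'Missing functions: ' . implode(', ', $missing) . PHP_EOL;
}
```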

Now the only issue remaining is the "process timed out" when trying to use more than 32 processes.

kelunik commented 4 years ago

@bergmab See https://github.com/amphp/parallel/issues/112 for that.