amphp / parallel

An advanced parallelization library for PHP that enables efficient multitasking and improves resource use and application responsiveness through multiple CPU threads.
MIT License

Leftover ONE socket file (amp-parallel-ipc-*.sock) not cleaned up after execution in CLI mode #203

Closed: medy36 closed this issue 2 months ago

medy36 commented 2 months ago

Hi team,

I would like to reopen the issue regarding the zombie .sock files not being correctly deleted when using amphp/parallel.

BUG: exactly ONE .sock file is left behind after cleanup.

Background: I am running parallel tasks using amphp/parallel in two different environments: web and CLI.

After a recent update to the amphp/parallel library, I noticed an improvement: the .sock files are now correctly deleted in the web version. However, the CLI version still leaves one .sock file in the /tmp folder for every execution, which eventually leads to problems (memory issues, etc.).

Steps to Reproduce:

I suspect the issue may be related to how "LocalIpcHub" handles cleanup.

trowski commented 2 months ago

How is the CLI process being stopped? Is it being killed? I find it odd that the destructor is not being run.

medy36 commented 2 months ago

@trowski In the Symfony framework, I am using the Messenger component to run async jobs, so the consumer runs in the background (managed by Supervisor). Each time a message is consumed, the handler runs the tasks (I am testing with your example below):

public function __invoke(RefreshTokenMessage $message): JsonResponse
{
    $this->logger->info('Start RefreshTokenMessage handler ...');

    try {
        // Initialize worker pool. ContextWorkerFactory, ContextWorkerPool and
        // Execution come from Amp\Parallel\Worker; workerPool() and submit()
        // are functions in the same namespace.
        $workerFactory = new ContextWorkerFactory();
        $pool = new ContextWorkerPool(32, $workerFactory);
        workerPool($pool); // replace the global worker pool with this instance

        $executions = [];
        $urls = ['https://amphp.org', 'https://github.com'];

        foreach ($urls as $url) {
            $executions[$url] = submit(new FetchTask($url));
        }

        $responses = \Amp\Future\await(array_map(
            fn (Execution $e) => $e->getFuture(),
            $executions,
        ));

        foreach ($responses as $url => $response) {
            \printf("Read %d bytes from %s\n", \strlen($response), $url);
        }
    } catch (\Throwable $e) {
        $this->logger->error('An error occurred: ' . $e->getMessage());
    } finally {
        // Manually trigger garbage collection
        if (\gc_enabled()) {
            $this->logger->info('Running garbage collection...');
            \gc_collect_cycles();
        }

        // Repeated shutdown()/kill() calls: attempts to force the pool
        // (and its LocalIpcHub) to be cleaned up
        $this->logger->info('Calling close on worker pool...');
        workerPool($pool)->shutdown();
        workerPool($pool)->kill();
        $pool->shutdown();
        $pool->kill();

        $this->logger->info('Finished RefreshTokenMessage handler');
    }

    return new JsonResponse('', Response::HTTP_NO_CONTENT);
}

As you can see, I was trying to trigger the destructor any way I could. I placed some log calls in LocalIpcHub at __destruct(), unlink() and close(); none of them is triggered, so the cleanup never happens.

medy36 commented 2 months ago

@trowski Here is a comparison between running the script directly from the CLI (works) and running it from a background job (a cron job or a Messenger async job, for example):

This is an example running the script from CLI:
amphp-test.php

<?php

require __DIR__ . '/../vendor/autoload.php';

use Amp\Future;
use Amp\Parallel\Worker;
use function Amp\async;

$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];

$executions = [];
foreach ($urls as $url) {
    // FetchTask is just an example, you'll have to implement
    // the Task interface for your task.
    $executions[$url] = Worker\submit(new \App\Messenger\MessageHandler\FetchTask($url));
}

// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$responses = Future\await(array_map(
    fn (Worker\Execution $e) => $e->getFuture(),
    $executions,
));

foreach ($responses as $url => $response) {
    \printf("Read %d bytes from %s\n", \strlen($response), $url);
}

===> Execution with debug logging: the amp-parallel-ipc-*.sock file is created and deleted (OK)

root@27e3ce21767c:/app/var# php amphp-test.php 
Create tmp file at /tmp/amp-parallel-ipc-87569c0c057448d2ea6b.sock
Read 81030 bytes from https://amphp.org
Read 244818 bytes from https://github.com
Read 44552 bytes from https://secure.php.net
   =============>   DESTRUCT  Context worker pool 
Calling destructor for  /tmp/amp-parallel-ipc-87569c0c057448d2ea6b.sock

===> Execution from the Messenger consumer with debug logging: the creation of the amp-parallel-ipc-*.sock file is logged (OK), but the deletion log is never reached (KO)

php bin/console mess:cons refresh-login -vv

 [OK] Consuming messages from transport "refresh-login".                                                                

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.       

 // Quit the worker with CONTROL-C.                                                                                     

14:22:12 INFO      [messenger] Received message App\Messenger\Message\RefreshTokenMessage ["class" => "App\Messenger\Message\RefreshTokenMessage"] ["pid" => 3285]
14:22:12 INFO      [app] Start RefreshTokenMessage handler ... ["pid" => 3285]
Create tmp file at /tmp/amp-parallel-ipc-8072cedbdeb59ec668f6.sock
Read 81030 bytes from https://amphp.org
Read 244844 bytes from https://github.com
14:22:13 INFO      [app] Running garbage collection... ["pid" => 3285]
14:22:13 INFO      [app] Calling close on worker pool... ["pid" => 3285]
14:22:13 INFO      [app] Finished RefreshTokenMessage handler ["pid" => 3285]
14:22:13 INFO      [messenger] Message App\Messenger\Message\RefreshTokenMessage handled by App\Messenger\MessageHandler\RefreshTokenHandler::__invoke ["class" => "App\Messenger\Message\RefreshTokenMessage","handler" => "App\Messenger\MessageHandler\RefreshTokenHandler::__invoke"] ["pid" => 3285]
14:22:13 INFO      [php] User Deprecated: Since symfony/doctrine-bridge 6.3: Registering "Gedmo\Timestampable\TimestampableListener" as a Doctrine subscriber is deprecated. Register it as a listener instead, using e.g. the #[AsDoctrineListener] attribute. ["exception" => ErrorException { …}] ["pid" => 3285]
14:22:13 INFO      [messenger] App\Messenger\Message\RefreshTokenMessage was handled successfully (acknowledging to transport). ["class" => "App\Messenger\Message\RefreshTokenMessage"] ["pid" => 3285]
trowski commented 2 months ago

So after stopping the Symfony worker, the socket file still remains?

medy36 commented 2 months ago

Yes, when I stop the Symfony worker, the socket files still remain.

I am still unable to get __destruct() to run when using the Symfony worker.

The worker needs to keep running to handle incoming queued messages; each message does some parallel work. Once in a while I restart the Symfony worker via Supervisor for other performance reasons.

The messages are received via an endpoint and pushed to the worker:


public function process($data, Operation $operation, array $uriVariables = [], array $context = []): JsonResponse
{
    $this->logger->info('Auth Refresh event');

    // Triggers the "RefreshTokenHandler"
    $this->bus->dispatch(new RefreshTokenMessage($data));

    return new JsonResponse('', Response::HTTP_NO_CONTENT);
}
trowski commented 2 months ago

I'm not familiar with the Symfony Messenger component. It seems the process is being killed or is not allowing all destructors to run before shutting down for some reason.

It is unnecessary and unwise to be initializing a new worker pool and trying to shut it down in each handler. Simply use the global worker pool within the handler using Amp\Parallel\Worker\submit().

medy36 commented 2 months ago

I was using the global pool, but since I am facing this problem, I was trying to trigger the shutdown manually.

trowski commented 2 months ago

Since you're setting the global pool instance, shutting down the pool will not trigger the destructor of LocalIpcHub, because there is still a reference to it in the pool instance, which is stored in a global static. That static instance will not be released until the PHP process shuts down. For some reason, the process created by Symfony is not allowing that destructor to be executed.
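The lifetime rule behind this can be reproduced in plain PHP without amphp. The sketch below is only an illustration: `TempFileOwner` and `Registry` are hypothetical stand-ins (for LocalIpcHub and for the global pool held in a static), not amphp classes. An object that deletes a file in its destructor survives the end of the "handler" function because a static property still references it; the file disappears only once that last reference is dropped.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an object such as LocalIpcHub: it owns a
// temporary file and deletes that file in its destructor.
final class TempFileOwner
{
    public readonly string $path;

    public function __construct()
    {
        $path = tempnam(sys_get_temp_dir(), 'demo-ipc-');
        if ($path === false) {
            throw new RuntimeException('Could not create temporary file');
        }
        $this->path = $path;
    }

    public function __destruct()
    {
        if (is_file($this->path)) {
            unlink($this->path);
        }
    }
}

// Hypothetical stand-in for the global worker pool accessor: the instance
// is kept alive by a static property, independent of any local variable.
final class Registry
{
    private static ?TempFileOwner $instance = null;

    public static function set(?TempFileOwner $owner): void
    {
        self::$instance = $owner;
    }
}

// Simulates one handler run that also registers the object globally.
function handleMessage(): string
{
    $owner = new TempFileOwner();
    Registry::set($owner);

    // $owner goes out of scope on return, but the static still references
    // the object, so __destruct() does not run here.
    return $owner->path;
}

$path = handleMessage();
$stillThere = is_file($path);

// Dropping the last reference destroys the object immediately.
Registry::set(null);
$removed = !is_file($path);

printf(
    "after handler: %s, after release: %s\n",
    $stillThere ? 'file exists' : 'file gone',
    $removed ? 'file gone' : 'file exists',
);
```

Running it prints `after handler: file exists, after release: file gone`. Without the `Registry::set(null)` call, the file would only be removed when PHP runs the remaining destructors during its normal shutdown; if the process never reaches that point (for example, it is killed with SIGKILL), the file stays behind.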

Since this seems to be an issue outside of Amp, I'm closing this issue. Please reopen this issue or create a new one if you're able to discover why the destructor is not being run and the issue is in this library.

medy36 commented 2 months ago

OK, thanks.

I will try to find out why the destructor is not being reached.

Maybe I'll open an issue on the Symfony side.