amphp / process

An async process dispatcher for Amp.
MIT License
229 stars 32 forks source link

Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher) #74

Closed grisha2217 closed 1 month ago

grisha2217 commented 1 month ago

PHP Version 8.3.11

Error: market_platform_error_0 Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):
 #0 /vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:100 Fiber::suspend()
 #1 /vendor/amphp/amp/src/Future.php:251 Revolt\EventLoop\Internal\DriverSuspension->suspend()
 #2 /telegram/Telegram.php:438 Amp\Future->await() 
 #3 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:597 LZT\Telegram\Telegram->LZT\Telegram\{closure}() 
 #4 Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()

  #0 /vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:100 Fiber::suspend() 
  #1 /vendor/amphp/amp/src/Future.php:251 Revolt\EventLoop\Internal\DriverSuspension->suspend() 
  #2 /telegram/Client.php:59 Amp\Future->await() 
  #3 /vendor/amphp/amp/src/functions.php:33 LZT\Telegram\Client->LZT\Telegram\{closure}() 
  #4 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:425 Amp\{closure}() 
  #5 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:562 Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks()
   #6 Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}() 
   #0 /vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:100 Fiber::suspend() 
   #1 /vendor/amphp/amp/src/Future.php:251 Revolt\EventLoop\Internal\DriverSuspension->suspend() 
   #2 /telegram/Client.php:220 Amp\Future->await()
    #3 /telegram/Client.php:191 LZT\Telegram\Client->requeue() 
    #4 /telegram/Client.php:91 LZT\Telegram\Client->handle() 
    #5 /vendor/amphp/amp/src/functions.php:33 LZT\Telegram\Client->LZT\Telegram\{closure}() 
    #6 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:425 Amp\{closure}() 
    #7 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:562 Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks() 
    #8 Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}() 
    #0 /vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:100 Fiber::suspend() 
    #1 /vendor/amphp/amp/src/Future.php:251 Revolt\EventLoop\Internal\DriverSuspension->suspend() 
    #2 /telegram/Telegram.php:66 Amp\Future->await() 
    #3 /vendor/amphp/amp/src/functions.php:33 LZT\Telegram\Telegram->LZT\Telegram\{closure}() 
    #4 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:425 Amp\{closure}() 
    #5 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:562 Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks() 
    #6 Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()
     #0 /vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:100 Fiber::suspend()
      #1 /vendor/amphp/amp/src/Future.php:251 Revolt\EventLoop\Internal\DriverSuspension->suspend() 
      #2 /telegram/Telegram.php:365 Amp\Future->await() 
      #3 /telegram/Telegram.php:438 LZT\Telegram\Telegram->LZT\Telegram\{closure}() 
      #4 /vendor/amphp/amp/src/functions.php:33 LZT\Telegram\Telegram->LZT\Telegram\{closure}() 
      #5 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:425 Amp\{closure}() 
      #6 /vendor/revolt/event-loop/src/EventLoop/Internal/AbstractDriver.php:616 Revolt\EventLoop\Internal\AbstractDriver->invokeMicrotasks() 
      #7 Revolt\EventLoop\Internal\AbstractDriver->Revolt\EventLoop\Internal\{closure}()

library/vendor/revolt/event-loop/src/EventLoop/Internal/DriverSuspension.php:145
/**
 * Runs a Telegram client session inside the event loop and blocks the calling
 * fiber until the work finishes or an uncaught loop error aborts it.
 *
 * A client is built from the given auth key / datacenter / proxy, `updates.getState`
 * is awaited as a liveness check, and on success `$run` is executed in its own
 * async task.
 *
 * @param string   $authKey Hex-encoded auth key (decoded with hex2bin()).
 * @param int      $dcID    Datacenter id; used as the session key and passed to Datacenter::byId().
 * @param array    $proxy   Proxy settings forwarded unchanged to the client.
 * @param callable $run     Invoked as $run($info, $error, $client); must return [$info, $error].
 * @param array    $options Reads 'timeout' (defaults to 60) and 'client' (forwarded as clientConfig).
 *
 * @return array Two-element list [$info, $error]. $error is '' when nothing failed,
 *               otherwise a message string or the caught/raised exception object.
 */
public function wrapper(string $authKey, int $dcID, array $proxy, callable $run, array &$options = []): array
    {
        $error = '';
        $info = [];

        $suspension = EventLoop::getSuspension();

        // The suspension can be resumed from two places (the deferred task and the
        // loop error handler). Resuming twice either raises an Error inside the loop
        // or wakes up a later, unrelated suspend() on the same fiber, so funnel both
        // through a one-shot guard.
        $resumed = false;
        $resume = function () use (&$resumed, $suspension) {
            if ($resumed) {
                return;
            }

            $resumed = true;
            $suspension->resume();
        };

        $defer = EventLoop::defer(function () use ($authKey, $dcID, &$error, &$info, $proxy, $run, &$options, $resume) {
            $payload = [
                'timeout' => $options['timeout'] ?? 60,
                'lastDC'  => $dcID,
                'session' => [
                    'datacenter' => [
                        $dcID => [
                            'nonce'      => null,
                            'newNonce'   => null,
                            'timeOffset' => 0,
                            'serverSalt' => null,
                            'authKeyId'  => null,
                            'authKey'    => gmp_import(hex2bin($authKey), 1, GMP_MSW_FIRST | GMP_BIG_ENDIAN),
                            'sessionId'  => null,
                        ],
                    ],
                ],
            ];

            // Per-call debug log written to a temporary file; the path and logger are
            // published through XenForo_Application for later inspection.
            $logger = new Logger('telegram');
            $temp = tmpfile();
            // Capturing $temp keeps the handle referenced until shutdown, where the
            // file is removed explicitly if it still exists.
            register_shutdown_function(function () use ($temp) {
                if (file_exists(stream_get_meta_data($temp)['uri'])) {
                    unlink(stream_get_meta_data($temp)['uri']);
                }
            });
            $path = stream_get_meta_data($temp)['uri'];
            $logger->pushHandler(new StreamHandler($path, Level::Debug));
            XenForo_Application::set('telegramDebugFile', $path);
            XenForo_Application::set('telegramLogger', $logger);

            try {
                $storage = new Memory(serialize($payload));
                $client = new Client(
                    $storage, $logger, clientConfig: $options['client'],
                    datacenter: Datacenter::byId($dcID), proxy: $proxy
                );
                $state = $client->{'updates.getState'}->await();
                if ($state->is('RpcError')) {
                    $error = $state->error_message;
                } else {
                    [$info, $error] = async(fn () => $run($info, $error, $client))->await();
                }
            } catch (Exception $e) {
                $error = $e;
            }

            $resume();
        });

        EventLoop::setErrorHandler(function ($errorObject) use (&$error, $resume, $defer) {
            if ($errorObject->getPrevious()) {
                $errorObject = $errorObject->getPrevious();
            }

            $checkerFiles = [
                'library/Market/Model/Platform/Checkers/telegram/Telegram.php',
                'library/Market/Model/Platform/Checkers/telegram/Protocol/Message.php',
                'library/Market/Model/Platform/Checkers/telegram/Client.php',
            ];

            $raisedInCheckerCode = false;
            foreach ($checkerFiles as $checkerFile) {
                if (str_ends_with($errorObject->getFile(), $checkerFile)) {
                    $raisedInCheckerCode = true;
                    break;
                }
            }

            if ($raisedInCheckerCode) {
                $error = $errorObject; // Return full error traceback, if error in checker code
            } else {
                $error = $errorObject->getMessage();
            }

            EventLoop::cancel($defer);
            EventLoop::setErrorHandler(null);
            $resume();
        });

        // While the loop runs, every PHP warning/notice is turned into a RetryException.
        set_error_handler(
        /**
         * @throws \Market\Model\Platform\Exceptions\RetryException
         */
        function (int $errno, string $errstr, string $errfile, int $errline) {
            throw new RetryException($errstr);
        }, E_ALL);

        try {
            $suspension->suspend();
        } finally {
            // suspend() itself can throw (e.g. when the loop terminates without
            // resuming this suspension). The handlers installed above are process-wide,
            // so they must be removed on that path too. Marking the guard as used
            // also disarms a deferred task that is still parked in an await().
            $resumed = true;
            restore_error_handler();
            EventLoop::cancel($defer);
            EventLoop::setErrorHandler(null);
        }

        return [$info, $error];
    }