schmittjoh / JMSJobQueueBundle

Run and Schedule Symfony Console Commands as Background Jobs
http://jmsyst.com/bundles/JMSJobQueueBundle

Future of this bundle? #208

Closed: stephanvierkant closed this issue 5 years ago

stephanvierkant commented 6 years ago

I'd like to share my thoughts about the future of this bundle:

I think we should decide whether this bundle still serves a purpose in a Symfony 4.1+ environment. Is the Messenger component (with php-enqueue) a real alternative?

schmittjoh commented 6 years ago

I'm not familiar with php-enqueue. Could you tell us more about its feature set, since you seem to have researched it already?

As for my current viewpoint: we actively use this bundle on Scrutinizer, so I (or someone at Scrutinizer) will continue to maintain it unless we switch to another solution (e.g. php-enqueue). In terms of scalability, I've checked our monitoring: we process about 250k jobs/day across more than 10 servers that handle this kind of workload. However, I don't think the bundle is designed for high-throughput situations; its main advantages are visibility into job execution and job scheduling. It also does not require additional messaging infrastructure if you don't have one already.
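For scale, 250k jobs/day is a modest average rate. A quick back-of-the-envelope calculation (not from the thread; it assumes jobs are spread evenly over the day and over exactly 10 servers, so the per-server figure is an upper bound given that there are more than 10):

```php
<?php

// Average rate implied by a daily job count (86,400 seconds per day).
function jobsPerSecond(int $jobsPerDay): float
{
    return $jobsPerDay / 86400;
}

$cluster = jobsPerSecond(250000);
// Assumes exactly 10 servers; with more than 10 the real average per server is lower.
$perServer = jobsPerSecond(intdiv(250000, 10));

printf("%.2f jobs/s across the cluster, at most %.2f jobs/s per server on average\n", $cluster, $perServer);
// prints: 2.89 jobs/s across the cluster, at most 0.29 jobs/s per server on average
```

Peaks can of course be much higher than these averages, but the numbers fit the point that the bundle is aimed at visibility and scheduling rather than raw throughput.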

On Scrutinizer, we also have a separate system, which we call Workflow. It is responsible, for example, for tracking the execution state of inspections, scheduling the different tasks of an inspection, dispatching events, tracking task progress, managing task heartbeats, and more. This kind of distributed system uses a messaging system, because many processes need to communicate with each other and a large number of listeners can subscribe to the events dispatched along the way. For this kind of system, this bundle would not be the right fit.

In the end, I would need to know more about php-enqueue, but in general it seems to be more about transport abstraction, and the Messenger component more about process communication, whereas this bundle works at a higher level (job execution).

stephanvierkant commented 6 years ago

Thanks for your reply. It must be said: thanks for publishing and maintaining this bundle in the first place :).

> In the end, I would need to know more about php-enqueue, but in general it seems more about transport abstraction and the Messenger component about process communication whereas this bundle seems to be on a higher level (Job execution).

You are right, these libraries are not completely interchangeable. Scalability is a concern in my use case, so I'm looking for a way to offload background jobs to a dedicated job server. php-enqueue seems to provide this feature. This bundle lacks a feature like that, or am I wrong? Can you explain what your setup with more than 10 servers looks like?

As for job execution (scheduling, retries, logging, etc.), php-enqueue isn't ready yet, but I think this is something that will be developed soon.

> We actively use this bundle on Scrutinizer and as such I (or someone at Scrutinizer) will continue to maintain the bundle unless we switch to another solution.

Can you explain what 'maintain' means? Some pull requests have been open for years, up to 4 years. If you need any help, please let us know.

This project isn't abandoned and it still works fine, but both concerns combined (scalability and the number of open PRs) made me think about migrating to another solution.

schmittjoh commented 6 years ago

Since this bundle provides just one transport, the database, scalability depends on how well you can scale your database. If you just want to move a worker process from one server to another, that does not change the number of queries your database has to run, so you should not need to worry too much as long as both servers can access your database.
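A toy model of that argument, assuming (hypothetically; this is not the bundle's actual polling code) that each idle worker issues one query against the jobs table per poll interval. The host names are made up and appear in the input only to show that they never enter the total; the reads and writes for each executed job behave the same way.

```php
<?php

// Hypothetical polling model: each worker sends one "is there a pending job?"
// query per poll interval to the shared jobs table. 'host' is deliberately unused.
function pollQueriesPerSecond(array $workers): float
{
    $total = 0.0;
    foreach ($workers as $worker) {
        $total += 1.0 / $worker['pollIntervalSeconds'];
    }

    return $total;
}

// Two workers on the web server...
$before = [
    ['host' => 'web-1', 'pollIntervalSeconds' => 2.0],
    ['host' => 'web-1', 'pollIntervalSeconds' => 2.0],
];
// ...versus one of them moved to a dedicated job server.
$after = [
    ['host' => 'web-1', 'pollIntervalSeconds' => 2.0],
    ['host' => 'jobs-1', 'pollIntervalSeconds' => 2.0],
];

printf("before: %.1f q/s, after: %.1f q/s\n", pollQueriesPerSecond($before), pollQueriesPerSecond($after));
```

Only adding workers or shortening poll intervals changes the database load, which is why scaling this bundle comes down to the database architecture.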

If we are talking hundreds or thousands of servers, then yes, you may need to use a database cluster or a dedicated database architecture just for the jobs tables, but it doesn't sound like we are talking about that kind of scalability yet. You probably also want to create very specific solutions for your kind of workload at that point.

As for maintaining: it means we will look at things we find interesting (and hopefully others do too) when we need something. And if something serious comes up, like a security issue or the bundle no longer being installable for some reason, we would look at that, too.

I can also add more maintainers to the bundle, but we generally prefer stability and robustness over features at this point. That said, there are some good pull requests, in particular about improving the UI, which could be reviewed, tested and merged. For others, I think providing an extension point is better, so that a feature can be implemented in user-land or as part of another bundle. Also, there are issues asking how to use this bundle, which we generally do not have time to answer.

If you have some general ideas where this bundle should be improved further (and want to invest time to implement those), I would be happy to talk about that also, but scalability in particular is a question of your database architecture.

temp commented 6 years ago

Stability is fine, but having no progress at all is frustrating. Take my nearly 4-year-old pull request #70, which removes the need to change the Symfony Application but raises the minimum Symfony version to 2.3. That should be fine by now ;-) Having to run your own fork just to use the library (which many people do, judging by the comments in the PRs) is quite counter-productive.

noofaq commented 6 years ago

@temp For the same reasons, I have already ported my applications to php-enqueue (adding my own very simple scheduler for CLI commands, as this feature was missing there).

To port my Symfony 3 application to Symfony 4, I had to completely abandon two of @schmittjoh's bundles: JMSJobQueueBundle (replaced by php-enqueue) and JMSTranslation (replaced by php-translation), as it was impossible to keep my own forks and manually merge pull requests from various contributors. Now everything is much simpler, although the migration required some unexpected work.

modstore commented 6 years ago

> (adding my very simple scheduler for CLI commands as this feature was missing there)

@noofaq I'm looking to do the same thing asap. Did you write your own scheduler or find something existing? Also, is there anything that doesn't work as well now? Were you able to run async commands? Thanks in advance if you get a chance to answer my questions! Cheers.

noofaq commented 6 years ago

@modstore I implemented my own very basic scheduler (reasons: time constraints and not very complex needs).

Basic entity used for configuration (frequency is in seconds):

<?php

namespace App\Entity;

//annotations
use Doctrine\ORM\Mapping as ORM;

/**
 * ScheduledCommand
 *
 * @ORM\Table(name="scheduled_command")
 * @ORM\Entity(repositoryClass="App\Entity\Repository\ScheduledCommandRepository")
 */
class ScheduledCommand
{
    //... Fields ...
    /**
     * @ORM\Column(type="string")
     * @ORM\Id
     *
     * @var string
     */
    private $id;

    /**
     * @ORM\Column(type="string")
     *
     * @var string
     */
    private $command;

    /**
     * @ORM\Column(type="json", nullable=true)
     *
     * @var array|null
     */
    private $options;

    /**
     * @ORM\Column(type="integer")
     *
     * @var integer
     */
    private $frequency;

    /**
     * @ORM\Column(type="datetime", nullable=true)
     *
     * @var \DateTime|null
     */
    private $lastTriggeredAt;

    /**
     * @ORM\Column(type="datetime", nullable=true)
     *
     * @var \DateTime|null
     */
    private $lastRunAt;

    /**
     * @ORM\Column(type="text", nullable=true)
     *
     * @var string|null
     */
    private $lastResult;

    /**
     * @return string
     */
    public function getId(): string
    {
        return $this->id;
    }

    /**
     * @param string $id
     * @return ScheduledCommand
     */
    public function setId(string $id): ScheduledCommand
    {
        $this->id = $id;
        return $this;
    }

    /**
     * @return string
     */
    public function getCommand(): string
    {
        return $this->command;
    }

    /**
     * @param string $command
     * @return ScheduledCommand
     */
    public function setCommand(string $command): ScheduledCommand
    {
        $this->command = $command;
        return $this;
    }

    /**
     * @return array|null
     */
    public function getOptions(): ?array
    {
        return $this->options;
    }

    /**
     * @param array|null $options
     * @return ScheduledCommand
     */
    public function setOptions(?array $options): ScheduledCommand
    {
        $this->options = $options;
        return $this;
    }

    /**
     * @return int
     */
    public function getFrequency(): int
    {
        return $this->frequency;
    }

    /**
     * @param int $frequency
     * @return ScheduledCommand
     */
    public function setFrequency(int $frequency): ScheduledCommand
    {
        $this->frequency = $frequency;
        return $this;
    }

    /**
     * @return string|null
     */
    public function getLastResult(): ?string
    {
        return $this->lastResult;
    }

    /**
     * @param string $lastResult
     * @return ScheduledCommand
     */
    public function setLastResult(string $lastResult): ScheduledCommand
    {
        $this->lastResult = $lastResult;
        return $this;
    }

    /**
     * @return \DateTime|null
     */
    public function getLastTriggeredAt(): ?\DateTime
    {
        return $this->lastTriggeredAt;
    }

    /**
     * @param \DateTime|null $lastTriggeredAt
     * @return ScheduledCommand
     */
    public function setLastTriggeredAt(?\DateTime $lastTriggeredAt): ScheduledCommand
    {
        $this->lastTriggeredAt = $lastTriggeredAt;
        return $this;
    }

    /**
     * @return \DateTime|null
     */
    public function getLastRunAt(): ?\DateTime
    {
        return $this->lastRunAt;
    }

    /**
     * @param \DateTime|null $lastRunAt
     * @return ScheduledCommand
     */
    public function setLastRunAt(?\DateTime $lastRunAt): ScheduledCommand
    {
        $this->lastRunAt = $lastRunAt;
        return $this;
    }

}

Scheduler command

<?php

namespace App\Command;

use App\Entity\ScheduledCommand;
use App\Queue\Processor\RunCommandProcessor;
use Doctrine\ORM\EntityManager;
use Doctrine\ORM\EntityManagerInterface;
use Enqueue\Client\ProducerInterface;
use Mtdowling\Supervisor\EventListener;
use Mtdowling\Supervisor\EventNotification;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class EventSchedulerCommand extends ContainerAwareCommand
{
    protected static $defaultName = 'app:event:scheduler';
    /**
     * @var ProducerInterface
     */
    private $enqueueProducer;
    /**
     * @var EntityManagerInterface|EntityManager
     */
    private $entityManager;

    /**
     * EventSchedulerCommand constructor.
     *
     * @param ProducerInterface $enqueueProducer
     * @param EntityManagerInterface $entityManager
     */
    public function __construct(ProducerInterface $enqueueProducer, EntityManagerInterface $entityManager)
    {
        parent::__construct(self::$defaultName);

        $this->enqueueProducer = $enqueueProducer;
        $this->entityManager = $entityManager;
    }

    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName(self::$defaultName)
            ->setDescription('Schedules commands to be executed using Enqueue events')
        ;
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // listen() blocks and calls the callback for every event Supervisor sends
        // to this listener (PROCESS_STATE_STARTING and TICK_60 in the supervisor config).
        $listener = new EventListener();
        $listener->listen(function (EventListener $listener, EventNotification $event) {
            $env = $this->getContainer()->getParameter('kernel.environment');

            foreach ($this->entityManager->getRepository(ScheduledCommand::class)->findAll() as $command) {
                // frequency is in seconds: skip commands triggered less than $frequency seconds ago
                if ($command->getLastTriggeredAt()
                    && $command->getLastTriggeredAt()->getTimestamp() + $command->getFrequency() > time()) {
                    continue;
                }

                // only enqueue the job here; RunCommandProcessor (run by the queue consumer) executes it
                $this->enqueueProducer->sendCommand(RunCommandProcessor::COMMAND_NAME, [
                    'id' => $command->getId(),
                    'env' => $env,
                ]);
                $listener->log('triggering '.$command->getId().' '.((new \DateTime())->format('Y-m-d H:i:s')));

                $command->setLastTriggeredAt(new \DateTime());
                $this->entityManager->flush();
            }

            return true;
        });
    }

}
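The "is this command due?" check inside the listener callback can be pulled out into a small pure function, which makes it easy to unit test. A minimal sketch; `isDue()` is a hypothetical name, and it accepts `\DateTimeInterface` so it works with the entity's `\DateTime` values. It is simply the negation of the scheduler's skip condition:

```php
<?php

// Hypothetical helper mirroring the scheduler's check: a command is due when it
// was never triggered, or when $frequencySeconds have passed since the last trigger.
function isDue(?\DateTimeInterface $lastTriggeredAt, int $frequencySeconds, int $now): bool
{
    if (null === $lastTriggeredAt) {
        return true; // never triggered: run on the first tick
    }

    return $lastTriggeredAt->getTimestamp() + $frequencySeconds <= $now;
}

$lastRun = new \DateTimeImmutable('-10 minutes');
var_dump(isDue($lastRun, 300, time()));  // bool(true): 600 s have passed, frequency is 300 s
var_dump(isDue($lastRun, 3600, time())); // bool(false): an hourly command is not due yet
```

Because the check only runs on each TICK_60 event, a frequency that is not a multiple of 60 is effectively rounded up: with a 90-second frequency, the command is enqueued roughly every 120 seconds.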

Command processor (very similar code was already posted in one of the GitHub issues):

<?php

namespace App\Queue\Processor;

use App\Entity\ScheduledCommand;
use Doctrine\ORM\EntityManagerInterface;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Util\JSON;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

/**
 * Class RunCommandProcessor
 *
 * @see https://github.com/php-enqueue/enqueue-dev/issues/213
 */
class RunCommandProcessor implements PsrProcessor, CommandSubscriberInterface
{
    const COMMAND_NAME = 'run_command';

    /** @var string */
    private $projectDir;

    /** @var string */
    private $env;
    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * RunCommandProcessor constructor.
     * @param string $rootDir
     * @param string $env
     * @param EntityManagerInterface $entityManager
     */
    public function __construct(string $rootDir, string $env, EntityManagerInterface $entityManager)
    {
        $this->projectDir = $rootDir;
        $this->env = $env;
        $this->entityManager = $entityManager;
    }

    /**
     * @param PsrMessage $message
     * @param PsrContext $context
     * @return Result|object|string
     */
    public function process(PsrMessage $message, PsrContext $context)
    {
        $body = JSON::decode($message->getBody());

        $command = $this->entityManager->getRepository(ScheduledCommand::class)->findOneById($body['id']);

        if (!$command) {
            return Result::reject(sprintf('Command with ID: "%s" has not been found in the DB!', $body['id']));
        }

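        $options = '';
        if ($command->getOptions()) {
            foreach ($command->getOptions() as $k => $v) {
                // --env is appended separately below, so it cannot be passed here
                if ('env' === $k) {
                    continue;
                }
                // append (do not overwrite) each option; escape the value for the shell
                $options .= ' --'.$k.'='.escapeshellarg((string) $v);
            }
        }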

        $command->setLastResult('<started> '.((new \DateTime())->format('Y-m-d H:i:s')));
        $this->entityManager->flush();

        $process = new Process('php ./bin/console '.$command->getCommand().' '.$options.' --env='.($body['env'] ?? $this->env), $this->projectDir);
        try {
            $process->mustRun();

            $command->setLastResult('<finished> '.$process->getOutput().PHP_EOL.PHP_EOL.$process->getErrorOutput());
            $command->setLastRunAt(new \DateTime());
            $this->entityManager->flush();

            return Result::ACK;
        } catch (ProcessFailedException $e) {
            // separate variable, so the $message parameter (the queue message) is not overwritten
            $error = sprintf('<error> The process failed with exception: "%s" in %s at line %s', $e->getMessage(), $e->getFile(), $e->getLine());
            $command->setLastResult($error);
            $this->entityManager->flush();

            return Result::reject($error);
        }
    }

    public static function getSubscribedCommand()
    {
        return [
            'processorName' => self::COMMAND_NAME,
            'queueName' => self::COMMAND_NAME,
            'queueNameHardcoded' => true,
//            'exclusive' => true,
        ];
    }
}
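The option handling in `process()` can likewise be factored into a pure function. A sketch under a few assumptions: `buildOptionString()` is a hypothetical name, option names coming from the database are trusted, and each value is a scalar. Each option must be appended (`.=`), not assigned, or only the last one survives; values are passed through `escapeshellarg()` because the result is concatenated into a shell command line.

```php
<?php

// Hypothetical helper: turn the ScheduledCommand options into " --name=value" flags.
// "env" is skipped because --env is appended separately by the processor.
function buildOptionString(?array $options): string
{
    $result = '';
    foreach ($options ?? [] as $name => $value) {
        if ('env' === $name) {
            continue;
        }
        // append, and quote the value so spaces or shell metacharacters stay inert
        $result .= ' --'.$name.'='.escapeshellarg((string) $value);
    }

    return $result;
}

// "app:report" is a made-up command name for illustration.
echo 'php ./bin/console app:report'.buildOptionString(['limit' => 10, 'env' => 'dev']).' --env=prod', PHP_EOL;
```

Alternatively, passing an array of arguments to Symfony's `Process` avoids shell quoting entirely, at the cost of having to split `getCommand()` into separate arguments.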

The scheduler is kicked off by Supervisor using its TICK events (TICK_60 fires every 60 seconds):

[eventlistener:event_scheduler]
command=php __ROOTDIR__/bin/console app:event:scheduler --env=prod
process_name=%(program_name)s_%(process_num)02d
numprocs=1
directory=/tmp
autostart=true
autorestart=true
startsecs=1
startretries=999
events=PROCESS_STATE_STARTING,TICK_60
user=__USER__
redirect_stderr=false
stdout_logfile=__LOGDIR__/%(program_name)s.out.log
stderr_logfile=__LOGDIR__/%(program_name)s.error.log
stopwaitsecs=60
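Under the hood, the TICK_60 events reach the scheduler through Supervisor's event-listener protocol, which the `Mtdowling\Supervisor\EventListener` class handles for you. A minimal sketch of one protocol round (written for illustration, not that library's code): the listener writes `READY`, Supervisor sends a header line of `key:value` tokens plus a payload of `len` bytes, and the listener answers `RESULT 2\nOK` (or `RESULT 4\nFAIL`) before becoming ready again. The function names are hypothetical.

```php
<?php

// Parse a header line such as
// "ver:3.0 server:supervisor serial:21 pool:event_scheduler poolserial:10 eventname:TICK_60 len:15"
function parseEventHeader(string $line): array
{
    $header = [];
    foreach (explode(' ', trim($line)) as $token) {
        if (false === strpos($token, ':')) {
            continue; // not a key:value token
        }
        [$key, $value] = explode(':', $token, 2);
        $header[$key] = $value;
    }

    return $header;
}

// Build the reply Supervisor expects: "RESULT <length>\n<body>" with body OK or FAIL.
function resultMessage(bool $ok): string
{
    $body = $ok ? 'OK' : 'FAIL';

    return 'RESULT '.strlen($body)."\n".$body;
}

// Handle exactly one event: announce READY, read header and payload, run the
// handler, acknowledge. A real listener repeats this in an endless loop.
function handleOneEvent($in, $out, callable $handler): string
{
    fwrite($out, "READY\n");
    fflush($out);

    $header = parseEventHeader((string) fgets($in));
    $length = (int) ($header['len'] ?? 0);
    $payload = $length > 0 ? (string) stream_get_contents($in, $length) : '';

    fwrite($out, resultMessage((bool) $handler($header, $payload)));
    fflush($out);

    return $header['eventname'] ?? '';
}
```

In production `$in` and `$out` would be STDIN and STDOUT. Since stdout carries the protocol, any diagnostic output from a listener has to go to stderr instead.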

The rest is done by the standard queue consumer, controlled by Supervisor. The command I developed only registers jobs to be run. I haven't used async commands, though, so unfortunately I can't help with that.

The solution seems to work just fine so far. I might switch to a better solution one day, but there is no need for that right now. If you have any questions, don't hesitate to ask me. [Sorry for the lengthy post]

modstore commented 6 years ago

@noofaq Thanks heaps for the details, that looks good. With 4.1 coming out today, I'll hopefully start looking at getting it implemented next week. Cheers.

schmittjoh commented 5 years ago

Closing this one here as there is nothing specific to do.