mgdigital / BusQue

Command Queue and Scheduler for PHP7 and Redis
MIT License
117 stars, 1 fork

Dealing with spinning up/down workers #6

Open rquast opened 8 years ago

rquast commented 8 years ago

Great project, thank you!

I've taken a brief look through the project, and it looks pretty similar to a scheduler that I've used in the past (Quartz Scheduler), which I was going to use until I found out about your project.

Most things look pretty straightforward, but I am a bit concerned about jobs (commands) being stopped in the middle of their process. As far as I can tell, the only way to deal with this at the moment is to implement some kind of signal handler for the command? It would be better if that was handled by BusQue.

I was thinking that it would be easier, in the long term, to implement a couple of things similar to Quartz. The first is a standardised way to interrupt a command (http://stackoverflow.com/questions/7159080/how-to-interrupt-or-stop-currently-running-quartz-job). The second is a single process that spins worker threads up and down. Currently, if you have 20 workers, you have to start and stop each of them manually. Would it be easier to have a single process that deals with this? It would make things a lot simpler from a configuration perspective.

Also, has anyone used this in production yet? Has anyone had any issues with serialization between different servers, etc.?

Thanks once again for the great code! :)

mgdigital commented 8 years ago

Thank you! I have not used this in production yet - I intend to start in about 2 months and will be making further updates between now and then, including compatibility breaks - and I wouldn't say this has been "battle hardened" yet so I would proceed with caution just at the moment!

Job interruption could possibly be handled better, but it is slightly tricky if the signal is received while third-party code is executing. I have considered handling signals in some way but I'm not sure if this could be made reliable.
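
One possible approach, sketched here only as an illustration: the signal handler does nothing except set a flag, and the worker checks that flag between commands, so a command that has already started is allowed to finish. `ShutdownFlag` and `runWorker` are hypothetical names, not part of BusQue. The sketch assumes PHP 7.1+ and, for the signal part, the `pcntl` extension on a Unix-like CLI.

```php
<?php

// Hypothetical helper (not part of BusQue): records that a shutdown was requested.
final class ShutdownFlag
{
    private $requested = false;

    public function request(): void
    {
        $this->requested = true;
    }

    public function isRequested(): bool
    {
        return $this->requested;
    }
}

/**
 * Runs commands one at a time and checks the flag only between commands,
 * so a command that has started is never cut short by this loop.
 *
 * @return int number of commands processed
 */
function runWorker(iterable $commands, callable $handle, ShutdownFlag $flag): int
{
    $processed = 0;
    foreach ($commands as $command) {
        if ($flag->isRequested()) {
            break;
        }
        $handle($command);
        $processed++;
    }

    return $processed;
}

$flag = new ShutdownFlag();

// Signal wiring is skipped where the pcntl extension is not loaded.
if (function_exists('pcntl_async_signals')) {
    pcntl_async_signals(true);
    $onSignal = function () use ($flag): void {
        $flag->request();
    };
    pcntl_signal(SIGTERM, $onSignal);
    pcntl_signal(SIGINT, $onSignal);
}
```

Because the handler only flips a flag, it does not matter much which code happens to be running when the signal arrives. The trade-off is that a long-running command delays shutdown until it returns.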

The worker can (and probably should) be passed a finite length of time/number of commands to process before it stops - my strategy is to use something like supervisord as a process manager to manage these and restart them automatically.
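
As a rough illustration of that idea, a bounded worker loop might look like the sketch below. `runBoundedWorker` and its parameters are hypothetical and not BusQue's actual worker API; the injectable `$clock` is there only to make the time limit easy to test. It assumes PHP 7.1+.

```php
<?php

/**
 * Hypothetical bounded worker loop (not BusQue's API): processes commands
 * until either limit is reached, then returns so that a process manager
 * such as supervisord can start a fresh process.
 *
 * $next  returns the next command, or null when nothing is waiting.
 * $clock returns the current time in seconds; defaults to microtime(true).
 *
 * @return int number of commands processed
 */
function runBoundedWorker(
    callable $next,
    callable $handle,
    int $maxCommands,
    float $maxSeconds,
    ?callable $clock = null
): int {
    $clock = $clock ?? function (): float {
        return microtime(true);
    };

    $deadline = $clock() + $maxSeconds;
    $processed = 0;

    while ($processed < $maxCommands && $clock() < $deadline) {
        $command = $next();
        if ($command === null) {
            // Nothing queued: back off briefly instead of spinning.
            usleep(100000);
            continue;
        }
        $handle($command);
        $processed++;
    }

    return $processed;
}
```

Exiting regularly like this also limits the impact of memory leaks in long-running PHP processes, since each restart begins with a clean process.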

I haven't seen any serialization issues but would suggest using simple values/objects for the command payload which can be predictably (un)serialized, and any problem like this could be overcome by using a custom serializer.
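
To make that concrete, here is a minimal sketch of a command whose payload is scalars only, with a JSON-based encode/decode pair standing in for a custom serializer. The class and function names are hypothetical and do not reflect BusQue's serializer interface.

```php
<?php

// Hypothetical command (not from BusQue): the payload is scalars only,
// so it decodes to the same value on any server.
final class SendWelcomeEmail
{
    private $userId;
    private $locale;

    public function __construct(int $userId, string $locale)
    {
        $this->userId = $userId;
        $this->locale = $locale;
    }

    public function toArray(): array
    {
        return ['userId' => $this->userId, 'locale' => $this->locale];
    }

    public static function fromArray(array $data): self
    {
        return new self((int) $data['userId'], (string) $data['locale']);
    }
}

// Hypothetical serializer pair: JSON instead of PHP's native serialize().
function encodeCommand(SendWelcomeEmail $command): string
{
    return json_encode($command->toArray());
}

function decodeCommand(string $payload): SendWelcomeEmail
{
    return SendWelcomeEmail::fromArray(json_decode($payload, true));
}
```

Keeping resources, closures, and deep object graphs out of the payload means the encoded form does not depend on which classes or extensions happen to be loaded on the receiving server.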

rquast commented 8 years ago

Thanks for the update on this. It looks really promising. I probably won't use it for the current project I'm working on, but look forward to using it in others in the future. Like supervisord, I have used daemontools in the past for similar processes and it worked well for PHP stuff. In Java you generally don't have to worry about the worker issue because you just create a new thread and it's pretty easy to handle all errors. I know it's a lot more difficult to manage with PHP (at least from my past experience with things like pcntl_fork).

On a side note, yesterday I started creating my own scheduler in ReactPHP that uses Redis pub/sub (for adding/removing jobs) and loop timers to fire pending jobs (if they are due, using a cron expression). The jobs take in JSON from the Redis pub/sub channel, and when they are due, they fire an async HTTP request (using GuzzleHttp) to a URI specified in the JSON to run the job. I need to think about how to set the status of pending jobs, failed jobs, retries, etc. It should help out with running jobs on different servers, and it's also language agnostic. Not sure how I'll go, but there aren't really any good schedulers like the one you've written for PHP, so I figured what the heck. Here is the basic outline, just to test the idea:

<?php

require __DIR__ . '/../../vendor/autoload.php';

use Cavy\Lib\HelperFunctions;
use Clue\React\Redis\Factory;

HelperFunctions::checkEnvironment();
HelperFunctions::setDefaultTimezone();

$logger = new \Monolog\Logger('cron_log');
$file_handler = new \Monolog\Handler\StreamHandler("/logs/cron.log");
$logger->pushHandler($file_handler);

try {

    $loop = React\EventLoop\Factory::create();

    // TODO: all unfinished tasks should be read out of the database/queue persistence.
    $tasks = [];

    $factory = new Factory($loop);
    $factory->createClient('localhost/' . \Cavy\Lib\RedisFunctions::PUBSUB_DATABASE)->then(function ($client) use (&$tasks) {

        $client->psubscribe('*:cron');

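        // Capture $tasks by reference here too; otherwise the append below
        // would only modify a new local array and the timer would never see it.
        $client->on('pmessage', function ($pattern, $topic, $payload) use (&$tasks) {
            $task = json_decode($payload, true);
            // TODO: save the task to the database first.
            echo 'TOPIC: ' . $topic . "\n";
            print_r($task);
            $tasks[] = $task;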
        });

    })->then(null, function ($e) {
        echo 'ERROR: Unable to subscribe to Redis channel: ' . $e;
    });

    $interval = 1;

    $httpClient = new \GuzzleHttp\Client();

    $loop->addPeriodicTimer($interval, function () use ($interval, $loop, &$tasks, $httpClient) {
        $loop->addTimer($interval - time() % $interval, function () use (&$tasks, $httpClient) {

            echo 'Cron Loop Check: ' . time() . "\n";

            foreach ($tasks as $task) {

                if (!isset($task['cron_expression'])) {
                    continue;
                }

                $cronExpression = \Cron\CronExpression::factory($task['cron_expression']);

                if ($cronExpression->isDue()) {

                    // TODO: async http request.
                    // TODO: should look up task to figure out if it is get/post/put/patch/delete
                    $request = new \GuzzleHttp\Psr7\Request('GET', $task['uri']);
                    $promise = $httpClient->sendAsync($request)->then(function (\GuzzleHttp\Psr7\Response $response) {
                        $code = $response->getStatusCode();
                        // TODO: deal with the response code.
                    });

                }
            }
        });
    });

    $loop->run();

} catch (Exception $ex) {

    HelperFunctions::printStackTrace($ex);

    sleep(10);

    // Any uncaught exceptions are fatal.
    exit;

}

mgdigital commented 8 years ago

It looks like you have the luxury of coming from a proper programming language, so welcome to PHP ;)

I already have some cron scheduler code locally which works similarly to what you have here - not pushed here yet as it isn't sufficiently tested. The main difference with mine is that you'd pass it a command factory instead of setting up the React loop - the factory would instantiate the command in whichever way it likes and then place it in the queue, where it's handled by the queue worker. Also various ways of providing the Cron tasks are needed, as some may be in config files while some may be persisted in the application, e.g. in Redis.

With regards to checking command statuses, this is difficult as a command with the same ID may be processed multiple times, but I do intend to add a "dead letter queue" to keep track of failed commands.

Sorry this isn't quite ready for your project yet - the main changes I need to make before considering this "ready" are completing the cron scheduler, improving the event system and error handling, making the Behat suite more comprehensive, and achieving HHVM compatibility. I've also removed the MGDigital\ namespace in my local code to reduce verbosity, hence the BC break. Hope you can find a use for it in future!

rquast commented 8 years ago

I think I know what you mean by that queue (I saw the code you wrote in the RedisAdapter). Using BRPOPLPUSH etc. means you can just keep polling the queue and handle concurrency. I might try to use the same thing. It's a nifty way to deal with the loop, and you don't need to use pub/sub. The only thing I can think of is that Redis would need append-only mode turned on so that it's fault tolerant in the case of a forced reboot/restart.
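
The pattern being described can be sketched without a Redis server as follows. This is an illustration under stated assumptions, not the RedisAdapter's actual code: `ListStore`, `InMemoryListStore`, `processNext`, and the list names `queue` and `processing` are all hypothetical. In Redis, `popPush` would correspond to `RPOPLPUSH`/`BRPOPLPUSH` and `remove` to `LREM` with a count of 1.

```php
<?php

// Hypothetical storage abstraction so the sketch runs without Redis.
interface ListStore
{
    /** Like RPOPLPUSH: move the tail of $from to the head of $to; null if $from is empty. */
    public function popPush(string $from, string $to): ?string;

    /** Like LREM with count 1: remove one occurrence of $value from $list. */
    public function remove(string $list, string $value): void;
}

final class InMemoryListStore implements ListStore
{
    /** @var array<string, string[]> index 0 is the head, the last index is the tail */
    public $lists = [];

    public function popPush(string $from, string $to): ?string
    {
        if (empty($this->lists[$from])) {
            return null;
        }
        $value = array_pop($this->lists[$from]);
        if (!isset($this->lists[$to])) {
            $this->lists[$to] = [];
        }
        array_unshift($this->lists[$to], $value);

        return $value;
    }

    public function remove(string $list, string $value): void
    {
        $index = array_search($value, $this->lists[$list] ?? [], true);
        if ($index !== false) {
            array_splice($this->lists[$list], $index, 1);
        }
    }
}

/** Processes one payload; returns false when the queue is empty. */
function processNext(ListStore $store, callable $handle): bool
{
    $payload = $store->popPush('queue', 'processing');
    if ($payload === null) {
        return false;
    }
    $handle($payload);                      // if this throws, the payload stays in 'processing'
    $store->remove('processing', $payload); // acknowledge only after success

    return true;
}
```

Because the payload is moved rather than simply popped, a worker that crashes mid-command leaves it in the processing list, where it can be found and re-queued. For that to survive a restart of Redis itself, persistence has to be enabled, for example with `appendonly yes` in redis.conf.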