Webador / SlmQueue

Laminas / Mezzio module that integrates with various queue management systems.
Other
137 stars 56 forks source link

Limit WorkerEvent::JOB_STATUS_FAILURE_RECOVERABLE jobs #160

Closed kusmierz closed 6 years ago

kusmierz commented 9 years ago

As far as I understand, throwing an exception in a job (any \Exception for Beanstalk, Redis and SQS or \SlmQueueDoctrine\Job\Exception\ReleasableException for Doctrine, which btw is different than the rest) will reinsert the job (automatically) for another try. It's good, BUT. What if I have some error(s) in my job's content itself? It would reinsert for infinity...

To resolve it, I propose MaxRecoverableFailuresStrategy.

<?php

namespace VaryaBase\SlmQueue\Strategy;

use SlmQueue\Strategy\AbstractStrategy;
use SlmQueue\Worker\WorkerEvent;
use Zend\EventManager\EventManagerInterface;

/**
 * Class MaxRecoverableFailuresStrategy
 * @package VaryaBase\SlmQueue\Strategy\MaxRecoverableFailuresStrategy
 */
class MaxRecoverableFailuresStrategy extends AbstractStrategy
{
    /**
     * @var int
     */
    protected $maxFailures = 10;

    /**
     * @param int $maxFailures
     */
    public function setMaxFailures($maxFailures)
    {
        $this->maxFailures = $maxFailures > 0 ? intval($maxFailures) : 0;
    }

    /**
     * @return int
     */
    public function getMaxFailures()
    {
        return $this->maxFailures;
    }

    /**
     * {@inheritDoc}
     */
    public function attach(EventManagerInterface $events, $priority = 1)
    {
        $this->listeners[] = $events->attach(
            WorkerEvent::EVENT_PROCESS_JOB,
            array($this, 'onStopConditionCheck'),
            -1000
        );
    }

    /**
     * @param WorkerEvent $event
     */
    public function onStopConditionCheck(WorkerEvent $event)
    {
        $result = $event->getResult();
        $job = $event->getJob();
        $queue = $event->getQueue();
        $metadataKey = '__failure_recoverable_count';

        if ($this->getMaxFailures() && WorkerEvent::JOB_STATUS_FAILURE_RECOVERABLE === $result) {
            $failureCount = intval($job->getMetadata($metadataKey));
            ++$failureCount;

            $queue->delete($job);

            if ($failureCount >= $this->getMaxFailures()) {

                $this->state = sprintf(
                    'Job #%s (%s) failed executing for %d times, deleting...',
                    $job->getId(), get_class($job), $failureCount
                );
            } else {
                $this->state = sprintf(
                    'Job #%s (%s) failed executing, but has been added to the queue again (%d/%d)',
                    $job->getId(), get_class($job), $failureCount, $this->getMaxFailures()
                );

                $job->setMetadata($metadataKey, $failureCount);

                $delay = $this->factorial(round(log($failureCount, 2))) * 30;

                $queue->push(clone $job, ['delay' => $delay]);
            }
        }
    }

    /**
     * Calculates factorial ($number!) of $number.
     *
     * @param int|string $number The factorial number
     * @return int|string
     * @throws \Exception
     */
    protected function factorial($number)
    {
        if ($number < 2) {
            return 1;
        } else {
            if (($number <= 20 && 8 <= PHP_INT_SIZE) || ($number <= 12 && 4 === PHP_INT_SIZE)) {
                return ($number * $this->factorial($number - 1));
            } elseif (function_exists('gmp_fact')) {
                return gmp_strval(gmp_fact($number));
            } else {
                throw new \Exception('Out of range number (and GMP extension isn\'t loaded)');
            }
        }
    }
}

it will reinsert a job with updated metadata "failure_recoverable_count", and delay it for some time (for greater "failure_recoverable_count" values will be greater delays).

What do you think, @juriansluiman? If it's good idea, I could add delay param options (to adjust it to your needs from config) and do PR.

juriansluiman commented 9 years ago

Seems a sane idea :+1: Haven't looked at the code that much, @basz and @bakura10 might have an opinion about it as well.

bakura10 commented 9 years ago

I find the idea nice, however this does not apply to SQS :).

SQS keeps track itself of how many times a job was received, and has a feature called "dead letter queue". You can configure to say SQS "hey, after 50 tries, remove the job, or move it to another queue called the dead letter queue for inspecting what happen and why it failed). So I'd be afraid that such a mechanism would enter into conflict.

I'm wondering if other systems such as Beanstalk do not have a similar mechanism. But if they don't, I'm not against the idea, but it should definitely be completely optional and not be activated by default ;).

kusmierz commented 9 years ago

@bakura10 I was looking into, and can't find any similar mechanism for Beanstalk. Nevertheless, you probably can't find for Doctrine and Redis either :) So SQS is exception here.

BTW - I don't think it would be a conflict (as long as you exactly know where and what limit is set).

  1. SQS limit > Slm limit - nothing happened in fact - SQS removes job earlier, Slm worker doesn't get it anymore - but it's not wrong in my mind,
  2. SQS limit < Slm limit - as above - Slm removes it, before SQS use "dead letter queue" feature; it isn't bad either (what difference if you remove job after 50 tries or 100 tries?),
  3. SQS limit = Slm limit - obvious.

I think, you could use "dead letter queue" feature as safety check, just to be sure it has been deleted after a while - but you could also use Slm strategy. What's more - this solution would be independent to Queue implementation - you could change it later to another one, and everything will work exactly the same.

PS Why SQS' method push() has delay_seconds option instead of delay?

roelvanduijnhoven commented 6 years ago

I'll close this one as it more than two years old. Feel free to open new issue if ideas still relevant and / or interested in fixing! :)