spiral / roadrunner-bridge

🌉 RoadRunner bridge to Spiral Framework
https://spiral.dev/docs/packages-roadrunner-bridge
MIT License

Adding interceptors support #23

Closed: msmakouz closed this 2 years ago

msmakouz commented 2 years ago

This PR adds support for interceptors and custom headers, and makes it possible to re-push a job with a delay or custom headers.

Interceptors

Configuration via config file

return [
    'interceptors' => [
        'push' => [
            SomeInterceptor::class
        ],
        'consume' => [
            ErrorHandlerInterceptor::class,
            new OtherInterceptor(),
            new \Spiral\Core\Container\Autowire(ThirdInterceptor::class),
        ],
    ],
];

Configuration via QueueBootloader

use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Queue\Bootloader\QueueBootloader;

class AppBootloader extends Bootloader
{
    public function init(QueueBootloader $queue): void
    {
        $queue->addPushInterceptor(ValidatePayloadInterceptor::class);
        $queue->addConsumeInterceptor(OtherInterceptor::class);
    }
}

use Spiral\Core\CoreInterface;

class ValidatePayloadInterceptor implements \Spiral\Core\CoreInterceptorInterface
{
    /**
     * @param array{
     *     driver: string,
     *     queue: string,
     *     id: string,
     *     payload: array
     * } $parameters
     */
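    public function process(string $jobName, string $action, array $parameters, CoreInterface $core): mixed
    {
        // validate payload ....
        // Placeholder check so the example runs; replace with real validation.
        $valid = \is_array($parameters['payload'] ?? null);

        if (!$valid) {
            throw new \Exception('...');
        }

        return $core->callAction($jobName, $action, $parameters);
    }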
}
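
To illustrate how configured interceptors wrap a job handler, here is a minimal, self-contained sketch. The `Core`, `Interceptor`, `InterceptedCore` and `buildPipeline` names are hypothetical stand-ins, not the Spiral classes, and the sketch assumes interceptors run in the order they are listed.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a core that executes an action.
interface Core
{
    public function callAction(string $name, string $action, array $parameters): mixed;
}

// Hypothetical stand-in for an interceptor that can act before or after the core.
interface Interceptor
{
    public function process(string $name, string $action, array $parameters, Core $core): mixed;
}

// Wraps one interceptor around the next core in the chain.
final class InterceptedCore implements Core
{
    public function __construct(
        private Interceptor $interceptor,
        private Core $next,
    ) {
    }

    public function callAction(string $name, string $action, array $parameters): mixed
    {
        return $this->interceptor->process($name, $action, $parameters, $this->next);
    }
}

// Builds the chain so that the first interceptor in the list runs first.
function buildPipeline(Core $handler, Interceptor ...$interceptors): Core
{
    $core = $handler;
    foreach (array_reverse($interceptors) as $interceptor) {
        $core = new InterceptedCore($interceptor, $core);
    }

    return $core;
}

// Final handler: simply returns the payload it received.
$handler = new class implements Core {
    public function callAction(string $name, string $action, array $parameters): mixed
    {
        return $parameters['payload'];
    }
};

// Rejects jobs whose payload has no "value" key, otherwise passes the call on.
$validate = new class implements Interceptor {
    public function process(string $name, string $action, array $parameters, Core $core): mixed
    {
        if (!isset($parameters['payload']['value'])) {
            throw new InvalidArgumentException('Payload must contain "value".');
        }

        return $core->callAction($name, $action, $parameters);
    }
};

$pipeline = buildPipeline($handler, $validate);
$result = $pipeline->callAction('ping', 'handle', ['payload' => ['value' => 'hello world']]);
```

A valid payload reaches the handler unchanged, while an invalid one is rejected before the handler is called.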

Custom headers

namespace App\Controller;

use App\Job\Ping;
use Spiral\Queue\QueueInterface;
use Spiral\RoadRunnerBridge\Queue\Options;

class HomeController
{
    public function __construct(
        private readonly QueueInterface $queue,
    ) {
    }

    public function ping(): string
    {
        $jobID = $this->queue->push(Ping::class, [
            'value' => 'hello world',
        ], (new Options())->withHeader(name: 'name', value: 'value'));

        return sprintf('Job ID: %s', $jobID);
    }
}

Re-pushing a job

namespace App\Interceptor;

use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterface;
use Spiral\Queue\Exception\RetryException;
use Spiral\RoadRunnerBridge\Queue\Options;

class CustomInterceptor implements CoreInterceptorInterface
{
    public function process(string $name, string $action, array $parameters, CoreInterface $core): mixed
    {
        // ...

        $options = (new Options())->withHeader(name: 'name', value: 'value')->withDelay(4);

        // This code is just an example. It will cause an infinite loop of re-pushing the job!
        // Take care of the re-push condition in your code.
        throw new RetryException('Some reason', $options);
    }
}
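
One way to take care of the re-push condition is to cap the number of attempts and grow the delay between them. The helper below is a hypothetical sketch and not part of this PR: `nextRetryDelay`, its default values, and the way the attempt count is tracked (for example in a custom header) are all assumptions.

```php
<?php

declare(strict_types=1);

/**
 * Returns the delay in seconds before the next attempt,
 * or null when the job should not be re-pushed again.
 */
function nextRetryDelay(int $attempt, int $maxAttempts = 3, int $baseDelay = 4): ?int
{
    if ($attempt < 0) {
        throw new InvalidArgumentException('Attempt number must not be negative.');
    }

    // Stop re-pushing once the attempt limit is reached.
    if ($attempt >= $maxAttempts) {
        return null;
    }

    // Exponential backoff: the base delay doubles with every attempt.
    return $baseDelay * (2 ** $attempt);
}

// Delays for attempts 0..3 with the default limit of 3 attempts.
$delays = array_map(
    static fn (int $attempt): ?int => nextRetryDelay($attempt),
    [0, 1, 2, 3],
);
```

When the helper returns an integer, that value could be passed to `withDelay()` before throwing `RetryException`; when it returns `null`, the interceptor would let the job fail instead of re-pushing it.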

codecov[bot] commented 2 years ago

Codecov Report

Merging #23 (3e00649) into 2.0 (d649551) will decrease coverage by 1.43%. The diff coverage is 57.14%.

@@             Coverage Diff              @@
##                2.0      #23      +/-   ##
============================================
- Coverage     88.93%   87.50%   -1.44%     
- Complexity      195      199       +4     
============================================
  Files            39       39              
  Lines           506      512       +6     
============================================
- Hits            450      448       -2     
- Misses           56       64       +8     

Impacted Files              Coverage Δ
src/Queue/Dispatcher.php    70.83% <50.00%> (-24.17%) ↓
src/Queue/Queue.php         83.33% <71.42%> (-10.42%) ↓
