laravel / ideas

Issues board used for Laravel internals discussions.

Queues: Blocking pop across multiple queues #1489

Open matt-allan opened 5 years ago

matt-allan commented 5 years ago

The Redis and Beanstalkd drivers support a block_for parameter. When it is set, the queue driver blocks for up to block_for seconds waiting for a job to arrive.

If a worker is processing multiple queues, both drivers block for up to n seconds on each individual queue in turn.

For example, let's say you have 2 queues, high and low, and block_for is set to 20 seconds. The worker first checks high. It blocks for 20 seconds and no jobs come in. The worker then checks low. It blocks again for 20 seconds and no jobs come in. However, a job is put into the high queue 1 second after the worker begins blocking on the low queue. The worker will block for 19 more seconds before checking the high queue.

Instead we could watch multiple queues when blocking. The worker will receive the first job available across all queues. In the example above the worker would receive the high job immediately because it would be monitoring both the high and low queues simultaneously.
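To make the arithmetic in the example concrete, here is a minimal sketch that models the current one-queue-at-a-time blocking. The function name sequentialWait and its parameters are made up for illustration and are not part of Laravel. It assumes every block runs to its full timeout, no other jobs arrive, and job processing time is ignored.

```php
<?php

/**
 * Seconds a job waits before an idle worker picks it up when the worker
 * blocks on each queue in turn (hypothetical model, not Laravel code).
 *
 * @param string[] $queues   Queue names in priority order.
 * @param int      $blockFor Seconds to block on each queue (must be >= 1).
 * @param string   $target   Queue the job is pushed to.
 * @param int      $arrival  Seconds after the worker starts that the job arrives.
 */
function sequentialWait(array $queues, int $blockFor, string $target, int $arrival): int
{
    if ($blockFor < 1 || ! in_array($target, $queues, true)) {
        throw new InvalidArgumentException('Need blockFor >= 1 and a known target queue.');
    }

    $time = 0;

    while (true) {
        foreach ($queues as $queue) {
            $windowEnd = $time + $blockFor;

            if ($queue === $target && $arrival < $windowEnd) {
                // The job is already waiting, or arrives while we block on its queue.
                return max($time, $arrival) - $arrival;
            }

            // Nothing for us on this queue: the block times out.
            $time = $windowEnd;
        }
    }
}

// The scenario above: the job hits "high" 1 second into the block on "low".
echo sequentialWait(['high', 'low'], 20, 'high', 21), PHP_EOL; // 19
```

Under the same assumptions, a worker that blocks on both queues at once would receive the job the moment it arrives, so the wait would be 0 seconds.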

This is already supported by Beanstalkd natively ('watching tubes') and should be straightforward to add to the Redis driver (e.g. BLPOP high low 0, where the trailing 0 is the timeout and means "block indefinitely").

mfn commented 4 years ago

should be straightforward to add to the Redis driver

How do you propose this to be done?

I researched something else regarding block_for and found this code https://github.com/laravel/framework/blob/e52149bc29bd0b940820deac25dd89afc476178a/src/Illuminate/Queue/RedisQueue.php#L215-L235

    /**
     * Retrieve the next job from the queue.
     *
     * @param  string  $queue
     * @param  bool  $block
     * @return array
     */
    protected function retrieveNextJob($queue, $block = true)
    {
        $nextJob = $this->getConnection()->eval(
            LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
            $this->availableAt($this->retryAfter)
        );

        if (empty($nextJob)) {
            return [null, null];
        }

        [$job, $reserved] = $nextJob;

        if (! $job && ! is_null($this->blockFor) && $block &&
            $this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
            return $this->retrieveNextJob($queue, false);
        }

        return [$job, $reserved];
    }

Specifically, at this point in the code, the RedisQueue method responsible for performing the blpop is already set up for a single queue.

I only run Laravel Horizon and see that a RedisQueue instance is used for each worker, and that only one queue name is passed in at this point, so only one queue is known here. The BeanstalkdQueue driver is a separate class (it extends the base Queue class, not RedisQueue), but its pop method also takes a single queue name, so it has the same limitation.

Maybe I misread things 🤷‍♀️, but it seems that even if we wanted to do this, the driver doesn't know about the other queues, so it can't easily blpop on many queues here.

mfn commented 4 years ago

I guess the adaptation would already need to start in https://github.com/laravel/framework/blob/3f138eb02c1d2406fb71192fa9ba9d7ed7a56ccd/src/Illuminate/Queue/Worker.php#L280-L287

    protected function getNextJob($connection, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $connection->pop($queue))) {
                    return $job;
                }
            }

pop itself is designed to work with only a single queue, and I guess this would need to be turned around here.

matt-allan commented 4 years ago

Hi @mfn,

Yeah, exactly. As you pointed out, we would need to change the signature from Queue::pop(string $queue = null) to something like Queue::pop(string ...$queues) (it doesn't need to be that exact signature, it's just an example; note that a variadic parameter cannot have a default value), then update Worker::getNextJob to pass the entire array of queue names.
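A rough sketch of what that signature change could look like. Everything here is hypothetical: the PopsFromManyQueues interface and the InMemoryQueue class are invented for illustration and are not Laravel's Queue contract, and the in-memory driver returns null where a real driver would block. The getNextJob function stands in for Worker::getNextJob and makes a single pop call instead of looping over the queues.

```php
<?php

/** Hypothetical contract: pop accepts any number of queue names. */
interface PopsFromManyQueues
{
    /** @return array{0: string, 1: string}|null [queue, payload], or null if all queues are empty. */
    public function pop(string ...$queues): ?array;
}

/** In-memory stand-in for a driver; a real driver would block instead of returning null. */
final class InMemoryQueue implements PopsFromManyQueues
{
    /** @var array<string, string[]> */
    private array $jobs = [];

    public function push(string $queue, string $payload): void
    {
        $this->jobs[$queue][] = $payload;
    }

    public function pop(string ...$queues): ?array
    {
        // Earlier names win, so "high,low" still prefers the high queue.
        foreach ($queues as $queue) {
            if (! empty($this->jobs[$queue])) {
                return [$queue, array_shift($this->jobs[$queue])];
            }
        }

        return null;
    }
}

/** Stand-in for Worker::getNextJob: pass every queue name to one pop call. */
function getNextJob(PopsFromManyQueues $connection, string $queue): ?array
{
    return $connection->pop(...explode(',', $queue));
}
```

With this shape the priority order still comes from the order of the comma-separated queue string, but the driver sees every name at once and can decide how to wait on them.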

For the Redis driver we would need to update this line to call BLPOP with all of the queues. My comment about it being straightforward to add was pointing out that Redis' BLPOP command is variadic.

Beanstalkd already supports this -- we would call watchOnly for the first queue like we do now, then chain a call to watch for any additional queues.
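As a sketch of the Redis side: the retrieveNextJob code quoted earlier blocks on $queue.':notify', so one option is to build the list of notify keys for every queue and hand them to a single blpop call. The helper name blpopArguments is made up, and this only covers building the arguments. After BLPOP returns, the driver would still have to run the pop script for whichever queue was signalled, which is not shown. BLPOP checks the keys in the order given, so listing high first keeps its priority.

```php
<?php

/**
 * Build the arguments for one BLPOP across several queues
 * (hypothetical helper, not part of RedisQueue).
 *
 * @param string[] $queues   Queue keys in priority order, e.g. ['high', 'low'].
 * @param int      $blockFor Seconds to block; Redis treats 0 as "block indefinitely".
 * @return array{0: string[], 1: int} [keys, timeout]
 */
function blpopArguments(array $queues, int $blockFor): array
{
    // Same key naming as the quoted code: the notify list is "<queue>:notify".
    $keys = array_map(
        static fn (string $queue): string => $queue.':notify',
        array_values($queues)
    );

    return [$keys, $blockFor];
}

// Intended use, mirroring the existing blpop([$queue.':notify'], $this->blockFor) call:
// [$keys, $timeout] = blpopArguments(['high', 'low'], 20);
// $connection->blpop($keys, $timeout);
```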