php-enqueue / enqueue-dev

Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
https://enqueue.forma-pro.com/
MIT License
2.17k stars 430 forks source link

[Redis] Support for float timeout on brpop #1271

Open renatogcarvalho opened 1 year ago

renatogcarvalho commented 1 year ago

Since Redis 6.0 released on 2020 a float value is allowed to define the timeout for brpop commands:

https://github.com/redis/redis/commit/eca0187370c14aa2c126fe07e5310e44c2780a95

https://redis.io/commands/brpop/

Starting with Redis version 6.0.0: timeout is interpreted as a double instead of an integer.

However, I can't see an easy way to extend the library to offer this support. In the past I've used Symfony decorators to extend the context to provide a different serializer for example, but since all the properties are private it is a little hard to override any internal implementation.

I ended up creating a custom package which I'm basically copying most of the existing code, but altering the parts needed to offer this support, specially both PhpRedis and PRedis clients to allow passing a float timeout on brpop().

Note: The predis library already offers support to pass a float timeout to the Redis engine, however phpredis still does not. I recently submitted an issue about it: https://github.com/phpredis/phpredis/issues/2157. As a workaround I am using rawCommand().

Here's the implementation that I have for receive():

public function receive(int $timeout = 0): ?Message
{
    $timeout = (float) ($timeout / 1000);

    if ($timeout <= 0) {
        while (true) {
            if ($message = $this->receive(600000)) {
                return $message;
            }
        }
    }

    return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay);
}

PhpRedis:

public function brpop(array $keys, float $timeout): ?RedisResult
{
    try {
        if ($result = $this->redis->rawCommand('brpop', \implode(' ', $keys), $timeout)) {
            return new RedisResult($result[0], $result[1]);
        }

        return null;
    } catch (\RedisException $e) {
        throw new ServerException('brpop command has failed', 0, $e);
    }
}

PRedis:

public function brpop(array $keys, float $timeout): ?RedisResult
{
    try {
        if ($result = $this->redis->brpop($keys, $timeout)) {
            return new RedisResult($result[0], $result[1]);
        }

        return null;
    } catch (PRedisServerException $e) {
        throw new ServerException('brpop command has failed', 0, $e);
    }
}