yiisoft / queue

Queue extension for Yii 3.0
https://www.yiiframework.com/
BSD 3-Clause "New" or "Revised" License
54 stars 28 forks source link

[Enhancement] Simplify Adapter and queue implementation #200

Closed g41797 closed 6 months ago

g41797 commented 6 months ago

Main purpose of this proposal - to simplify adding new broker to yii3 queue and queue implementation in general:

interface BrokerInterface
{
    public function withChannel(string $channel): self;

    public function push(MessageInterface $job): ?IdEnvelope;

    public function jobStatus(IdEnvelope $job): ?JobStatus;

    public function pull(float $timeout): ?IdEnvelope;

    public function done(IdEnvelope $job): bool;
}

Adapter code is completelly generic and rather straightforward see below POC:

class Adapter implements AdapterInterface
{
    public function __construct(
        private BrokerFactoryInterface $brokerFactory,
        private string              $channelName = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
        private array               $brokerConfiguration = [],
        private ?LoggerInterface    $logger = null,
        private ?LoopInterface      $loop = null,
        private float               $timeout = 1.0,
    ) {
        if (null == $loop ) {
            $loop = new NullLoop();
        }

        if (null == $logger) {
            $this->logger = new NullLogger();
        }

        $this->getBroker();
    }

    public function push(MessageInterface $message): MessageInterface
    {
        return $this->broker->push($message);
    }

    public function status(int|string $id): JobStatus
    {

        $jobStatus = $this->broker->jobStatus($id);

        if ($jobStatus == null)
        {
            throw new \InvalidArgumentException('job does not exist');
        }

        return $jobStatus;
    }

    public function runExisting(callable $handlerCallback): void
    {
        $this->processJobs($handlerCallback, continueOnEmptyQueue: false);
    }

    public function subscribe(callable $handlerCallback): void
    {
        $this->processJobs($handlerCallback, continueOnEmptyQueue: true);
    }

    private function processJobs(callable $handlerCallback, bool $continueOnEmptyQueue): void
    {
        $result = true;
        while ($this->loop->canContinue() && $result === true) {
            $job = $this->broker->pull($this->timeout);
            if (null === $job) {
                if ($continueOnEmptyQueue)
                {
                    continue;
                }
                break;
            }
            $result = $handlerCallback($job);
            $this->broker->done($job->getId());
        }
    }

rest of the code see queue-nats repo

samdark commented 6 months ago

@viktorprogger what do you think?

xepozz commented 6 months ago

I have some thoughts about the idea:

withConfig() – delete

I think it must be encapsulated into a broker/driver/adapter. Just use __construct() method.

isConnected() – delete

Not all brokers can support real-time connection. Some can just open a connection, do job (push/pull) and close, so-called connectionless operations, like filesystem or an HTTP based driver. Moreover, there are no reasons to keep the method in the interface because there are no connect()/disconnect() methods.

submit() / next() – rename to push() / pull(), exist

See https://github.com/yiisoft/queue/blob/master/src/Adapter/AdapterInterface.php#L34, https://github.com/yiisoft/queue/blob/master/src/Adapter/AdapterInterface.php#L18

jobStatus() – move

See https://github.com/yiisoft/queue/blob/master/src/Adapter/AdapterInterface.php#L29

It's also present in another interface: https://github.com/yiisoft/queue/blob/master/src/QueueInterface.php#L46

next() – refactor

There are interesting things that next() (or pull()) can return a message object from the queue. Current implementation accepts a callable and process it inside.

update() – delete

Not all drivers can update status, because it's not just a property of an object. If you want to change status from in process to waiting you must ask the worker that processes the job to stop it's work and return the job back to the queue. In some drivers it's impossible and the only possible way is to push the job as a new message.

Conclusion

I don't see any reasons to accept the proposal, there are no gain in UX / DX.

But I think it could be useful to have next($timeout) method to retrieve a message.

xepozz commented 6 months ago

@g41797 still no changes, all of this is available now. You made a snippet for userland. There are place for this.