Open barryvdh opened 5 years ago
FIFO is required; all events should be processed in the same order (so other events should wait for the first ones to finish)
I think this one could use some clarification. As currently stated, and with an extremely bitter reading of the sentence, it states that the queue system may only execute one job at a time ("other events should wait for the first ones to finish").
I think that FIFO is more of a best-effort, rather than required. But that could just be my interpretation with job delays, retries, releases and other things in mind. If you dispatch a job to the queue, and in that job release it back to the queue, to you expect it to keep the position it initially had, or end up at the end of the queue?
Yeah I meant FIFO for that concurrency queue. So for example, I have a database with 1000 clients, each client can have 5 API integrations. Events for that client should be processed FIFO. So otherwise you would just run 1 queue per client, but that doesn't scale wel. (eg. would need 5000 workers). So if a queue picks up a job for (client 34, api x), the other works should ignore that client/api combo until it's finished. Thus having a concurrency of 1 for that key.
This is a pretty hard one. If you consider AWS SQS driver, for instance, it would never be able to implement this since you would only know if you got a message with that key combo after getting the message and if you decide to release it back to the queue it goes to the end of it. I don't know how the redis driver works but the database one also do a select for update. In theory you could just not update the job, but then the worker is going to get the same record forever.
The concurrency_key thing is very easy to implement for a database-backed queue. I believe this matches what is suggested, and could perhaps explain the idea for other queues.
SELECT * FROM jobs
WHERE /* the usual things */
AND ( (concurrency_key IS NULL) )
OR (concurrency_key NOT IN ( /* select currently executing concurrency_key */ ))
ORDER BY id ASC
LIMIT 1
FOR UPDATE
protected function getNextAvailableJob($queue) {
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->where(function (Builder $query) {
// This is the change compared to parent::getNextAvailableJob
$expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
$query->whereNull('concurrency_key')
->orWhereNotIn(
'concurrency_key',
$this->database
->table('jobs')
->where('reserved_at', '>', $expiration)
->whereNotNull('concurrency_key')
->select('concurrency_key')
);
})
->orderBy('id', 'asc')
->first();
return $job ? new DatabaseJobRecord((object)$job) : null;
}
@barryvdh I also encountered this problem (referenced by JayBizzle). If jobs have no relation it would not be a problem but in case certain jobs are related (like a sequence of data elements spread over multiple jobs where each element has a timestamp where the ordering of timestamps is important) it is crucial.
I'm not sure why the funnel method does not tackle this problem, since it gives an opportunity to group related jobs by key and limit the number of workers processing it. I would guess that setting a limit to 1 would mean that only a single worker will process all these related jobs in fifo order. No concurrency due to the limit, but from some tests I see that FIFO handling sometimes fails as well.
A work around that comes to my mind to tackle scalability (a bit, not a definite solution) is to define a couple of queues and assign one worker to each queue. Then map certain related jobs to one and only one of these defined queues. As long as one worker is handling a single queue, there would be no concurrency. But like I stated it is just a work around to avoid concurrency, a general solution to guarantee FIFO handling would be the best.
but in case certain jobs are related (like a sequence of data elements spread over multiple jobs where each element has a timestamp where the ordering of timestamps is important) it is crucial.
I'm not sure this is FIFO; this description opens up for the possibility to inject jobs into the beginning or middle of the queue. It sounds pretty much as if you're using the available_at functionality to order them; and then it becomes a "first available first out". I think that this matches how the queue already works. (This could also be that I've misunderstood the meaning of fifo when you have a delay for the jobs.)
The funneling approach with a limit of 1 should limit the concurrency; but when you release the job back to the queue it will count as a new (albeit identical) job, and go to the end of the list of jobs to execute. The funneling approach will thus not guarantee the order.
The current way of guarantee that jobs execute in order is to chain them.
Job chaining allows you to specify a list of queued jobs that should be run in sequence. If one job in the sequence fails, the rest of the jobs will not be run. To execute a queued job chain, you may use the withChain method on any of your dispatchable jobs:
@sisve, thanks for your reply and time. My jobs are created from a continuous stream of data. This data arrives from consecutive requests where a job (containing data) is created per request. The desired mechanism should put jobs in order of arrival in the queue and multiple background workers should process these jobs (in order of arrival) thus a fifo queue. Job chaining is not possible since data (and derived jobs) is created in consecutive requests.
I was not aware that a release during funneling results in a new job.
Hey @barryvdh, did you find a solution to this?
I have the exact same use case and would probably be fine with redis funnel, except in some cases I do need FIFO and some sort of wildcard per-tenant queue.
For FIFO we simply went with AWS SQS Fifo queues and a driver to match it. It sucks but 🤷🏻♂️
Queues are great for processing a lot of stuff fast, but I've run in to the situtation multiple times that some level on concurrency/locking is required, which is currently not possible (without modification). Ofcourse it's possible to run 1 queue for each task, but this doesn't scale well.
Examples:
client2.github-api
should not be popped for the next 30 secs)Throttle/Funnel (https://laravel.com/docs/5.7/queues#rate-limiting) is possible but it has two problems:
So not sure how this would be possible, but a possibility would be to add a
concurrency_key
to the jobs and only filter on jobs that are not currently active and not locked. This is doable with the Database driver, but this misses out on stuff like Horizon.