> > Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached.
>
> Does it mean that Task requests are buffered inside one of the 5 processes of the `django` app that processed the request? I mean, if the user sends two HTTP requests and gunicorn sends them to different processes, it means that we'll have two local (in-memory) queues for this `BatchTask`, even for the same user?
No, the tasks are queued in the Celery workers; it doesn't matter if they're sent from the same or different clients.

In your example, the tasks are sent from the gunicorn process(es) to the broker, as normal with Celery. The Celery worker consuming from that queue then buffers them in memory until `flush_interval` or `flush_every` is met, and then executes the task.
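For reference, here is a minimal sketch of what such a batch task looks like (the `last_seen_user` name is taken from the example setup in the original question; the flush values and the task body are illustrative, not from this thread):

```python
from celery import Celery
from celery_batches import Batches

app = Celery("proj", broker="amqp://")

# Illustrative flush settings: execute once 100 requests have been buffered
# on the worker, or after 10 seconds, whichever happens first.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def last_seen_user(requests):
    # `requests` is the list of buffered task requests; each one carries the
    # args/kwargs the client originally passed to .delay()/.apply_async().
    for request in requests:
        print(request.kwargs["user"], request.kwargs["when"])
```

The client side is unchanged: it still calls `last_seen_user.delay(user=user_id, when=utcnow())` and the message goes to the broker as usual; only the worker-side execution is batched.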
@clokep The Flash! Thank you for the quick response :)
Oh, I see - the plugin sends the Task immediately to the broker, then a worker gets the task from the queue but doesn't execute the "business" code until `flush_interval` or `flush_every`?

How does it work with `--concurrency=4` then? I didn't get it :( If workers are independent processes, then celery may run two Tasks for the above case?
> Oh, I see - the plugin sends the Task immediately to the broker, then a worker gets the task from the queue but doesn't execute the "business" code until `flush_interval` or `flush_every`?
The statement is correct, but celery-batches doesn't do anything special for sending the task; this is all standard Celery behavior.
> How does it work with `--concurrency=4` then? I didn't get it :( If workers are independent, then celery may run two Tasks for the above case?
The following assumes you're using the standard "prefork" configuration of a celery worker (although the explanation doesn't change too much if you're using gevent or eventlet or threads):
Celery workers have a "main" process which fetches tasks from the broker; by default it gets whatever "prefetch multiplier x concurrency" is (so if your prefetch multiplier is 100 and your concurrency is 4, it attempts to pull up to 400 items from the broker's queue). Once it has those in memory it deserializes them and runs whatever their `Strategy` is -- for a normal Celery task this essentially just means feeding them to each worker in the processing pool (I think this is done on a task-by-task basis, but I don't remember off the top of my head).

What celery-batches changes is that the "main" celery worker process will queue tasks in memory until `flush_interval` or `flush_every` is reached, and then send that entire set of tasks to the worker in the processing pool together.
(None of this is really explained well in the celery documentation, as far as I know... celery-batches should explain the difference it is making better, but it is hard without something else to refer to which discusses the "normal" way of doing things.)
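To put numbers on the prefetch behaviour described above, here is a hedged sketch of the relevant Celery settings (the values are examples only, not recommendations):

```python
from celery import Celery

app = Celery("proj", broker="amqp://")

# The "main" worker process tries to keep up to
# worker_prefetch_multiplier * concurrency reserved task messages in memory,
# i.e. 100 * 4 = 400 with these example values.
app.conf.worker_prefetch_multiplier = 100

# Concurrency is usually given on the command line (-c / --concurrency),
# but it can also be set in the configuration.
app.conf.worker_concurrency = 4
```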
Great explanation, I didn't know these details about Celery! The plugin seems perfect then; it works exactly as I'd expect a plugin with such functionality to work! :)
One more question left - will the flow be broken when we run three "main" workers on different physical nodes, or three workers on the same node, as the documentation shows? https://docs.celeryq.dev/en/stable/userguide/workers.html#starting-the-worker
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
Or will it be handled by the broker somehow?
> One more question left - will the flow be broken when we run three "main" workers on different physical nodes, or three workers on the same node, as the documentation shows? docs.celeryq.dev/en/stable/userguide/workers.html#starting-the-worker
Each worker will have its own in-memory queue, so you need to think carefully about what you want the prefetch multiplier (and concurrency) to be in order to keep your resources well occupied. It will work fine, but it might not work at utmost efficiency.
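To illustrate the per-worker buffering (a sketch with made-up numbers; the `proj.tasks` import and the 300-call loop are assumptions, not from this thread): if you start the three workers above and send 300 calls, the broker spreads the messages across them, and each worker flushes only its own share.

```python
from datetime import datetime, timezone

from proj.tasks import last_seen_user  # the batch task sketched earlier

# The client side is unchanged; the broker distributes the 300 messages
# across worker1..worker3. Each worker buffers and flushes only the requests
# it received -- there is no shared, cluster-wide batch.
for user_id in range(300):
    last_seen_user.delay(user=user_id, when=datetime.now(timezone.utc))
```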
@clokep thank you for the detailed explanation!
I think we can add a "How does it work?" section in README.md and add a link to this issue :D It explains everything.
> I think we can add a "How does it work?" section in README.md and add a link to this issue :D It explains everything.
I think this should get distilled and added to the documentation, yes.
Hello,
I agree that this is a great module and more examples/docs would be awesome. I made my own system to bulk insert data into ES with Celery/RabbitMQ, but I encountered a lot of problems that I'm trying to solve, hopefully with this package.

For example, it seems (thanks to your answers here) that this package can be used with Celery in its default pool, "prefork". My solution can't: I keep documents in memory until they reach a certain amount, then bulk insert them. But I also added a thread timer that checks every X minutes whether there are documents left over, in order to push them (for instance when we don't receive new tasks).

The problem is that when Celery receives a SIGTERM (from Kubernetes for instance), in prefork, it doesn't wait for the thread timer to execute before terminating. Therefore, if I had documents waiting to get pushed, they got lost. I had to use the "solo" pool for that, which carefully waits for the timer to execute before terminating.
Unfortunately, Celery's RAM usage slowly increases and I can't make use of the `max-tasks-per-child` and `max-memory-per-child` options, because they are only available in the "prefork" pool.
So I have a few questions:

1. Is your package compatible with the `max-tasks-per-child` and `max-memory-per-child` options, and if so, how does it handle them? If I set my limit to 1000 tasks per worker before letting Celery restart it, does your package count "batches" as "tasks"? (i.e. will it restart after 1000 batches, or after 1000 tasks/requests?)
2. About the `max-memory-per-child` option: by design, IIRC, if our worker reaches the limit, Celery will finish executing the task, stop listening to RabbitMQ, then safely start a new worker. Is that compatible with your solution?
3. Finally, it seems that in all your examples, your tasks use a class named `Batches`. How can we handle this when we already have tasks that use classes (i.e. `@app.task(base=Bulker)`)? Is it possible? Does my "Bulker" class have to extend your `Batches` class instead of the `Task` class?

Thank you very much for your answers!
> The problem is that when Celery receives a SIGTERM (from Kubernetes for instance), in prefork, it doesn't wait for the thread timer to execute before terminating.

I haven't run Celery under Kubernetes, but it should wait for any running tasks to finish.

> Therefore, if I had documents waiting to get pushed, they got lost. I had to use the "solo" pool for that, which carefully waits for the timer to execute before terminating.

Do you have `acks_late` enabled for those tasks? That should ensure they get placed back in the queue (although that's for "standard" Celery; I'm not sure how that would interact with your package).
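For reference, `acks_late` is a standard per-task Celery option; here is a minimal sketch of enabling it (the `index_document` task is purely illustrative):

```python
from celery import Celery

app = Celery("proj", broker="amqp://")

# With acks_late=True the broker message is acknowledged only *after* the task
# body finishes, so if the worker dies mid-execution the message is redelivered
# rather than lost.
@app.task(acks_late=True)
def index_document(doc):
    ...  # e.g. push `doc` to Elasticsearch
```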
> Is your package compatible with the `max-tasks-per-child` and `max-memory-per-child` options, and if so, how does it handle them?

I think you're just asking "what happens if I restart the worker while using this package?" -- I suspect whatever is in the queue (so a maximum of `flush_every`, unless you're using ETAs) would get dropped, but I haven't verified this. If you have time to test this and verify it is a bug, a separate issue would be appreciated.
> Finally, it seems that in all your examples, your tasks use a class named `Batches`. How can we handle this when we already have tasks that use classes (i.e. `@app.task(base=Bulker)`)? Is it possible? Does my "Bulker" class have to extend your `Batches` class instead of the `Task` class?
Yes.
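In other words, the existing base class should derive from `Batches` instead of `Task`. A hedged sketch of what that could look like (the `Bulker` name comes from the question above; the flush values and task body are made up):

```python
from celery import Celery
from celery_batches import Batches

app = Celery("proj", broker="amqp://")

# The custom base class now extends Batches (itself a Task subclass), so any
# shared behaviour (connections, retry defaults, ...) can stay on the class.
class Bulker(Batches):
    pass

# The task function now receives the list of buffered requests.
@app.task(base=Bulker, flush_every=500, flush_interval=30)
def bulk_index(requests):
    documents = [request.kwargs["doc"] for request in requests]
    # ... bulk insert `documents` into Elasticsearch here ...
```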
> I haven't run Celery under Kubernetes, but it should wait for any running tasks to finish.

It should, and it does. But a thread timer is not a task per se; it is something next to the task. Therefore, Celery thinks everything is done and terminates.
> Do you have `acks_late` enabled for those tasks? That should ensure they get placed back in the queue (although that's for "standard" Celery; I'm not sure how that would interact with your package).

Yes, I have `acks_late` activated. But in the "solo" pool it's buggy, and it always enables `acks_late` even when you set it to False. I could think it's cool, since messages are placed back in the queue in case of a crash, but my documents waiting to get pushed are still lost. Only the last document added to my "waiting" Python list will get placed back by Celery/RabbitMQ.
> I think you're just asking "what happens if I restart the worker while using this package?" -- I suspect whatever is in the queue (so a maximum of `flush_every`, unless you're using ETAs) would get dropped, but I haven't verified this. If you have time to test this and verify it is a bug, a separate issue would be appreciated.
What do you mean by "dropped"? I will test, yes. My question was more: does the package respect these options and consider a whole batch as a single "task" (just like it would if we were using Celery normally)? In that case, Celery and your package would wait for the full batch to be executed before terminating/stopping listening to the queue, which would be great.

I'm just concerned about the same problem I have above with my solution: I want to be sure that when the Celery worker restarts because of these Celery options (either the number of tasks or the max memory is reached), all the documents in the batch will get processed, meaning no loss.

Right now, by using these options alone, I don't have the control to trigger a bulk insert when these limits are reached. This is the whole problem. If I could detect that a Celery worker is about to restart because it reached one of these settings' values, I could force a bulk insert and stop right there, with no loss. If you have any ideas (sorry for the long message), please share them.
Thanks!
> > I think you're just asking "what happens if I restart the worker while using this package?" -- I suspect whatever is in the queue (so a maximum of `flush_every`, unless you're using ETAs) would get dropped, but I haven't verified this. If you have time to test this and verify it is a bug, a separate issue would be appreciated.
>
> What do you mean by "dropped"? I will test, yes. My question was more: does the package respect these options and consider a whole batch as a single "task" (just like it would if we were using Celery normally)? In that case, Celery and your package would wait for the full batch to be executed before terminating/stopping listening to the queue, which would be great.
I did some light testing and it seems that the enqueued requests do get picked back up after a worker restart. (Alternately they should get picked up by a different worker if one exists.)
If there's a running batch task, then yes that gets treated as a single task and Celery will wait during shutdown for it to finish.
> I'm just concerned about the same problem I have above with my solution: I want to be sure that when the Celery worker restarts because of these Celery options (either the number of tasks or the max memory is reached), all the documents in the batch will get processed, meaning no loss.
>
> Right now, by using these options alone, I don't have the control to trigger a bulk insert when these limits are reached. This is the whole problem. If I could detect that a Celery worker is about to restart because it reached one of these settings' values, I could force a bulk insert and stop right there, with no loss. If you have any ideas (sorry for the long message), please share them.
I can't really answer questions on how this compares to your current solution or if it will work for your workload, sorry.
I put up #92 with some of the above info. I also recently published a post going into detail on the Celery architecture (this isn't specific to celery-batches at all, though): https://patrick.cloke.us/posts/2023/09/15/celery-architecture-breakdown/
Hi! I was looking at plugins for Celery on GitHub and found celery-batches, but didn't quite get how it actually works. It'd be great to add a "How does it work?" section in README.md, like https://github.com/steinitzu/celery-singleton#how-does-it-work
For instance, I have a usual HTTP API setup:

- `gunicorn` that runs `django` with `celery`'s app inside (not a worker, just a publisher).
- `gunicorn` runs 5 processes of the `django` http app, and `django` is the only code that calls `celery`'s tasks, like this: `last_seen_user.delay(user=user_id, when=utcnow())`.
- `celery`'s workers are run with `celery --concurrency=4`.

How does `celery-batches` work in this setup? The documentation says:

> Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached.
Does it mean that Task requests are buffered inside one of the 5 processes of the `django` app that processed the request? I mean, if the user sends two HTTP requests and gunicorn sends them to different processes, it means that we have two local (in-memory) queues for this `BatchTask`, even for the same user, and after `flush_interval` or `flush_every` (which we count only for THIS process, not for all of them) we actually send the Task to the celery broker - in our case, we'd send two tasks even if it was the same user.