Closed dwreeves closed 2 years ago
Hi,
If you start Celery with `flask celery worker` and remove `create_celery_app` in `app.py`, then Celery won't start because it doesn't pick up your configuration in `config/settings.py`. What did you do to get around that? I tried things like `flask celery --config "config.settings:CELERY_CONFIG" worker`, but it had no effect.
Celery also has the idea of `from celery import current_app as current_celery_app`, which lets you reference the current app of Celery just like you can with Flask. I'm guessing if you went down this route you'd call your tasks with this app, but we're still stuck with getting custom configuration into Celery using this Flask CLI method.
By the way, `celery -A "hello.app.celery_app" worker` never has to change besides your app's name. You can control which tasks get loaded into Celery by modifying the `"include": ["myapp.user.tasks"]` entry in `CELERY_CONFIG` for all of your task files.
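For illustration, a `CELERY_CONFIG` with an `include` list might look roughly like the sketch below. The broker and backend URLs and the second module name are assumptions made up for the example; only `"include": ["myapp.user.tasks"]` comes from this thread.

```python
# Hypothetical contents of config/settings.py (values are placeholders).
CELERY_CONFIG = {
    "broker_url": "redis://redis:6379/0",      # assumed Redis location
    "result_backend": "redis://redis:6379/0",  # assumed Redis location
    # Every module listed here is imported by the worker on startup,
    # so the tasks defined in it get registered.
    "include": [
        "myapp.user.tasks",
        "myapp.billing.tasks",  # hypothetical second task module
    ],
}
```

Adding a new task file then only means appending its module path to `include`; the `celery -A ...` command line stays the same.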
I wasn't thinking about removing the `create_celery_app` function. I included the example I was thinking about here:
https://github.com/dwreeves/docker-flask-example/commit/b4d1979fd0bd399e0612f05bf929d506b04b2964
But maybe this is a better task for a sort of Flask+Celery extension that does other things, rather than something to stick in an app.
One more question regarding Celery, which is in my commit: does it make sense to include a Celery ping as part of a health check, or is the Redis ping sufficient?
Oh, the `TaskBase` bit was taken from an older Celery snippet.
Looks like the Flask Celery snippet has been updated since with (ignore the deprecated config styling bits):
```python
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
```
What do you think of that (I pasted it from their docs)? I haven't tested it yet btw.
As for the health check for Celery: if you use the same Redis server for both Redis and Celery, then the Celery check isn't really adding anything, other than potentially checking that Celery is configured to connect to Redis (which is a good thing). But if it weren't able to connect due to a misconfiguration, it wouldn't have started in the first place. If they are different servers, then it's totally worth adding the separate check, since it's checking both Redis servers.
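A rough sketch of that reasoning, with the actual pings left as pluggable callables so nothing here depends on a real Redis or Celery client. All names (`build_checks`, `run_health_checks`, the URLs, `fake_ping`) are hypothetical and not taken from the example repo:

```python
from urllib.parse import urlsplit


def _server(url):
    """Return (host, port) so two URLs on the same server compare equal."""
    parts = urlsplit(url)
    return parts.hostname, parts.port


def build_checks(redis_url, broker_url, ping_redis, ping_broker):
    """Always check Redis; add a broker check only if it is a different server."""
    checks = {"redis": lambda: ping_redis(redis_url)}
    if _server(broker_url) != _server(redis_url):
        checks["celery_broker"] = lambda: ping_broker(broker_url)
    return checks


def run_health_checks(checks):
    """Run each named check; a check that raises counts as failed."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = bool(check())
        except Exception:
            results[name] = False
    return results


def fake_ping(url):
    # Stand-in for a real ping; succeeds for any redis:// URL.
    return url.startswith("redis://")


same = build_checks("redis://redis:6379/0", "redis://redis:6379/1", fake_ping, fake_ping)
different = build_checks("redis://redis:6379/0", "redis://broker:6379/0", fake_ping, fake_ping)
```

With a shared server only the `redis` check is registered; with separate servers both run.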
Definitely looks more Pythonic, but I also have not tested it.
By using `Task.run` instead of `Task.__call__`, it appears you end up missing a few arcane steps that would normally run otherwise. https://github.com/celery/celery/blob/master/celery/app/task.py#L385 Not sure what the significance of that would be, as I'm still learning Celery. And I'm inclined to trust the Flask docs.... but, that said, it's possible the Flask docs shouldn't be overriding `__call__` then directly running `self.run`, instead of using `super().__call__`? 🤔
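To make the concern concrete, here is a toy model in plain Python. It is not Celery's actual `Task` class; the "setup"/"teardown" bookkeeping is invented purely to stand in for whatever the real base `__call__` does around `run`:

```python
class Task:
    """Toy stand-in for a task base class; NOT Celery's real implementation."""

    def __init__(self):
        self.call_log = []

    def __call__(self, *args, **kwargs):
        # Bookkeeping the base class performs around every run.
        self.call_log.append("setup")
        try:
            return self.run(*args, **kwargs)
        finally:
            self.call_log.append("teardown")

    def run(self, x, y):
        return x + y


class DirectRunTask(Task):
    def __call__(self, *args, **kwargs):
        # Goes straight to run(), so the base __call__ never executes.
        return self.run(*args, **kwargs)


class SuperCallTask(Task):
    def __call__(self, *args, **kwargs):
        # Delegates to the base class, so its bookkeeping still happens.
        return super().__call__(*args, **kwargs)


direct = DirectRunTask()
delegating = SuperCallTask()
direct_result = direct(1, 2)
delegating_result = delegating(1, 2)
```

Both calls return the same value, but only `delegating.call_log` records the base-class steps; `direct.call_log` stays empty. Whether the skipped steps matter in real Celery is exactly the open question above.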
Maybe something like this works as an extension for Flask + Celery to avoid repeating this pattern for every code base. I'm going to be honest and say I wouldn't maintain such a thing, so I wouldn't bother making it myself. Just throwing it out there into the void.
```python
from functools import wraps

from flask import current_app
from flask.cli import with_appcontext
from celery import Celery as _Celery
from celery.bin.celery import celery as _celery_cmd


class Celery(_Celery):

    @wraps(_Celery.__init__)
    def __init__(self, *args, **kwargs):
        app = kwargs.pop("app", None)
        super().__init__(*args, **kwargs)

        # Provide the app context to Celery tasks
        class ContextTask(self.Task):
            def __call__(self, *args, **kwargs):
                # Here `self` is the task, so reach the Flask app
                # through the Celery app it is bound to.
                with self.app._flask_app.app_context():
                    return super().__call__(*args, **kwargs)

        self.Task = ContextTask

        if app is not None:
            self.init_app(app)
        else:
            self._flask_app = current_app

    def init_app(self, app):
        self._flask_app = app

        # Config
        if self.main is None:
            self.main = app.name
        self.conf.update(app.config.get("CELERY_CONFIG", {}))

        # Add as extension to the app
        app.extensions.setdefault("celery", self)


# Add as entrypoint in setup.py
celery_cmd = with_appcontext(_celery_cmd)
```
(Edit: Fixed issue in code example.)
I've been using this code template to learn. Great stuff, this has been very helpful! I have a few notes regarding the Celery implementation in particular.
- `from celery import Task`, which is worse than `celery_app.Task`, so ignore.
- `TaskBase.__call__(self, *args, **kwargs)` seems less pythonic than `super().__call__(*args, **kwargs)`.
As of Celery 5.0, the Celery CLI is actually a Click command, so you can do something like this:
The major benefit of this is that you no longer need to modify `celery -A "hello.app.celery_app" worker`, and simply `flask celery worker` works. So it becomes agnostic to where the Celery app is defined, and removes the need to update this part of the script for a real app not named `hello`. The reason why it becomes agnostic seems to be: because it's called via the Flask CLI, it guarantees that the normal Flask app initialization code is run prior to the Celery command being run, so it knows that Celery was called inside `hello.app.celery_app`. (Another benefit could potentially be decorating the command with `flask.cli.with_appcontext`, but I've not found a reason for that.)

WDYT?