Open alon-unifi opened 6 years ago
Experiencing similar issue trying to upgrade several of our projects from 3.x to 4.x and finding our beat tasks missing (but found in the djcelery table).
Hi @ask. I'd appreciate a quick comment from you on this if possible. I'm assuming there is a class of djcelery users who will also face this question, and I'm wondering if you could advise on the right approach to take (even at a high level). Many thanks.
My solution to this problem -- there is probably a cleaner step # 2 or even a way to do this in fewer commands but this worked for me :)
1. Dump the data from djcelery and save it to djcelery.json:
django-admin dumpdata djcelery --format json --exclude djcelery.taskmeta -o djcelery.json
2. In djcelery.json, replace djcelery with django_celery_beat
3. Load the edited file:
django-admin loaddata djcelery.json
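The find-and-replace on the dumped file can also be scripted. The following is only a sketch, not something posted in this thread: `rewrite_fixture` and its parameters are hypothetical names, and it assumes the standard `dumpdata` JSON layout, where each record carries a `"model": "<app_label>.<model_name>"` entry. Unlike a blanket text replace, it only touches that label, so task names that happen to contain `djcelery` are left alone.

```python
import json
from pathlib import Path


def rewrite_fixture(src, dst, old_app="djcelery", new_app="django_celery_beat"):
    """Copy a dumpdata JSON fixture, pointing each record at the new app label.

    Hypothetical helper: only the "model" label of each record is rewritten.
    """
    records = json.loads(Path(src).read_text(encoding="utf-8"))
    for record in records:
        # "djcelery.periodictask" -> ("djcelery", ".", "periodictask")
        app_label, _, model_name = record["model"].partition(".")
        if app_label == old_app:
            record["model"] = f"{new_app}.{model_name}"
    Path(dst).write_text(json.dumps(records, indent=2), encoding="utf-8")
    return len(records)


# Example usage (file names are placeholders):
# rewrite_fixture("djcelery.json", "django_celery_beat.json")
```

The rewritten file would then be passed to `loaddata` in place of the hand-edited one.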
@tgehrs sounds like you performed a migration outside of migrations :) if the original djcelery is completely dead (no future updates), it would be good to have one update pushed to add a migration to move you onto this package, but maybe crossing package boundaries like that isn't standard.
@ntravis I agree that would be nice, hopefully my "solution" will save a few people 20 minutes of frustration :) maybe a transition guide in the docs would be a nice in between?
Please send a PR with your workarounds
@auvipy will do. I attempted to clean it up a little bit and write a function to automate it. But when I tried to test it, it seemed that you cannot have djcelery and django_celery_beat installed at the same time because of conflicting celery version requirements. Can you confirm that is the case?
If they cannot both be installed at the same time I will plan to add my previous solution to the installation portion of README.rst, is that a good plan?
If they can both be installed at the same time where in the project should I put the below?
from io import StringIO
import tempfile
import json
from django.core.management import call_command

def import_djcelery_data():
    #dump the old data
    out = StringIO()
    call_command('dumpdata', 'djcelery', format='json', exclude=['djcelery.taskmeta'], stdout=out)
    #transform the old data to new data
    old_data = json.loads(out.getvalue())
    new_data = []
    for old_datum in old_data:
        for key,value in old_datum.items():
            if 'djcelery' in str(value):
                old_datum.pop(key)
                old_datum[key] = value.replace('djcelery', 'django_celery_beat')
        new_data.append(old_datum)
    new_data = json.dumps(new_data)
    # Reload now dumped data
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json') as tmp:
        tmp.write(new_data)
        tmp.seek(0)
        call_command('loaddata', tmp.name, verbosity=0)
Doesn't django_celery_beat already populate the periodictask and other tasks if the tasks are registered properly with celery 4.2?
@milind-shakya-sp you are correct in your statement but this issue concerns how to move from the deprecated djcelery library to the django_celery_beat library
I understand, I'm just wondering which models and data you are thinking of migrating. I can see that some of the tables in the new django-celery-beat are already populated during the registration of tasks; what other data are you looking to migrate?
Hi, I'm responsible for migrating a medium-sized django project off of django-celery in the very near future, and would love to know what the status of all this is. I can't imagine I'm the only one facing this in the next 6 months or so, given the python2 EOL.
Well, you obviously can't just use the same tables, since django names them according to the app name, which has changed, but you could probably easily add a manage.py command to import data from the old djcelery tables. If anyone wants to take a stab at it, please send us a PR!
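To make the table-naming point concrete: by default Django derives a model's table name from the app label and the lowercased model class name. The snippet below only mimics that default rule for illustration; `default_db_table` is a hypothetical helper, and it ignores custom `db_table` settings and name-length truncation.

```python
def default_db_table(app_label, model_class_name):
    """Mimic Django's default table name: '<app_label>_<lowercased model name>'."""
    return f"{app_label}_{model_class_name.lower()}"


# Same model names, different app labels, hence different tables.
for model in ("IntervalSchedule", "CrontabSchedule", "PeriodicTasks", "PeriodicTask"):
    old = default_db_table("djcelery", model)
    new = default_db_table("django_celery_beat", model)
    print(f"{old} -> {new}")
```

This is why the old rows stay in the `djcelery_*` tables and have to be copied over explicitly.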
@nthall i think either of my two solutions should do the trick, let me know if you need clarification on either. There probably are better ways to do it too :)
A) Manual -- this is how I did it because I needed something quick to get it completed
1. Dump the data from djcelery and save it to djcelery.json:
django-admin dumpdata djcelery --format json --exclude djcelery.taskmeta -o djcelery.json
2. In djcelery.json, replace djcelery with django_celery_beat
3. Load the edited file:
django-admin loaddata djcelery.json
B) Script -- This is a script I wrote after the fact, but never tried/tested it. I was under the assumption that removing the django-celery app from settings would delete the database. After doing a little more reading today, that does not seem to be the case, so this script should work
from io import StringIO
import tempfile
import json
from django.core.management import call_command

def import_djcelery_data():
    #dump the old data
    out = StringIO()
    call_command('dumpdata', 'djcelery', format='json', exclude=['djcelery.taskmeta'], stdout=out)
    #transform the old data to new data
    old_data = json.loads(out.getvalue())
    new_data = []
    for old_datum in old_data:
        for key,value in old_datum.items():
            if 'djcelery' in str(value):
                old_datum.pop(key)
                old_datum[key] = value.replace('djcelery', 'django_celery_beat')
        new_data.append(old_datum)
    new_data = json.dumps(new_data)
    # Reload now dumped data
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json') as tmp:
        tmp.write(new_data)
        tmp.seek(0)
        call_command('loaddata', tmp.name, verbosity=0)
I got an error on Python 3.8 with the script above that didn't happen on 3.6; not sure what the cause is:
File "/opt/master/src/app/migrations/0004_celery4.py", line 22, in import_djcelery_data
for key,value in old_datum.items():
RuntimeError: dictionary keys changed during iteration
Changed line to
for key, value in list(old_datum.items()):
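A likely explanation for the error, offered as an assumption and not confirmed in this thread: the loop pops a key and re-inserts it while iterating over the same dict, and Python 3.8 began detecting this kind of key change even when the dict's size stays the same. Iterating over a snapshot, as in the changed line, avoids it. A minimal sketch follows; `rename_app_in_values` is a hypothetical name, and it assigns to the existing key instead of popping it.

```python
def rename_app_in_values(record, old="djcelery", new="django_celery_beat"):
    """Replace `old` with `new` in the top-level string values of one record."""
    # list(...) takes a snapshot of the items, so modifying the dict
    # inside the loop cannot disturb the iteration.
    for key, value in list(record.items()):
        if isinstance(value, str) and old in value:
            # Assigning to an existing key keeps the key set unchanged.
            record[key] = value.replace(old, new)
    return record
```

Non-string values such as the `pk` or the nested `fields` dict are left untouched in this sketch.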
I wrote a Django management command to migrate from djcelery to django-celery-beat. Since the fields of the djcelery tables and the django-celery-beat tables differ very little, the command simply maps them one to one. I hope it will be helpful to everyone.
Note: this command assumes that django-celery-beat is installed and that its migrations have been applied.
Instructions:
python manage.py migrate_from_djcelery
# Optional parameter: -tz specifies the time zone celery was previously running in; if omitted, UTC is used. For example: -tz Asia/Shanghai
Below is the command file
commands/migrate_from_djcelery.py
from django.core.management import BaseCommand
from django.db import transaction
from django_celery_beat.models import (
    IntervalSchedule,
    CrontabSchedule,
    PeriodicTasks,
    PeriodicTask,
)
from blueapps.contrib.bk_commands.management.handlers.migrate_from_djcelery_handler import (
    execute,
    DjIntervalSchedule,
    DjCrontabSchedule,
    DjPeriodicTask,
    DjPeriodicTasks,
)


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument("-tz", help="Time zone the old djcelery was running in")

    @transaction.atomic
    def handle(self, *args, **options):
        tz = "UTC"  # pylint: disable=invalid-name
        if options["tz"]:
            tz = options["tz"]  # pylint: disable=invalid-name
        new_db_table = (IntervalSchedule, PeriodicTasks, PeriodicTask)
        old_db_table = (DjIntervalSchedule, DjPeriodicTasks, DjPeriodicTask)
        # Migrate the table that carries a time zone
        execute(CrontabSchedule, DjCrontabSchedule, tz)
        # Migrate the tables without a time zone
        for new_table, old_table in zip(new_db_table, old_db_table):
            execute(new_table, old_table)
commands/handlers/migrate_from_djcelery_handler.py
The handler file is used to restore the structure of the djcelery model
from django.core.management.base import CommandError
from django.db import models
from django.utils.translation import ugettext_lazy as _


class DjCrontabSchedule(models.Model):
    minute = models.CharField(max_length=64, default="*")
    hour = models.CharField(max_length=64, default="*")
    day_of_week = models.CharField(max_length=64, default="*",)
    day_of_month = models.CharField(max_length=64, default="*")
    month_of_year = models.CharField(max_length=64, default="*")

    class Meta:
        db_table = "djcelery_crontabschedule"


class DjPeriodicTasks(models.Model):
    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    class Meta:
        db_table = "djcelery_periodictasks"


class DjIntervalSchedule(models.Model):
    every = models.IntegerField(_("every"), null=False)
    period = models.CharField(_("period"), max_length=24)

    class Meta:
        db_table = "djcelery_intervalschedule"
        verbose_name = _("interval")
        verbose_name_plural = _("intervals")
        ordering = ["period", "every"]


class DjPeriodicTask(models.Model):
    name = models.CharField(
        _("name"), max_length=200, unique=True, help_text=_("Useful description"),
    )
    task = models.CharField(_("task name"), max_length=200)
    interval = models.ForeignKey(
        DjIntervalSchedule,
        null=True,
        blank=True,
        verbose_name=_("interval"),
        on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
        DjCrontabSchedule,
        null=True,
        blank=True,
        verbose_name=_("crontab"),
        on_delete=models.CASCADE,
        help_text=_("Use one of interval/crontab"),
    )
    args = models.TextField(
        _("Arguments"),
        blank=True,
        default="[]",
        help_text=_("JSON encoded positional arguments"),
    )
    kwargs = models.TextField(
        _("Keyword arguments"),
        blank=True,
        default="{}",
        help_text=_("JSON encoded keyword arguments"),
    )
    queue = models.CharField(
        _("queue"),
        max_length=200,
        blank=True,
        null=True,
        default=None,
        help_text=_("Queue defined in CELERY_QUEUES"),
    )
    exchange = models.CharField(
        _("exchange"), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _("routing key"), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(_("expires"), blank=True, null=True,)
    enabled = models.BooleanField(_("enabled"), default=True,)
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(default=0, editable=False,)
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_("description"), blank=True)
    no_changes = False

    class Meta:
        db_table = "djcelery_periodictask"
        verbose_name = _("periodic task")
        verbose_name_plural = _("periodic tasks")


class DjWorkerState(models.Model):
    hostname = models.CharField(_("hostname"), max_length=255, unique=True)
    last_heartbeat = models.DateTimeField(_("last heartbeat"), null=True, db_index=True)

    class Meta:
        """Model meta-data."""
        verbose_name = _("worker")
        verbose_name_plural = _("workers")
        get_latest_by = "last_heartbeat"
        ordering = ["-last_heartbeat"]


class DjTaskState(models.Model):
    state = models.CharField(_("state"), max_length=64)
    task_id = models.CharField(_("UUID"), max_length=36, unique=True)
    name = models.CharField(_("name"), max_length=200, null=True, db_index=True,)
    tstamp = models.DateTimeField(_("event received at"), db_index=True)
    args = models.TextField(_("Arguments"), null=True)
    kwargs = models.TextField(_("Keyword arguments"), null=True)
    eta = models.DateTimeField(_("ETA"), null=True)
    expires = models.DateTimeField(_("expires"), null=True)
    result = models.TextField(_("result"), null=True)
    traceback = models.TextField(_("traceback"), null=True)
    runtime = models.FloatField(
        _("execution time"), null=True, help_text=_("in seconds if task succeeded"),
    )
    retries = models.IntegerField(_("number of retries"), default=0)
    worker = models.ForeignKey(
        DjWorkerState, null=True, verbose_name=_("worker"), on_delete=models.CASCADE,
    )
    hidden = models.BooleanField(editable=False, default=False, db_index=True)

    class Meta:
        """Model meta-data."""
        verbose_name = _("task")
        verbose_name_plural = _("tasks")
        get_latest_by = "tstamp"
        ordering = ["-tstamp"]


def execute(new_table, old_table, tz=None):  # pylint: disable=invalid-name
    if new_table.objects.exists():
        print(f"Target table {new_table._meta.model_name} is not empty; skipping its migration")
        raise CommandError(
            "The target database {} already has data and cannot be migrated".format(
                new_table._meta.model_name
            )
        )
    for old_data in old_table.objects.all():
        new_data = new_table()
        if tz:
            setattr(new_data, "timezone", tz)
        for field in old_data._meta.fields:
            field_name = field.name
            # Check whether the field is a foreign key
            if field_name in ("crontab", "interval", "worker"):
                try:
                    # Write the foreign key id
                    setattr(new_data, field_name + "_id", getattr(old_data, field_name).id)
                except AttributeError:
                    setattr(new_data, field_name + "_id", None)
            else:
                setattr(new_data, field_name, getattr(old_data, field_name))
        new_data.save()
hey, would you mind including this script in the docs migration guide?
Of course I don't mind; I hope this script can help more people.
for sure
Here is a modified version which adds an app_label because that is now required:
from django.core.management.base import CommandError  # needed by execute() below
from django.db import models
from django.utils.translation import ugettext_lazy as _
class DjCrontabSchedule(models.Model):
    minute = models.CharField(max_length=64, default="*")
    hour = models.CharField(max_length=64, default="*")
    day_of_week = models.CharField(max_length=64, default="*",)
    day_of_month = models.CharField(max_length=64, default="*")
    month_of_year = models.CharField(max_length=64, default="*")

    class Meta:
        app_label = "temp"
        db_table = "djcelery_crontabschedule"


class DjPeriodicTasks(models.Model):
    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    class Meta:
        app_label = "temp"
        db_table = "djcelery_periodictasks"


class DjIntervalSchedule(models.Model):
    every = models.IntegerField(_("every"), null=False)
    period = models.CharField(_("period"), max_length=24)

    class Meta:
        app_label = "temp"
        db_table = "djcelery_intervalschedule"
        verbose_name = _("interval")
        verbose_name_plural = _("intervals")
        ordering = ["period", "every"]


class DjPeriodicTask(models.Model):
    name = models.CharField(
        _("name"), max_length=200, unique=True, help_text=_("Useful description"),
    )
    task = models.CharField(_("task name"), max_length=200)
    interval = models.ForeignKey(
        DjIntervalSchedule,
        null=True,
        blank=True,
        verbose_name=_("interval"),
        on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
        DjCrontabSchedule,
        null=True,
        blank=True,
        verbose_name=_("crontab"),
        on_delete=models.CASCADE,
        help_text=_("Use one of interval/crontab"),
    )
    args = models.TextField(
        _("Arguments"),
        blank=True,
        default="[]",
        help_text=_("JSON encoded positional arguments"),
    )
    kwargs = models.TextField(
        _("Keyword arguments"),
        blank=True,
        default="{}",
        help_text=_("JSON encoded keyword arguments"),
    )
    queue = models.CharField(
        _("queue"),
        max_length=200,
        blank=True,
        null=True,
        default=None,
        help_text=_("Queue defined in CELERY_QUEUES"),
    )
    exchange = models.CharField(
        _("exchange"), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _("routing key"), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(_("expires"), blank=True, null=True,)
    enabled = models.BooleanField(_("enabled"), default=True,)
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(default=0, editable=False,)
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_("description"), blank=True)
    no_changes = False

    class Meta:
        app_label = "temp"
        db_table = "djcelery_periodictask"
        verbose_name = _("periodic task")
        verbose_name_plural = _("periodic tasks")


class DjWorkerState(models.Model):
    hostname = models.CharField(_("hostname"), max_length=255, unique=True)
    last_heartbeat = models.DateTimeField(_("last heartbeat"), null=True, db_index=True)

    class Meta:
        """Model meta-data."""
        app_label = "temp"
        verbose_name = _("worker")
        verbose_name_plural = _("workers")
        get_latest_by = "last_heartbeat"
        ordering = ["-last_heartbeat"]


class DjTaskState(models.Model):
    state = models.CharField(_("state"), max_length=64)
    task_id = models.CharField(_("UUID"), max_length=36, unique=True)
    name = models.CharField(_("name"), max_length=200, null=True, db_index=True,)
    tstamp = models.DateTimeField(_("event received at"), db_index=True)
    args = models.TextField(_("Arguments"), null=True)
    kwargs = models.TextField(_("Keyword arguments"), null=True)
    eta = models.DateTimeField(_("ETA"), null=True)
    expires = models.DateTimeField(_("expires"), null=True)
    result = models.TextField(_("result"), null=True)
    traceback = models.TextField(_("traceback"), null=True)
    runtime = models.FloatField(
        _("execution time"), null=True, help_text=_("in seconds if task succeeded"),
    )
    retries = models.IntegerField(_("number of retries"), default=0)
    worker = models.ForeignKey(
        DjWorkerState, null=True, verbose_name=_("worker"), on_delete=models.CASCADE,
    )
    hidden = models.BooleanField(editable=False, default=False, db_index=True)

    class Meta:
        """Model meta-data."""
        app_label = "temp"
        verbose_name = _("task")
        verbose_name_plural = _("tasks")
        get_latest_by = "tstamp"
        ordering = ["-tstamp"]
def execute(new_table, old_table, tz=None):  # pylint: disable=invalid-name
    if new_table.objects.exists():
        print(f"Target table {new_table._meta.model_name} is not empty; skipping its migration")
        raise CommandError(
            "The target database {} already has data and cannot be migrated".format(
                new_table._meta.model_name
            )
        )
    for old_data in old_table.objects.all():
        new_data = new_table()
        if tz:
            setattr(new_data, "timezone", tz)
        for field in old_data._meta.fields:
            field_name = field.name
            # Check whether the field is a foreign key
            if field_name in ("crontab", "interval", "worker"):
                try:
                    # Write the foreign key id
                    setattr(new_data, field_name + "_id", getattr(old_data, field_name).id)
                except AttributeError:
                    setattr(new_data, field_name + "_id", None)
            else:
                setattr(new_data, field_name, getattr(old_data, field_name))
        new_data.save()
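The copy rule in execute() can be tried without a database. The sketch below is only an illustration with plain dicts standing in for model instances; `build_new_row` and `FOREIGN_KEYS` are hypothetical names, and a related row is assumed to be either a dict with an "id" or None.

```python
FOREIGN_KEYS = ("crontab", "interval", "worker")


def build_new_row(old_row, tz=None):
    """Map one old row (a plain dict) to the values for the new row."""
    new_row = {}
    if tz:
        # Only the crontab table is given a time zone by the command.
        new_row["timezone"] = tz
    for name, value in old_row.items():
        if name in FOREIGN_KEYS:
            # Copy only the related row's id, keeping NULL foreign keys as None.
            new_row[name + "_id"] = value["id"] if value is not None else None
        else:
            new_row[name] = value
    return new_row
```

Because primary keys are copied as they are, the foreign key ids in the new tables keep pointing at the matching schedule rows, which is presumably why execute() refuses to run against a non-empty target table.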
I'm in the process of switching from djcelery to django-celery-beat when upgrading to celery 4.x, as djcelery is no longer supported in celery 4. Is there a recommended way to migrate all the djcelery model data to django-celery-beat models? It's pretty clear that the projects are related to each other by looking at the model structure and committers. Seems like the models are almost exactly the same (with some additional ones in django-celery-beat). Yet I can't find any mention of djcelery in the django-celery-beat docs.
Should I simply create a data migration in django to directly copy over data from djcelery app models to the "matching" django-celery-beat app models? I'm happy to contribute information to the readme once I understand the guidelines of achieving this, if that will help others as well.
Thanks!