celery / django-celery-results

Celery result back end with django

Celery task results are not stored in the Django database #450

Open nico2am opened 4 weeks ago

nico2am commented 4 weeks ago


Environment

Description

Tasks are visible in Flower but are not being saved to the task results table in the database. The issue persists despite using the recommended configuration from the documentation and trying various solutions found in other issues and forums.

Current Configuration

Django Settings

INSTALLED_APPS = [
   ...,
   'django_celery_results',
]
CELERY_TIMEZONE = 'UTC'  # Europe/Paris
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

Celery Configuration (celery.py)

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    # Task settings
    task_track_started=True,
    task_time_limit=30 * 60,
    task_ignore_result=False,
    result_extended=True,
    # Serialization settings
    accept_content=['application/json'],
    task_serializer='json',
    result_serializer='json',
    # Worker settings
    worker_send_task_events=True,  # Same as -E option
    worker_prefetch_multiplier=1,  # For better task distribution
)

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
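To make the `namespace='CELERY'` comment concrete, here is a minimal sketch of the naming rule it describes: settings whose names start with `CELERY_` are picked up, the prefix is dropped, and the rest is lowercased. This is a simplified illustration, not Celery's actual implementation; the helper name `celery_keys_from_settings` and the sample `django_settings` dict are hypothetical.

```python
def celery_keys_from_settings(settings, namespace="CELERY"):
    """Simplified model of the namespace rule (not Celery's real code).

    Keeps only the settings whose name starts with '<namespace>_',
    strips that prefix, and lowercases what is left.
    """
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


# Hypothetical stand-in for the Django settings module shown above.
django_settings = {
    "DEBUG": True,  # no CELERY_ prefix, so it is not picked up
    "CELERY_TIMEZONE": "UTC",
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "django-db",
    "CELERY_CACHE_BACKEND": "django-cache",
}

mapped = celery_keys_from_settings(django_settings)
for key, value in sorted(mapped.items()):
    print(f"{key} = {value!r}")
```

Under this rule, `CELERY_RESULT_BACKEND = 'django-db'` is what ends up as the `result_backend` option. A name without the prefix, or with a typo in it, would be skipped without any error, so it is worth confirming the value the worker actually loaded.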

Attempted Solution

I've implemented a signal handler to manually track task completion and save results:

import logging
from celery.signals import task_postrun
from django_celery_results.models import TaskResult

logger = logging.getLogger(__name__)

@task_postrun.connect
def task_postrun_handler(task_id=None, task=None, state=None, retval=None, **kwargs):
    logger.info(f"Task completed: {task_id}")
    logger.info(f"State: {state}")
    logger.info(f"Result: {retval}")

    try:
        TaskResult.objects.get_or_create(
            task_id=task_id,
            defaults={
                'status': state,
                'result': retval,
                'task_name': task.name if task else '',
            }
        )
        logger.info(f"Result saved for task: {task_id}")
    except Exception as e:
        logger.error(f"Failed to save result for task {task_id}: {str(e)}")

The signal handler successfully creates records in the database, but the default Celery result backend still isn't saving results automatically as expected.

Questions

Additional Information

Any help or guidance would be greatly appreciated!

rootart commented 2 weeks ago

Hi @nico2am, it looks like your task explicitly says to ignore the results and not store them:

@app.task(bind=True, ignore_result=True)
...

Could you please check if results are stored when removing ignore_result or setting it to False?
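As a rough illustration of why the per-task option matters, the sketch below models the precedence between a task-level `ignore_result` and the global `task_ignore_result` setting: the task-level value wins, and the global setting only applies when the task leaves the option unset. This is a simplified model, not Celery's code; the helper `effective_ignore_result` is hypothetical.

```python
def effective_ignore_result(task_option, global_setting):
    """Simplified precedence model (not Celery's real code).

    task_option: the ignore_result value passed to the task decorator,
                 or None when the decorator does not set it.
    global_setting: the app-wide task_ignore_result value.
    """
    return global_setting if task_option is None else task_option


# debug_task in celery.py: @app.task(bind=True, ignore_result=True)
debug_task_ignored = effective_ignore_result(True, global_setting=False)

# send_mail in tasks/tasks.py: @shared_task with no ignore_result argument
send_mail_ignored = effective_ignore_result(None, global_setting=False)

print("debug_task result ignored:", debug_task_ignored)
print("send_mail result ignored:", send_mail_ignored)
```

In this model, `task_ignore_result=False` does not re-enable result storage for a task that was declared with `ignore_result=True`, while a task declared without the option follows the global setting.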

nico2am commented 1 week ago

Hello,

I checked and I don't have ignore_result set. This is my configuration:

celery.py

app.conf.update(
    # Task settings
    task_track_started=True,
    task_time_limit=30 * 60,
    task_ignore_result=False,
    result_extended=True,
    # Serialization settings
    accept_content=['application/json'],
    task_serializer='json',
    result_serializer='json',
    # Worker settings
    worker_send_task_events=True,  # Same as -E option
    worker_prefetch_multiplier=1,  # For better task distribution
)

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

tasks/tasks.py

from celery import shared_task

@shared_task
def send_mail(email_id):
    email = Email.objects.get(id=email_id)