dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

CeleryK8sRunLauncher doesn't work with celery_executor #22027

Open XBeg9 opened 4 months ago

XBeg9 commented 4 months ago

Dagster version

1.7.6

What's the issue?

With CeleryK8sRunLauncher as the run launcher and celery_executor as the executor, I get this error on a k8s cluster:

dagster._core.errors.DagsterSubprocessError: During celery execution errors occurred in workers:
celery.exceptions.NotRegistered: 'execute_plan'

What did you expect to happen?

Launching the job should trigger a job run (a Kubernetes job), and that run should spin up a Celery task for each op. The tasks should then be processed by the celery-worker pods.

How to reproduce?

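from dagster import job, op
from dagster_celery import celery_executor

# Point the Celery executor at the Redis broker and result backend.
ce = celery_executor.configured(
    {
        "broker": "redis://redis-293d3708-headless.redis-system:6379/0",
        "backend": "redis://redis-293d3708-headless.redis-system:6379/0",
    }
)

# The original report omitted the job body; this op and the job name are placeholders.
@op
def placeholder_op():
    pass

@job(executor_def=ce, tags={"t/job": "ingest"})
def ingest_job():
    placeholder_op()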

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

{
    "runLauncher": {
        "type": "CeleryK8sRunLauncher",
        "config": {
            "celeryK8sRunLauncher": {
                "env": {item["name"]: item["value"] for item in env},
                "workerQueues": [{"name": "dagster", "replicaCount": 1}],
            }
        },
    }
}

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

XBeg9 commented 4 months ago

Here are some logs from the k8s Celery worker:


The delivery info for this task is:
{'exchange': '', 'routing_key': 'dagster'}
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 659, in on_task_received
    strategy = strategies[type_]
KeyError: 'execute_plan'
[2024-05-22 17:14:14,422: ERROR/MainProcess] Received unregistered task of type 'execute_plan'.
The message has been ignored and discarded.
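
To illustrate what the traceback above is saying, here is a minimal sketch of name-based task dispatch. It is a simplified model, not Celery's actual code: `NotRegistered`, `make_registry`, and `dispatch` are hypothetical stand-ins, and the task name `execute_step_k8s_job` is my assumption about what a worker started with `dagster_celery_k8s.app` registers. Only `execute_plan` comes from the log.

```python
# Simplified model of a worker looking up a task by name.
# This is NOT Celery's implementation; all names here are illustrative.


class NotRegistered(KeyError):
    """Stand-in for celery.exceptions.NotRegistered."""


def make_registry(task_names):
    # Map each registered task name to a trivial handler.
    return {
        name: (lambda payload, _name=name: f"{_name} handled {payload}")
        for name in task_names
    }


def dispatch(registry, task_name, payload):
    # The worker can only run tasks whose names its app registered.
    try:
        handler = registry[task_name]
    except KeyError:
        raise NotRegistered(task_name) from None
    return handler(payload)


# Assumption: a worker loaded with dagster_celery_k8s.app knows only this name.
k8s_worker = make_registry(["execute_step_k8s_job"])
# Assumption: a worker loaded with dagster_celery.app knows 'execute_plan'.
plain_worker = make_registry(["execute_plan"])

# celery_executor submits 'execute_plan'; the matching worker handles it.
result_ok = dispatch(plain_worker, "execute_plan", "step-1")

# The same message sent to the other worker is rejected as unregistered.
try:
    dispatch(k8s_worker, "execute_plan", "step-1")
    result_err = None
except NotRegistered as exc:
    result_err = exc.args[0]
```

If this model is right, the executor and the worker's app module have to agree on the task name, otherwise the worker discards the message.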
XBeg9 commented 4 months ago

It looks like the problem is here: https://github.com/dagster-io/dagster/blob/d307f1a5818bc25fb00fbff81d0a75669a4ab5c1/python_modules/libraries/dagster-celery/dagster_celery/executor.py#L128, while celery_k8s_job_executor uses a different routing_key: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/executor.py#L233. Is the solution to write a custom CeleryK8sRunLauncher?

XBeg9 commented 4 months ago

OK, the fix is here: https://github.com/dagster-io/dagster/blob/master/helm/dagster/templates/deployment-celery-queues.yaml#L64

Change dagster_celery_k8s.app to dagster_celery.app.

The worker's app module should be kept in sync with the executor, and the Helm chart needs a way to choose which executor is used.
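
A rough sketch of what keeping the two in sync could look like, written in Python rather than as a Helm template. Everything here is hypothetical: the chart has no such option today, `APP_MODULE_BY_EXECUTOR` and `worker_args` are names I made up, and the `dagster-celery worker start -A ... -q ...` argument shape is my assumption about the worker CLI, so check it against the template linked above before relying on it.

```python
# Hypothetical mapping from executor to the Celery app module its workers load.
APP_MODULE_BY_EXECUTOR = {
    "celery_executor": "dagster_celery.app",
    "celery_k8s_job_executor": "dagster_celery_k8s.app",
}


def worker_args(executor, queue):
    """Build worker arguments so the app module matches the chosen executor."""
    try:
        app_module = APP_MODULE_BY_EXECUTOR[executor]
    except KeyError:
        raise ValueError(f"unsupported executor: {executor!r}") from None
    # Assumed CLI shape; verify the flags against the Helm template.
    return ["dagster-celery", "worker", "start", "-A", app_module, "-q", queue]


# The queue name "dagster" is taken from the workerQueues config in this issue.
args_for_celery = worker_args("celery_executor", "dagster")
args_for_k8s = worker_args("celery_k8s_job_executor", "dagster")
```

The point is only that the app module would be derived from a single executor setting instead of being hardcoded in the template.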

henrytseng commented 2 months ago

If you're looking to use CeleryK8sRunLauncher for Celery with Kubernetes

Then you'll want to use

from dagster_celery_k8s import celery_k8s_job_executor

Instead of

from dagster_celery import celery_executor
XBeg9 commented 2 months ago

> If you're looking to use CeleryK8sRunLauncher for Celery with Kubernetes
>
> Then you'll want to use
>
> from dagster_celery_k8s import celery_k8s_job_executor
>
> Instead of
>
> from dagster_celery import celery_executor

As I noted above, the problem is in the Helm chart.