astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code
https://astronomer.github.io/astronomer-cosmos/
Apache License 2.0
612 stars 154 forks source link

Cosmos tasks randomly getting marked as zombie tasks #1002

Open 1cadumagalhaes opened 4 months ago

1cadumagalhaes commented 4 months ago

Not sure if this should be here, but it happens mostly with cosmos dags.

My airflow instance is in a kubernetes cluster, so this might be a problem with the scheduler, not with cosmos itself. I'm using CeleryExecutor right now, might switch these dags to KubernetesExecutor to see if it works.

But anyways, if anyone knows how to help it would be great

[2024-05-27, 22:00:37 UTC] {task_context_logger.py:104} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/repo/dags/dags/dbt/rtz.py', 'processor_subdir': None, 'msg': "{'DAG Id': 'transform___dbt-instagram__dev', 'Task Id': 'instagram_models.all_media_insights_run', 'Run Id': 'scheduled__2024-05-27T21:20:00+00:00', 'Hostname': 'airflow-worker-1.airflow-worker.orchestration.svc.cluster.local', 'External Executor Id': '72425eed-2b53-4d22-81f0-fee59bbb3c99'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f856ebd3010>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)

Sometimes it is marked as zombie task even if it did execute correctly

dosubot[bot] commented 4 months ago

Switching to KubernetesExecutor might help resolve the issue of tasks being marked as zombie tasks, especially since your Airflow instance is running in a Kubernetes cluster.

Here's an example of how to define a DAG using DbtRunKubernetesOperator:

from airflow import DAG
from airflow.utils.dates import days_ago
from cosmos.operators.kubernetes import DbtRunKubernetesOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

with DAG(
    dag_id='example_dbt_kubernetes_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    run_dbt_task = DbtRunKubernetesOperator(
        task_id='run_dbt_task',
        profile_config=None,  # Add your profile config if needed
        namespace='default',  # Kubernetes namespace
        image='your-dbt-image',  # Docker image for dbt
        cmds=["dbt", "run"],  # dbt command
        name='dbt-run-pod',
        in_cluster=True,
        is_delete_operator_pod=True,
        get_logs=True,
    )

    run_dbt_task

Update your airflow.cfg to use KubernetesExecutor:

[core]
executor = KubernetesExecutor

Restart your Airflow services after making these changes. This should help mitigate the issue of tasks being marked as zombie tasks [1].

#### Details **About Dosu** This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

To continue the conversation, mention @dosu.

pankajastro commented 4 months ago

Which version of Cosmos are you using?

tatiana commented 3 months ago

@1cadumagalhaes could you also confirm the Cosmos LoadMethod you're using?

1cadumagalhaes commented 3 months ago

Sorry! I'm using cosmos 1.3.2 @pankajastro

And I haven't set the LoadMethod, so it's automatic. I think it is rendering it using dbt_ls based on the logs @tatiana :

Unable to do partial parsing because saved manifest not found. Starting full parse.
pankajastro commented 3 months ago

Sometimes it is marked as zombie task even if it did execute correctly

Since this is not persistent, I feel dag is converted correctly so there might be an issue with the scheduler as you have pointed out. But do you see any unusual ti config for the zombie task? Also, we have released 1.4.3 which introduces some performance improvements I'm not sure if it will address this issue or not but would you like to try it once.

dwreeves commented 3 months ago

Yeah I believe this is a memory issue.

"Zombie tasks" is the error message you get on Astronomer if your celery worker sigkills.

This happens because Cosmos spins up subprocesses on Celery workers, and each of those subprocesses will exceed 100 MiB. On top of the overhead of the Celery worker itself, this adds up quite a bit and sigkills occur very frequently.

IMO, my 2 cents, we really need to add a more opinionated guide on this. Especially now with the invocation mode that runs dbt directly in the python process instead of spinning up a new subprocess, and now that dbt-core's dependency locks aren't incompatible with Airflow 2.9's, Astronomer users can easily avoid this. But there is no guide that tells people how to do that, never mind anything that opinionatedly tells users this. Cosmos is confusing and users are likely to do what they are told, so we should tell them to do the things that don't cause these sorts of issues.

w0ut0 commented 3 months ago

@dwreeves, any (opiniated) suggestions in terms of required resources per worker/ tasks per worker/...?

dwreeves commented 3 months ago

@w0ut0 My most opinionated suggestion is to use the DBT_RUNNER invocation mode. I'm sure the Astronomer people would like it if I suggested using bigger workers since that's more money for them 😂😅 (although real talk, if any Astronomer people are reading this, "high memory" workers would be a great feature!) but you can avoid a ton of overhead by not spawning multiple subprocesses.

w0ut0 commented 3 months ago

Thanks, will test if it makes a significant improvement in our tasks without increasing the worker resources!