apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Execution Timeout is not working properly on airflow 2.5.0 #29557

Closed deepaktripathi1997 closed 1 year ago

deepaktripathi1997 commented 1 year ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We're experiencing occasional issues with tasks that have specified an 'execution_timeout'. Despite the process being timed out, the task remains stuck in a 'running' state for several hours.

The task looks like this:

    @task(task_id='save_job_details', retries=1, retry_delay=5, execution_timeout=execution_timeout,
          pool='nrt_save_job_details', trigger_rule=TriggerRule.NONE_FAILED)
    def save_job_details(**context):
        ti = context['ti']
        connection_details = Connection.get_connection_from_secrets(conn_id='redshift_etl_flack')

        # Pulling all the XCOMS
        job_status = ti.xcom_pull(task_ids='running_redshift_queries', key='job_status')
        last_run_epoch = ti.xcom_pull(task_ids='list_running_epochs', key='last_run_epoch')
        schema_change = ti.xcom_pull(task_ids='list_running_epochs', key='schema_change')
        schema_epoch = ti.xcom_pull(task_ids='list_running_epochs', key='schema_epoch')
        before_count = ti.xcom_pull(task_ids='running_redshift_queries', key='before_count')
        after_count = ti.xcom_pull(task_ids='running_redshift_queries', key='after_count')
        max_kt = ti.xcom_pull(task_ids='running_redshift_queries', key='max_kt')
        print(f"Job status of previous task -> {job_status}")
        print(f"last run epoch received of previous task -> {last_run_epoch}")

        # Creating DAG URL
        dag_id = context['dag'].dag_id
        dag_run_context = context["dag_run"]
        run_id = dag_run_context.run_id
        dag_url = f"""http://10.249.5.183:3000/airflow/dags/{dag_id}/graph?root=&dag_run_id={run_id}"""
        table_id = table['table_id']

        # Finding event_type
        conf = dag_run_context.conf
        event_type = "all" if 'event_type' not in conf else conf['event_type']

        # Saving Job details in RDS

        # Case if no epoch was processed
        if job_status is None:
            job_status = 2
        if job_status == 1 and last_run_epoch is None:
            job_status = 0

        if job_status in (0, 2, 3) or last_run_epoch is None or event_type != 'all':
            cmd = f"""insert into nrt_jobs (table_id, status, table_name)
                    values ({table_id}, {job_status}, '{destination_table_name}')
                     """
        else:
            cmd = f"""insert into nrt_jobs (table_id, status, table_name, last_run_epoch)
                                values ({table_id}, {job_status}, '{destination_table_name}', {last_run_epoch})
                                 """
        print(cmd)

        connection = mysql_hook_nrt.get_conn()

        # saving the stats
        stats_path = f"{nrt_variables['stats_path_s3'].rstrip('/')}/{table['source']}/{table['db']}/{table['table_name']}/ingestion.json"
        stats_parsed = urlparse(stats_path)
        stats_bucket = stats_parsed.netloc
        stats_prefix = stats_parsed.path.lstrip('/')

        stats = {
            "source": table['source'],
            "db": table['db'],
            "table_name": table['table_name'],
            "before_count": before_count,
            "after_count": after_count,
            "max_kafka_timestamp": max_kt,
            "addedon": pendulum.now().to_datetime_string()
        }

        try:
            cur = connection.cursor()
            cur.execute(cmd)
            stats_s3_hook = S3Hook()
            stats_s3_hook.load_string(bucket_name=stats_bucket, key=stats_prefix,
                                      string_data=json.dumps(stats, indent=2, default=str), replace=True)
        except (ClientError, MySQLdb.Error) as e:
            connection.rollback()
            raise AirflowFailException(f"Not able to save stats due to {e}")
        finally:
            connection.commit()
            connection.close()
            print("Stats saved")
            print("Job details saved")

        # CDC code
        if schema_change:
            try:
                cdc_s3_hook = S3Hook()

                CDC(table_name=destination_table_name, ingestion_schema=ingestion_schema,
                    data_dumps_schema=data_dumps_schema).driver_function()
                cdc_s3_hook.load_string(bucket_name=bucket, key=f"{schema_epoch}_CDC_DONE", string_data='')
            except (ClientError, MySQLdb.Error) as e:
                print(f"Outer: Failed CDC due to {e}")
                raise AirflowFailException(f"Outer: Failed CDC due to {e}")

        if os.path.exists(f"redshift_queries_{destination_table_name}"):
            os.remove(f"redshift_queries_{destination_table_name}")

        if os.path.exists(f"epoch_list_{destination_table_name}"):
            os.remove(f"epoch_list_{destination_table_name}")

Value for execution_timeout=pendulum.duration(minutes=3)

Task Logs:
Falling back to local log
Reading local file: /home/deploy/ssot-airflow/logs/dag_id=realtime_payout_payout_Beneficiary/run_id=scheduled__2023-02-15T13:17:00+00:00/task_id=save_job_details/attempt=1.log


[2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - Job status of previous task -> 1
[2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - last run epoch received of previous task -> 1676474400
[2023-02-15, 20:55:21 IST] {logging_mixin.py:137} INFO - insert into ***

[2023-02-15, 20:55:21 IST] {base.py:73} INFO - Using connection ID 'nrt_rds' for task execution.
[2023-02-15, 20:55:21 IST] {base.py:73} INFO - Using connection ID 'aws_default' for task execution.
[2023-02-15, 20:55:21 IST] {credentials.py:1049} INFO - Found credentials from IAM Role: ssot-prod-role
[2023-02-15, 20:58:20 IST] {timeout.py:68} ERROR - Process timed out, PID: 11392

The task has been running for the past 1.5 hours and is not failing.

Virtualization: kvm
Operating System: CentOS Linux 7 (Core)
CPE OS Name: cpe:/o:centos:centos:7
Kernel: Linux 6.0.10-1.el7.elrepo.x86_64
Architecture: x86-64


This happens only when there is a timeout in this task.

What you think should happen instead

No response

How to reproduce

No response

Operating System

CentOS Linux 7 (Core)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.2.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-common-sql==1.3.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-mysql==3.4.0
apache-airflow-providers-postgres==5.0.0
apache-airflow-providers-sftp==4.2.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.3.0

Deployment

Virtualenv installation

Deployment details

CPU: 64 core
Mem: 256 GB
worker_autoscale: 1024, 256

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template!

hussein-awala commented 1 year ago

Airflow raises an AirflowTaskTimeout exception when the task times out, and your code can catch this exception and handle it if needed.

Since the log contains [2023-02-15, 20:58:20 IST] {timeout.py:68} ERROR - Process timed out, PID: 11392, the timeout exception is raised, but it seems your code is stuck in one of the finally blocks: Python executes the finally block before propagating the exception. I cannot tell what's wrong with your operator, but I can recommend some steps to debug the problem.

First, here is a simple example:

import datetime
import time
from typing import Any

import pendulum

from airflow.models import BaseOperator
from airflow.models.dag import dag
from airflow.utils.context import Context

class MyOperator(BaseOperator):
    def execute(self, context: Context) -> Any:
        try:
            print("try")
            time.sleep(120)
        except Exception as e:
            print(e)
            raise e
        finally:
            print("finally")
            time.sleep(120)

@dag(
    schedule=None,
    start_date=pendulum.yesterday(),
)
def timeout_dag():
    MyOperator(task_id="test_task", execution_timeout=datetime.timedelta(seconds=30))

timeout_dag()

And here is the log:

[2023-02-19, 21:27:29 UTC] {logging_mixin.py:149} INFO - try
[2023-02-19, 21:27:59 UTC] {timeout.py:68} ERROR - Process timed out, PID: 98045
[2023-02-19, 21:27:59 UTC] {logging_mixin.py:149} INFO - Timeout, PID: 98045
[2023-02-19, 21:27:59 UTC] {logging_mixin.py:149} INFO - finally
[2023-02-19, 21:29:59 UTC] {taskinstance.py:1837} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/files/dags/dag18.py", line 19, in execute
    raise e
  File "/files/dags/dag18.py", line 16, in execute
    time.sleep(120)
  File "/opt/airflow/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 98045
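
The ordering in this log (the ERROR line, then finally, and only two minutes later the task failure) follows from how the timeout is enforced: a signal handler inside the task process raises the exception, so Python's normal finally semantics apply and cleanup runs before the failure is recorded. Below is a minimal, standalone sketch of that mechanism, using a local TaskTimeout class as a stand-in for the real AirflowTaskTimeout; it is a simplified illustration, not the actual airflow.utils.timeout implementation.

import os
import signal
import time


class TaskTimeout(Exception):
    """Local stand-in for airflow.exceptions.AirflowTaskTimeout in this sketch."""


def handle_timeout(signum, frame):
    # The handler runs inside the task process; the exception it raises
    # propagates like any other, so enclosing finally blocks run first.
    raise TaskTimeout(f"Process timed out, PID: {os.getpid()}")


signal.signal(signal.SIGALRM, handle_timeout)
signal.setitimer(signal.ITIMER_REAL, 3)  # plays the role of a 3-second execution_timeout

try:
    time.sleep(60)  # the "work" that exceeds the timeout
finally:
    print("cleanup started")
    time.sleep(60)  # if cleanup blocks here, the process stays alive well past the timeout

Running this prints "cleanup started" after about 3 seconds, but the traceback only appears once the second sleep finishes, which mirrors a task stuck in a blocking finally.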

To pinpoint the problem, you can write a simple test that executes your operator in the IDE in debug mode and set breakpoints at each step to follow the call stack. If you find this complicated, you can add a print after each line and read the log from the UI to find where the task is stuck.
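
For the debug-mode run, one option (a sketch assuming Airflow 2.5+, where dag.test() is available) is to add a main guard to the DAG file from the example above and run or debug that file directly, with breakpoints set inside execute():

# Sketch: run the example DAG in-process so IDE breakpoints inside
# MyOperator.execute() are hit. Assumes Airflow 2.5+ (dag.test()) and the
# timeout_dag() definition shown above.
if __name__ == "__main__":
    timeout_dag().test()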

Also, if you have special actions to perform when the task times out, you can add them in a new except block:

except AirflowTaskTimeout as timeout_exception:
    do_something()
    raise timeout_exception
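
For example, in the context of the MyOperator sketch above (the print is just a placeholder for whatever timeout-specific action you need):

import time
from typing import Any

from airflow.exceptions import AirflowTaskTimeout
from airflow.models import BaseOperator
from airflow.utils.context import Context


class MyOperator(BaseOperator):
    def execute(self, context: Context) -> Any:
        try:
            time.sleep(120)  # long-running work that may exceed execution_timeout
        except AirflowTaskTimeout as timeout_exception:
            # timeout-specific handling goes here (placeholder)
            print("timed out, running timeout-specific cleanup")
            raise timeout_exception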