apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.37k stars 14.11k forks source link

execution_timeout not enforced, task hangs up #35474

Open xmariachi opened 10 months ago

xmariachi commented 10 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Version: 2.5.1 Run env: MWAA on AWS

Summary: Once every ~500-1000 runs approximately, the task hangs up infinitely until manually killed, not allowing any other task to be placed for this dag; and so its execution_timeout is not enforced. In my experience, it only happens on tasks that consume from Kafka using library confluent_kafka. The execution_timeout is enforced in other tasks.

Dag definition code:

# Dag Info
default_args = {
    "retries": 3,
    "on_failure_callback": on_failure_callback,
    "sla": timedelta(hours=2),
    "execution_timeout": timedelta(hours=4),
}

@dag(SERVICE_NAME,
     default_args=default_args,
     schedule_interval="*/5 * * * *",
     start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
     catchup=True,
     tags=['critical', 'dumper', 'kafka'],
     max_active_runs=1)
def process_records():
    ingest_from_kafka_and_save()

The ingest_from_kafka_and_save() contains code that consumes from Kafka, providing a callback function to the consumption (which I suspect may have something to do with the problem, since it happens asynchronously).

It's hard to reproduce since it is temperamental and happens every once in a while. Audit Log does not show anything special - just seems the hang indefinitely. Consumption code itself works fine otherwise and it has been running for months in this and other dags that use it - but they also show the same behaviour.

What you think should happen instead

The execution_timeout should be enforced and the task should be killed so a new one could be placed.

How to reproduce

It is hard to reproduce, since it happens very unfrequently.

Operating System

MWAA on AWS

Versions of Apache Airflow Providers

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt" apache-airflow-providers-amazon apache-airflow-providers-snowflake==4.0.2 apache-airflow-providers-mysql==4.0.0 apache-airflow-providers-slack confluent-kafka==2.1.0

Deployment

Amazon (AWS) MWAA

Deployment details

Medium sized cluster 2.5.1 version, latest update applied 2 weeks ago.

Anything else

Unclear what triggers the error - but whatever the error, the task should be killed to enforce the execution_timeout. Seems like an internal thread management issue.

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 10 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

jscheffl commented 10 months ago

Hi @xmariachi - as debugging of your reported problem is hard and can be caused by multiple circumstances also outside of control of Airflow.... can you check when the node/POD hangs to exec into and produce a Python Stack-Trace from the running instance for diagnosis? You could for example follow the description in https://stackoverflow.com/questions/6849138/check-what-a-running-process-is-doing-print-stack-trace-of-an-uninstrumented-py

xmariachi commented 10 months ago

Thanks @jscheffl , will do. However, this is AWS MWAA - is that feasible to do there as well?

Taragolis commented 10 months ago

In general we accept bugs which could be reproduce in Open Source implementation on latest stable version of Airflow, for that purpose you could try to use provided Docker Compose from Running Airflow in Docker

There are several reasons for this

A bit more information about execution_timeout itself, it is implemented by use SIGALARM signal, and behaviour of this signal might not work in some cases,

This block of code could demonstrate it demonstrate

import signal
import time

class TimeoutError(Exception):
    ...

def some_func():
    try:
        while True:
            time.sleep(0.1)
    except Exception:
        pass
    print("Nope")

def some_another_func():
    while True:
        time.sleep(0.1)
    print("Nope")

def handler(signum, frame):
    raise TimeoutError(f"Timeout!")

TIMEOUT = 2

### This one not failed, because error will caught in try..except, and instead of TimeoutError ``Nope`` would print
signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
    some_func()
finally:
    signal.alarm(0)

### This one failed, and TimeoutError will raise
signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
    some_another_func()
finally:
    signal.alarm(0)
potiuk commented 10 months ago

This is ilikely something we cannot do anything about you need to raise it to Kafka developers. The problem is that if you have C library that hangs and does not periodically check for signals we cannot do much.

https://python.readthedocs.io/en/stable/library/signal.html

A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.

Possibly we could think about additional escalation level and sending sigkill in such cases after additional timeout. WDYT @Taragolis ? I think that should be possible as we are forking in local task job and we have a hearbeating process that is pure python that could potentiall send SIGKILL to the actual "task" process.

WDYT?

Taragolis commented 10 months ago

I think yes we need an additional escalation level for execution timeout.

The problem of current implementation that we raise an error in handler function and have no idea in which place in Main thread it actually raised, as result user code, third-party libraries and airflow components itself might prevent exit from the task.

In general in most cases it works fine, even according to description of the issue "Once every ~500-1000 runs approximately, the task hangs up infinitely until manually killed"

Just for demonstrate current flaky behaviour with a bit greater probability to survive after TimeoutError


import signal
import time

class TimeoutError(Exception):
    ...

def flacky_behaviour():
    ix = 0
    delay = 0.005
    each_second = 1 / delay

    while True:
        # Handle all regular exceptions here
        try:
            # SIGALRM Handler raise error here? Then code survive
            # change TimeoutError base class to BaseException might help in this case
            time.sleep(delay)
        except Exception:
            print("Nope")

        # Handle all exceptions include ``SystemExit`` (sys.exit) and ``KeyboardInterrupt`` (SIGINT)
        try:
            # SIGALRM Handler raise error here?
            # Well... better luck next time
            time.sleep(delay)
        except:
            print("Serious NOPE!")

        # If error happen outside of try block then execution would terminate
        time.sleep(delay * 2)

        if ix % each_second == 0:
            print("It's alive!")
        ix += 1

def handler(signum, frame):
    print(f"Raise TimeoutError, MRO {TimeoutError.mro()}")
    raise TimeoutError("Timeout!")

TIMEOUT = 2

signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
    flacky_behaviour()
finally:
    signal.alarm(0)

There is couple of different solution (an combination) in my head which might make things better (or worse). Maybe better move this discussion into Dev List for wider group of people

Option 1

https://github.com/apache/airflow/blob/de92a81f002e6c1b3e74ad9d074438b65acb87b6/airflow/exceptions.py#L81-L82

Inherit AirflowTaskTimeout from BaseException rather than AirflowException, it should help in case of try ... except Exception:, It doesn't help in case if upstream code use try ... except: but such code is a Worst Practice in python anyway.

Ideally we could think about changing inheritance of AirflowException from Exception to BaseException in Airflow 3, and make most of exceptions internal only and make only couple of then as part of public interface

Option 2

In additional to exception write TIMEOUT state into the DB backend, so upstream processes could kill hung process In some circumstances would breaks silent_fail of BaseSensorOperator and might be on_kill methods

Option 3

Introduce new task state TIMEOUT and control execution timeout in Scheduler. Well this one require bigger effort to implements (JobRunners, new trigger rules) and think have it also have some side effects include the fact that some contributors/PMC would be against of new state since it add complexability to the already complex system

potiuk commented 10 months ago

How about another option. I think we already use (depends on runner - could be also spawned and cgroups migh be involved - but generally it's the default) fork local task process execution - I believe when task is run, there is one main process (LocalTaskJob) that watches for the "child" process and regularly pings the DB with heartbeat (or so I understand it happens) - while the child process is doing the job. I think that parent processs does not actually parse the task to know the timeout (so that's a bit of a problem), but we could POTENTIALLY modify scheduler (that knows the timout from the serialized DAG) to pass such timeout to executor (and subsequently to the parent process) as additional parameter.

Then, assuming that this parent process is not getting into a long running C job and does not hang, it would be relatively easy to do task kill escalation - the usual SIGTERM, SIGHUP, SIGKILL dance with SIGKILL ultimately killing even most stubborn forked processes. The parent process is not doing much, it merely communicates with Airflow DB via heartbeats (as I understanda) and waits for the forked process to finish, so chances that this process will hang are slim.

That would be pretty robust solution, I think?

Taragolis commented 10 months ago

I guess this should be implemented in top of the current implementation? Correct me if I wrong.

  1. Try to raise exception AirflowTaskTimeout
  2. Heartbeat checker also check timeout, something simple (if we work in UTC) as start_time + execution_timeout < current_time. And if it happen give some additional grace period before terminate/kill
potiuk commented 10 months ago

YEp. @Taragolis . That would be my idea.

It comes from the assumption that in order to REALLY be able to handle all timeouts you need to do it from a separate process - because as you rightfully explained - trying to handle things "in-process" is not always applicable. The idea of mine is to add extra layer of "what to do if the actual task process is not responding" - and I think utilising that parent process (which is already there) to apply such hard-timeout is simplest - without modifying states and adding yet another layer of monitoring processes/overloading the scheduler.

I think - other than occasional "whole machine stops working" this would handle most cases where the task is not timing out but still continues to do stuff because of badly written low-level C implemetnation of the library that is used..

And the "whole machine hangs" case should anyhow be handled on deployment level (for example K8S should kill it, also in this case we will stop receiving heartbeats and ultimately Scheduler should handle it even today.

Taragolis commented 10 months ago

Upon cursory examination I could see only one side effect or benefit depend on the case. Right now task's email notifications and callbacks executed into the same process as task. So it could be the situation that task executed successfully and run on_success callbacks which take some additional time, and on this moment start_time + execution_timeout < current_time happen and we kill the process.

That is why I suggest also add grace period before kill, maybe even configurable

potiuk commented 10 months ago

That is why I suggest also add grace period before kill, maybe even configurable

Oh absolutely. the signal "dance" I was mentioning should involves several grace periods. Usually it is implemented by

wait a little a) send HUP (wait) b) if not stopped send TERM (wait a little more) c) if it does not work send KILL (and do not wait - there is nothing to wait for - It happend few times in my life that process did not die after SIGKILL and that was at times when the whole OS/installation got heavily broken).

Taragolis commented 10 months ago

Seems we know about the nature and have a plan how to resolve it, so let me pick this issue then.

Taragolis commented 10 months ago

It happend few times in my life that process did not die after SIGKILL and that was at times when the whole OS/installation got heavily broken)

giphy

xmariachi commented 10 months ago

Thank you guys. I don't have enough knowledge on Airflow internals to chip in much, but your solution sounds sensible.

simonprydden commented 10 months ago

@xmariachi Hi, I also use mwaa and have come across this behaviour. What is the status of the task when it gets stuck? We had the task in a queued state, and noticed the sqs age in the metrics started to climb. I believe it's an issue with down scaling on mwaa, we added dag_timeout to our minutely dag to stop after 5 mins.

xmariachi commented 10 months ago

@xmariachi Hi, I also use mwaa and have come across this behaviour. What is the status of the task when it gets stuck? We had the task in a queued state, and noticed the sqs age in the metrics started to climb. I believe it's an issue with down scaling on mwaa, we added dag_timeout to our minutely dag to stop after 5 mins.

In my case, the task is active and remains as such.

kurtqq commented 7 months ago

@Taragolis was wondering if you have a patch/workaround for this issue?

potiuk commented 7 months ago

@Taragolis was wondering if you have a patch/workaround for this issue?

Airflow 2.8.2 (RC likely tomorrow) - should have the fix applied from #35653 - if your problem @kurtqq was with swallowing exception. You are absolutely encouraged to subscribe to the devlist and when we send the email to vote on it to test if it solves your problem and report back.

If that was the case BTW (swallowing the Exception), then the workaround (or rather proper way of writing your DAG code or writing the library that caused it) was to remove the try: except Exception that caused it (you will need to find it by inspecting your code and understanding what your operators and tasks that time out are doing).

If there is other reason (like badly written C-Code in your library that does not handle signals propagated from Python) - then the solution is to fix the library (or maybe upgrade to newer version that is fixed or raising an issue to whoever creates the library - but only you - knowing your DAGs and task that do not react to timeout properly do.

That woudl be about all

ikholopov-omni commented 2 months ago

Do we have a strong case not too implement signal sequence in https://github.com/apache/airflow/issues/35474#issuecomment-1801988073 in supervising process? This seems to be generic enough and the only solution to deal with processes stuck in D-state.

If we just don't have it as priority, I can create a draft PR of that, but I don't want to start if there is a strong case of not implementing it. @potiuk

potiuk commented 1 month ago

No. There is nothing against it.