apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

XCOM_sidecar_container not started results in long running DAG #38115

Open ionescur2 opened 6 months ago

ionescur2 commented 6 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

Error: The XCom sidecar container sometimes fails to start. When that happens, the DAG keeps running indefinitely, waiting for a container that never starts.

There is a small change in the following method: await_xcom_sidecar_container_start from /airflow/providers/cncf/kubernetes/utils/pod_manager.py.

Version 2.6.0 code:

    def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
        self.log.info("Checking if xcom sidecar container is started.")
        warned = False
        while True:
            if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
                self.log.info("The xcom sidecar container is started.")
                break
            if not warned:
                self.log.warning("The xcom sidecar container is not yet started.")
                warned = True
            time.sleep(1)

Version 2.7.2 code:

    def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
        self.log.info("Checking if xcom sidecar container is started.")
        for attempt in itertools.count():
            if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
                self.log.info("The xcom sidecar container is started.")
                break
            if not attempt:
                self.log.warning("The xcom sidecar container is not yet started.")
            time.sleep(1)

We believe that the old version, which uses a while True: loop, is better suited because it keeps checking whether the container has started.

What you think should happen instead?

    def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
        self.log.info("Checking if xcom sidecar container is started.")
        warned = False
        while True:
            if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
                self.log.info("The xcom sidecar container is started.")
                break
            if not warned:
                self.log.warning("The xcom sidecar container is not yet started.")
                warned = True
            time.sleep(1)

We believe that this code will wait for the container to start.

How to reproduce

Sometimes the XCom sidecar container is not started. To reproduce, stop the container and observe how the DAG reacts.

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

ionescur2 commented 6 months ago

Last log: [2024-03-01 13:31:43,814] The xcom sidecar container is not yet started.

hussein-awala commented 6 months ago

Technically, there is no difference between the code of the two versions: itertools.count() is an infinite generator of incrementing numbers, so both loops keep polling until the container is running. IMHO we need to add a timeout for the waiting time, similar to what we do for the base container, and when we reach this timeout, we should fail the task instead of waiting forever.
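
As a rough illustration of the timeout idea, here is a minimal, standalone sketch of a polling loop that gives up after a deadline. It is not the provider's code and not the change made in any PR. The names await_container_start, SidecarStartTimeout, is_running, timeout and poll_interval are hypothetical and chosen only for this example.

```python
import time
from typing import Callable


class SidecarStartTimeout(Exception):
    """Hypothetical exception: the container never reported running."""


def await_container_start(
    is_running: Callable[[], bool],
    timeout: float = 60.0,
    poll_interval: float = 1.0,
) -> int:
    """Poll is_running() until it returns True or `timeout` seconds pass.

    Returns the number of polls made. Raises SidecarStartTimeout if the
    deadline passes first, so the caller can fail the task instead of
    waiting forever.
    """
    # time.monotonic() is not affected by system clock adjustments.
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        if is_running():
            return attempts
        if time.monotonic() >= deadline:
            raise SidecarStartTimeout(
                f"container not running after {timeout} seconds"
            )
        time.sleep(poll_interval)
```

In the pod manager, the predicate would presumably wrap the existing check, for example lambda: self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME), and the timeout value would have to come from configuration. Both of those are assumptions here.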

jhongy1994 commented 1 month ago

I created PR #40909 to fix this issue. Please check it!