apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

KubernetesPodOperator with GCS FUSE sidecar sometimes stays running indefinitely #39625

Open aptenodytes-forsteri opened 6 months ago

aptenodytes-forsteri commented 6 months ago

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

No response

Apache Airflow version

airflow-2.7.3

Operating System

linux

Deployment

Google Cloud Composer

Deployment details

No response

What happened

I created a DAG with a KubernetesPodOperator that uses pod annotations to mount GCS buckets through the Google Cloud Storage FUSE Container Storage Interface (CSI) driver.

Due to what I believe to be another bug, https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/issues/257, the GCS FUSE sidecar would sometimes stay running.

From log messages, I determined the operator was stuck in an infinite loop here: https://github.com/apache/airflow/blob/029cbaec174b73370e7c4ef2d7ec76e7be333400/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L623

This appears to be a somewhat known issue, since that function already has a special case for the Istio sidecar. The same treatment should apply to all sidecars.
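To make the stuck state concrete, here is a minimal sketch of how it can be recognized from a pod's status. It works on a plain dict shaped like the output of `kubectl get pod -o json` rather than on the Airflow or Kubernetes client objects. The helper name `lingering_containers` and the sidecar name in the example are illustrative assumptions, not part of the provider.

```python
from typing import List


def lingering_containers(pod: dict, main_container: str = "base") -> List[str]:
    """Return names of containers still running after the main container terminated.

    `pod` is a dict in the shape of the Kubernetes Pod JSON; this helper is a
    hypothetical diagnostic, not an Airflow API.
    """
    statuses = pod.get("status", {}).get("containerStatuses") or []
    states = {s["name"]: (s.get("state") or {}) for s in statuses}
    # Nothing is "lingering" until the main container has actually finished.
    if "terminated" not in states.get(main_container, {}):
        return []
    return [name for name, state in states.items() if "running" in state]


# Example: the main container has exited, but a sidecar keeps the pod in Running.
# The sidecar name below is only an illustration.
example_pod = {
    "status": {
        "phase": "Running",
        "containerStatuses": [
            {"name": "base", "state": {"terminated": {"exitCode": 1}}},
            {"name": "example-sidecar", "state": {"running": {}}},
        ],
    }
}
print(lingering_containers(example_pod))
```

A pod in this state never reaches a terminal phase on its own, which is why a loop that waits only for the pod phase does not exit.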

What you think should happen instead

It should not be possible for a "rogue" sidecar container to cause the KubernetesPodOperator to end up in an infinite loop. This behavior ends up hogging resources on a cluster and eventually clogs up the whole cluster with zombie pods.

One possible fix would be an optional timeout for how long to wait for the pod to do its work.

Another possible fix would be to generalize the Istio sidecar handling so that it applies to all types of sidecars.
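Both ideas can be combined in one wait loop. The following is only a sketch under stated assumptions: the pod is represented as a plain dict in the shape of the Kubernetes Pod JSON, `read_pod` is a caller-supplied function, and the names `await_completion`, `timeout` and `poll_interval` are hypothetical rather than existing provider parameters.

```python
import time
from typing import Callable, Optional

# Pod phases after which Kubernetes will not restart or continue the pod.
TERMINAL_PHASES = {"Succeeded", "Failed"}


def container_terminated(pod: dict, container_name: str) -> bool:
    """Return True if the named container reports a terminated state."""
    statuses = pod.get("status", {}).get("containerStatuses") or []
    for status in statuses:
        if status.get("name") == container_name:
            return "terminated" in (status.get("state") or {})
    return False


def await_completion(
    read_pod: Callable[[], dict],
    container_name: str = "base",
    timeout: Optional[float] = None,
    poll_interval: float = 2.0,
) -> dict:
    """Poll until the pod is terminal, the main container exits, or the timeout elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        pod = read_pod()
        if pod.get("status", {}).get("phase") in TERMINAL_PHASES:
            return pod
        # Treat every sidecar the same way: once the main container is done,
        # stop waiting even if other containers keep the pod in Running.
        if container_terminated(pod, container_name):
            return pod
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"pod did not complete within {timeout} seconds")
        time.sleep(poll_interval)
```

With `timeout=None` the loop keeps the current unbounded behavior for pods whose main container is still working, so the timeout stays optional.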

How to reproduce

  1. Set up a Google Cloud Composer environment.
  2. Create a DAG that mounts two buckets ("in" and "out") using the GCS FUSE CSI driver:
import json
import time
from datetime import datetime, timedelta
from functools import cached_property

from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils.pod_manager import (PodManager, PodPhase,
                                                                container_is_completed)
from airflow.utils.dates import days_ago
from postgres import Postgres

from kubernetes import client
from kubernetes.client import models as k8s
from kubernetes.client.models.v1_pod import V1Pod

# Volume mounts for the 'in' and 'out' GCS bucket.
VOLUMES = [
    client.V1Volume(
        name="gcs-fuse-in",
        csi=client.V1CSIVolumeSource(
            driver="gcsfuse.csi.storage.gke.io",
            volume_attributes={
                "bucketName": "in-data",
                "mountOptions": "implicit-dirs",
                "fileCacheCapacity": "0"
            },
        ),
    )
]
VOLUME_MOUNTS = [client.V1VolumeMount(name="gcs-fuse-in", mount_path="/data")]

out_bucket = "out-data"
if out_bucket != "":
    VOLUMES.append(
        client.V1Volume(
            name="gcs-fuse-out", csi=client.V1CSIVolumeSource(
                driver="gcsfuse.csi.storage.gke.io", volume_attributes={
                    "bucketName": out_bucket,
                    "mountOptions": "implicit-dirs",
                    "fileCacheCapacity": "0"
                })))

    VOLUME_MOUNTS.append(
        client.V1VolumeMount(name="gcs-fuse-out", mount_path="/out"))

@dag(
    dag_id="example_dag",
    description="Attempts to reproduce hanging pod issue.",
    default_args={
        'owner': 'cloud_engineering',
        'email_on_failure': False, 
        'retries': 1,
        'retry_delay': timedelta(seconds=10),
    },
    # START_DATE and END_DATE are not defined in this snippet; set them to datetime values.
    start_date=START_DATE,
    end_date=END_DATE,
    schedule_interval=timedelta(seconds=30),
    catchup=True,
)
def dag():
    # fetch_new_log_ids and generate_entry_point_commands are helpers that are not shown here.
    log_ids = fetch_new_log_ids()

    container_entry_point_commands = generate_entry_point_commands(log_ids)

    KubernetesPodOperator.partial(
        name="process_log_example_dag",
        namespace="composer-user-workloads",
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
        task_id="process_log",
        image="us-central1-docker.pkg.dev/project/images/example-image",
        get_logs=True,
        log_events_on_failure=True,
        do_xcom_push=False,
        volumes=VOLUMES,
        volume_mounts=VOLUME_MOUNTS,
        # GCS Fuse CSI driver relies on pod annotations to configure itself and the container sidecar it runs in.
        annotations={
            "gke-gcsfuse/volumes": "true",
            "gke-gcsfuse/ephemeral-storage-limit": "1Gi",
            "gke-gcsfuse/cpu-request": "500m",
            "gke-gcsfuse/memory-request": "1Gi",
            "gke-gcsfuse/ephemeral-storage-request": "1Gi",
        },
        container_resources=k8s.V1ResourceRequirements(
            limits={
                'memory': "3Gi",
                'cpu': "1",
                'ephemeral-storage': "0Gi"
            },
            requests={
                'memory': "3Gi",
                'cpu': "1",
                'ephemeral-storage': "0Gi"
            },
        ),
    ).expand(cmds=container_entry_point_commands)

dag()

The container runs a simple Python script that writes 100 files to /out and then raises an exception:

def main():
  for i in range(0,100):
    with open(f"/out/{i}.txt", "w", encoding="utf8") as f:
      f.write(f"{i}")
  raise Exception("oops")

if __name__ == "__main__":
  main()

Anything else

I can work around the issue with:

class MyPodManager(PodManager):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def await_pod_completion(self, pod: V1Pod, istio_enabled: bool = False,
                             container_name: str = "base") -> V1Pod:
        # Give up after two hours so a stuck sidecar cannot block the task forever.
        start = time.time()
        while time.time() - start < 7200:
            remote_pod = self.read_pod(pod)
            if remote_pod.status.phase in PodPhase.terminal_states:
                break
            if istio_enabled and container_is_completed(remote_pod, container_name):
                break
            if container_is_completed(remote_pod, container_name):
                # Always break if the container is completed, even if sidecar containers continue to stay up
                # and keep the pod up.
                self.log.info("Pod %s should terminate now", pod.metadata.name)
                break
            self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
            time.sleep(2)
        return remote_pod

class MyKubernetesPodOperator(KubernetesPodOperator):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @cached_property
    def pod_manager(self) -> PodManager:
        return MyPodManager(kube_client=self.client, callbacks=self.callbacks,
                            progress_callback=self._progress_callback)

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

ryaminal commented 5 months ago

Thanks for sharing this, @aptenodytes-forsteri. I was asking on the Google issue tracker how to do this, to simulate what is available on a GCP Composer cluster, e.g. /home/airflow/gcs/data. Someone on that issue mentioned they got it to work, and then I did too. While checking whether I should post a PR or issue here, I stumbled upon this one.

It's not the exact same but I imagine more and more folks want to do this and will want to mount multiple buckets/volumes.