apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Problem with pushing Xcom when using SparkKubernetesOperator #39184

Open truonglac2603 opened 6 months ago

truonglac2603 commented 6 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3

What happened?

I'm having an issue when using SparkKubernetesOperator to execute a Spark job. Whenever I set do_xcom_push to True, the driver pod is created and runs perfectly, but the xcom sidecar pod is nowhere to be found. The Airflow log is then stuck with this message:

[2024-04-23, 02:47:38 UTC] {custom_object_launcher.py:301} WARNING - Spark job submitted but not yet started. job_id: spark-custome-task-5r3pt2vk
[2024-04-23, 02:47:48 UTC] {pod_manager.py:529} ERROR - container base whose logs were requested not found in the pod spark-custome-task-5r3pt2vk-driver
[2024-04-23, 02:47:48 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-23, 02:47:48 UTC] {pod_manager.py:724} WARNING - The xcom sidecar container is not yet started.
[2024-04-23, 02:51:24 UTC] {local_task_job_runner.py:296} WARNING - DagRun timed out after 0:05:03.242442.
[2024-04-23, 02:51:29 UTC] {local_task_job_runner.py:296} WARNING - DagRun timed out after 0:05:08.319418.
[2024-04-23, 02:51:29 UTC] {local_task_job_runner.py:302} WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2024-04-23, 02:51:29 UTC] {process_utils.py:131} INFO - Sending 15 to group 30. PIDs of all processes in the group: [30]
[2024-04-23, 02:51:29 UTC] {process_utils.py:86} INFO - Sending the signal 15 to group 30
[2024-04-23, 02:51:29 UTC] {taskinstance.py:2483} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-04-23, 02:51:29 UTC] {process_utils.py:79} INFO - Process psutil.Process(pid=30, status='terminated', exitcode=0, started='02:46:43') (30) terminated with exit code 0

In my opinion, something is wrong with the XCom push for this operator. Any help would be much appreciated. Thanks in advance.

What you think should happen instead?

The XCom should be pushed successfully for any kind of Spark job; for jobs with no return value there could simply be a placeholder (empty) result.

How to reproduce

Submit a basic Spark application to a Kubernetes cluster using SparkKubernetesOperator and set do_xcom_push to True.
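
A minimal DAG sketch of that setup (the DAG id, namespace, connection id, and manifest file name are placeholders, not taken from the report):

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

with DAG(
    dag_id="spark_xcom_repro",          # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Setting do_xcom_push=True is what makes the operator wait for the
    # xcom sidecar that never starts alongside the Spark driver pod.
    submit_job = SparkKubernetesOperator(
        task_id="spark-custome-task",
        namespace="spark-jobs",               # placeholder namespace
        application_file="spark_app.yaml",    # placeholder SparkApplication manifest
        kubernetes_conn_id="kubernetes_default",
        do_xcom_push=True,
    )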

Operating System

Ubuntu 22.04.4 LTS

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.0.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

truonglac2603 commented 6 months ago

Here are the logs when I run a dbt project with KubernetesPodOperator.

[2024-04-24, 08:53:35 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-24, 08:53:35 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-04-24, 08:53:35 UTC] {pod_manager.py:798} INFO - Running command... if [ -s //xcom/return.json ]; then cat //xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-04-24, 08:53:35 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-04-24, 08:53:36 UTC] {pod.py:559} INFO - xcom result file is empty.

It seems to handle an empty XCom just fine. Shouldn't SparkKubernetesOperator work the same way?

jojovem commented 5 months ago

I'm having the same issue as @truonglac2603. The task in Airflow runs forever and I also get:

[2024-05-28, 15:50:40 UTC] {pod_manager.py:725} INFO - Checking if xcom sidecar container is started.
[2024-05-28, 15:50:40 UTC] {pod_manager.py:731} WARNING - The xcom sidecar container is not yet started.

Airflow: 2.9.1
Python: 3.11
apache-airflow-providers-cncf-kubernetes: 8.2.0

The job itself runs fine, but Airflow doesn't recognize that it has finished, and the task keeps executing even after the SparkApplication reaches the COMPLETED state.

This is my template:

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "{{ params.spark_app.app_name }}"
  namespace: "{{ params.spark_app.namespace }}"
spec:
  sparkConf:
    {% for key, value in params.spark_app.spark_conf.items() %}
      {{ key }}: "{{ value }}"
      {% endfor %}
  hadoopConf:
    {% for key, value in params.spark_app.hadoop_conf.items() %}
      {{ key }}: "{{ value }}"
      {% endfor %}
      fs.s3a.access.key: "{{ var.value.get('mm_prd_s3_access_key') }}"
      fs.s3a.secret.key: "{{ var.value.get('mm_prd_s3_secret_key') }}"
  mode: cluster
  image: "{{ params.spark_app.image }}"
  imagePullPolicy: Always
  imagePullSecrets:
    - "{{ params.spark_app.image_pull_secrets }}"
  mainApplicationFile: "local:///tmp/git-sync/{{ params.spark_app.git_sync.link }}/{{ params.spark_app.git_sync.jobs_path }}/{{ params.job_file }}"
  arguments: {{ params.spark_app.arguments | tojson }}
  sparkVersion: "{{ params.spark_app.spark_version }}"
  type: "{{ params.spark_app.type }}"
  pythonVersion: "{{ params.spark_app.python_version }}"
  timeToLiveSeconds: {{ params.spark_app.ttl }}
  restartPolicy:
    type: "{{ params.spark_app.restart_policy }}"
  volumes:
    - name: git-sync
      emptyDir: { }
    - name: spark-configmap
      configMap:
        name: spark-configmap
    - name: keystore-configmap
      configMap:
        name: keystore-configmap
  driver:
    cores: {{ params.spark_app.driver.cores }}
    coreLimit: "{{ params.spark_app.driver.core_limit }}"
    coreRequest: "{{ params.spark_app.driver.core_request }}"
    memory: "{{ params.spark_app.driver.memory }}"
    javaOptions: "-Dlog4j.configurationFile=/opt/spark/configmap/log4j2.properties"
    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
            - matchExpressions:
                - key: "maxmilhas/node-group-name"
                  operator: In
                  values:
                    - "{{ params.spark_app.node }}-driver"
    tolerations:
      - key: "{{ params.spark_app.node }}-driver"
        operator: "Equal"
        value: "true"
        effect: "NoSchedule"
    volumeMounts:
      - name: git-sync
        mountPath: /tmp/git-sync
      - name: spark-configmap
        mountPath: /opt/spark/configmap
        readOnly: true
      - name: keystore-configmap
        mountPath: /opt/keystore
        readOnly: true
    labels:
      version: "{{ params.spark_app.spark_version }}"
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "app-logs"
    serviceAccount: "{{ params.spark_app.service_account }}"
    envVars:
      PYTHONPATH: "/tmp/git-sync/{{ params.spark_app.git_sync.link }}/{{ params.spark_app.git_sync.jobs_path }}"
      mm_prd_cluster_user: "{{ var.value.get('mm_prd_cluster_user') }}"
      mm_prd_cluster_password: "{{ var.value.get('mm_prd_cluster_password') }}"
    initContainers:
      - name: git-sync
        image: "registry.k8s.io/git-sync/git-sync:{{ params.spark_app.git_sync.version }}"
        imagePullPolicy: IfNotPresent
        volumeMounts:
          - name: git-sync
            mountPath: /tmp/git-sync
        env:
          - name: GITSYNC_REPO
            value: "{{ params.spark_app.git_sync.repo }}"
          - name: GITSYNC_REF
            value: "{{ params.spark_app.git_sync.branch }}"
          - name: GITSYNC_ROOT
            value: "/tmp/git-sync"
          - name: GITSYNC_LINK
            value: "{{ params.spark_app.git_sync.link }}"
          - name: GITSYNC_ONE_TIME
            value: "{{ params.spark_app.git_sync.one_time }}"
          - name: GITSYNC_USERNAME
            value: "{{ var.value.get('gitsync_username') }}"
          - name: GITSYNC_PASSWORD
            value: "{{ var.value.get('gitsync_password') }}"
  executor:
    cores: {{ params.spark_app.executor.cores }}
    coreRequest: "{{ params.spark_app.executor.core_request }}"
    coreLimit: "{{ params.spark_app.executor.core_limit }}"
    instances: {{ params.spark_app.executor.instances }}
    memory: "{{ params.spark_app.executor.memory }}"
    deleteOnTermination: {{ params.spark_app.executor.delete_on_termination }}
    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
            - matchExpressions:
                - key: "maxmilhas/node-group-name"
                  operator: In
                  values:
                    - "{{ params.spark_app.node }}-worker"
    tolerations:
      - key: "{{ params.spark_app.node }}-worker"
        operator: "Equal"
        value: "true"
        effect: "NoSchedule"
    labels:
      version: "{{ params.spark_app.spark_version }}"
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "app-logs"
    volumeMounts:
      - name: spark-configmap
        mountPath: /opt/spark/configmap
        readOnly: true
      - name: keystore-configmap
        mountPath: /opt/keystore
        readOnly: true
bangnh1 commented 5 months ago

I'm having the same issue. Is there any way to solve the issue?

gustavo-maxmilhas commented 5 months ago

@bangnh1 In my case, I downgraded apache-airflow-providers-cncf-kubernetes to 7.13.0 to make it work.

I haven't tested with 8.3.0 yet.

bangnh1 commented 5 months ago

Thanks @gustavo-maxmilhas. I also need KubernetesJobOperator, so downgrading is more difficult in my case.

realvz commented 4 months ago

I have the same problem on 2.9.2. The driver pod completes but the worker pod keeps running. Disabling xcom push gets rid of this problem.

creyer commented 4 months ago

If you need to use xcom push, just add the xcom sidecar container yourself in the Spark operator template (based on https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py):

  volumes:
      - name: xcom
        emptyDir: {}  
  driver:
    volumeMounts:
      - name: xcom
        mountPath: /airflow/xcom
    sidecars:
      - name: airflow-xcom-sidecar
        image: alpine
        command: ["sh", "-c", 'trap "exit 0" INT; while true; do sleep 1; done;']
        volumeMounts:
          - name: xcom
            mountPath: /airflow/xcom
        resources:
          requests:
            cpu: "1m"
            memory: "10Mi"
bangnh1 commented 4 months ago


The sidecar workaround from @creyer works like a charm. Thank you.

danielvincenzi commented 2 weeks ago

This worked perfectly for me too, thanks!

qRoC commented 1 day ago

@creyer thanks!