apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Intermittent json errors from xcom sidecar #39267

Open paramjeet01 opened 4 months ago

paramjeet01 commented 4 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3

What happened?

We are seeing intermittent JSON decode errors when the XCom result is read from the xcom sidecar container. The task succeeds on the next retry.

[2024-04-26, 00:21:32 IST] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:36 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:40 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:44 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-04-26, 00:21:52 IST] {pod.py:909} INFO - Deleting pod: hps-mydata-generation-9lqygp1s
[2024-04-26, 00:21:52 IST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 200, in execute
    result = self.extract_xcom(pod=self.pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 557, in extract_xcom
    result = self.pod_manager.extract_xcom(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 730, in extract_xcom
    result = self.extract_xcom_json(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 765, in extract_xcom_json
    json.loads(result)
  File "/usr/local/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 16385 (char 16384)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 439, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 215, in execute
    self.cleanup(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
    raise AirflowException

What you think should happen instead?

The task should not fail while extracting XCom data from the sidecar.

How to reproduce

This can be reproduced by writing about 20k characters of JSON to the XCom file (/airflow/xcom/return.json). Reading the data back then fails intermittently. I'll investigate the cause of the JSON error.
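The offset in the traceback (char 16384) is exactly 16 KiB, which suggests the reader may have received only the first part of the file. The sketch below is a minimal illustration of that assumption using only the standard library: the payload and the `is_complete_json` helper are hypothetical and not part of Airflow.

```python
import json

# Hypothetical payload a bit over 20k characters, similar in size to the report.
payload = json.dumps({"rows": list(range(4000))})

# Assumption: only the first 16384 characters reached the reader.
truncated = payload[:16384]


def is_complete_json(text: str) -> bool:
    """Return True only if the whole text parses as a JSON document."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


print(len(payload), is_complete_json(payload))
print(len(truncated), is_complete_json(truncated))
```

A payload cut at that boundary cannot parse, whatever the content, because the closing brackets are missing.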

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

pytest>=6.2.5
docker>=5.0.0
crypto>=1.4.1
cryptography>=3.4.7
pyOpenSSL>=20.0.1
ndg-httpsclient>=0.5.1
boto3>=1.34.0
sqlalchemy
redis>=3.5.3
requests>=2.26.0
pysftp>=0.2.9
werkzeug>=1.0.1
apache-airflow-providers-cncf-kubernetes==8.0.0
apache-airflow-providers-amazon>=8.13.0
psycopg2>=2.8.5
grpcio>=1.37.1
grpcio-tools>=1.37.1
protobuf>=3.15.8,<=3.21
python-dateutil>=2.8.2
jira>=3.1.1
confluent_kafka>=1.7.0
pyarrow>=10.0.1,<10.1.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Official helm chart deployment.

Anything else?

I think we are facing an issue similar to https://github.com/apache/airflow/issues/32111, which was fixed in https://github.com/apache/airflow/pull/32113/files. We might need to increase the retry count.
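The traceback shows tenacity already retrying `extract_xcom_json` before the error is re-raised. As a rough, standard-library-only sketch of what "retry until the JSON parses" means, assuming a hypothetical `read` callable that returns the raw text (the helper name, parameters, and fake reader are illustrative, not the provider's code):

```python
import json
import time
from typing import Any, Callable


def read_json_with_retries(read: Callable[[], str], attempts: int = 5, delay: float = 0.0) -> Any:
    """Call read() until its result parses as JSON, up to `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        raw = read()
        try:
            return json.loads(raw)
        except json.JSONDecodeError as err:
            # Remember the failure and try again after a short pause.
            last_error = err
            time.sleep(delay)
    raise last_error


# Hypothetical reader: the first two reads are cut short, the third is complete.
full = json.dumps({"value": "x" * 20000})
responses = [full[:16384], full[:16384], full]
calls = []


def fake_read() -> str:
    calls.append(1)
    return responses[len(calls) - 1]


result = read_json_with_retries(fake_read, attempts=5)
print(len(calls), len(result["value"]))
```

Retrying only hides the problem if every read can be cut short, so a higher retry count lowers the failure rate rather than removing the cause.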

Are you willing to submit PR?

Code of Conduct

paramjeet01 commented 4 months ago

I also found this GitHub thread. Shall we implement the approach it describes? https://github.com/kubernetes-client/python-base/issues/190#issuecomment-805073981

paramjeet01 commented 4 months ago

@potiuk, can you please guide us on this? I'm not able to find a solution. The error occurs intermittently and the task works on the next retry. If you need further information, I can collect the logs.

paramjeet01 commented 4 months ago

I can confirm that the issue is solved with the code below, which we have added as a custom XCom extraction method. The approach is also mentioned here: https://github.com/kubernetes-client/python-base/issues/190#issuecomment-805073981. We didn't have this issue in v2.3.3, so I believe this PR could have caused the error: https://github.com/apache/airflow/pull/23490/files

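    # Module-level imports this method relies on.
    import json

    from airflow.exceptions import AirflowException
    from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
    from kubernetes.client.models.v1_pod import V1Pod
    from kubernetes.stream import stream as kubernetes_stream

    def extract_xcom_json(self, pod: V1Pod):
        """Read return.json from the xcom sidecar, validate it, then stop the sidecar."""
        try:
            self.log.info("Running command... cat %s/return.json", PodDefaults.XCOM_MOUNT_PATH)
            # _preload_content=False returns a streaming client instead of a string,
            # so the output can be read until the exec connection closes.
            client = kubernetes_stream(
                self._client.connect_get_namespaced_pod_exec,
                pod.metadata.name,
                pod.metadata.namespace,
                container=PodDefaults.SIDECAR_CONTAINER_NAME,
                command=[
                    '/bin/sh',
                    '-c',
                    f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
                ],
                stderr=True,
                stdin=False,
                stdout=True,
                tty=False,
                _preload_content=False,
                _request_timeout=10,
            )
            # Wait (up to 10 seconds) for the command to finish, then collect all output.
            client.run_forever(timeout=10)
            result = client.read_all()
            # Log the type, length, head and tail of the payload to help spot truncation.
            self.log.info(
                "Received %s (%d) (%s ... %s)", type(result), len(result), result[:64], result[-64:]
            )

            # Validate that the payload is well-formed JSON before stopping the sidecar.
            json.loads(result)

            # Terminate the sidecar
            kubernetes_stream(
                self._client.connect_get_namespaced_pod_exec,
                pod.metadata.name,
                pod.metadata.namespace,
                container=PodDefaults.SIDECAR_CONTAINER_NAME,
                command=[
                    '/bin/sh',
                    '-c',
                    'kill -s SIGINT 1',
                ],
                stderr=True,
                stdin=False,
                stdout=True,
                tty=False,
                _preload_content=True,
                _request_timeout=10,
            )

            return result

        except json.JSONDecodeError as e:
            message = f'Failed to decode json document from pod: {pod.metadata.name}'
            self.log.exception(message)
            raise AirflowException(message) from e

        except Exception as e:
            message = f'Failed to extract xcom from pod: {pod.metadata.name}'
            self.log.exception(message)
            raise AirflowException(message) from e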
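The root cause has not been confirmed in this thread. One possible reading is that the workaround helps because it collects output until the exec connection closes, instead of stopping at the first data that arrives. The sketch below illustrates that difference with a hypothetical `FakeStream` class that delivers a payload in 16384-character chunks. It is a standard-library stand-in for illustration, not the kubernetes client API.

```python
import json


class FakeStream:
    """Hypothetical stand-in for a streaming exec connection.

    It hands out the payload in fixed-size chunks and reports itself
    closed once every chunk has been delivered.
    """

    def __init__(self, payload: str, chunk_size: int = 16384) -> None:
        self._chunks = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]

    def is_open(self) -> bool:
        return bool(self._chunks)

    def read_chunk(self) -> str:
        return self._chunks.pop(0) if self._chunks else ""


def read_first_chunk(stream: FakeStream) -> str:
    """Return as soon as any output is available, which may be partial."""
    return stream.read_chunk()


def read_until_closed(stream: FakeStream) -> str:
    """Keep reading until the stream closes, then join the chunks."""
    parts = []
    while stream.is_open():
        parts.append(stream.read_chunk())
    return "".join(parts)


payload = json.dumps({"data": "x" * 20000})

partial = read_first_chunk(FakeStream(payload))
complete = read_until_closed(FakeStream(payload))

try:
    json.loads(partial)
    partial_ok = True
except json.JSONDecodeError:
    partial_ok = False

print(len(partial), partial_ok, len(complete))
```

In this sketch the first-chunk reader always fails for payloads longer than one chunk. The intermittent failures reported here would correspond to cases where the data happens to arrive in more than one piece.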

eladkal commented 3 months ago

@paramjeet01 do you have a proposed fix in mind? Can you open a PR?

cleivson commented 3 months ago

Any news here? Is there a known workaround while a fix is on the way?

paramjeet01 commented 3 months ago

@eladkal yes, I'll create a PR with the code suggested above. I'm afraid I can't find the root cause of the issue in the current code.

paramjeet01 commented 3 months ago

@cleivson, you can customise your Airflow deployment to call the method above until we have a fix.

cleivson commented 2 months ago

@paramjeet01, thanks. I'm not really sure where to put this. Based on your OS, "Amazon Linux 2", am I right to assume you're using MWAA? I'm using it, and I was wondering whether the bug could be related to the environment.

paramjeet01 commented 2 months ago

No, I use the community edition, and I have customized the XCom code to solve this problem.