kubernetes-client / python

Official Python client library for kubernetes
http://kubernetes.io/
Apache License 2.0

[stream/ws_client] Reading long stdout is truncated at 32768 chars #2226

Open paramjeet01 opened 5 months ago

paramjeet01 commented 5 months ago

We are hitting this issue in Airflow and believe it is caused by the Python kubernetes-client.
Earlier repo issue: https://github.com/kubernetes-client/python-base/issues/190
Airflow issue: https://github.com/apache/airflow/issues/39267
Error log:

[2024-04-26, 00:21:32 IST] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:36 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:40 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:44 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-04-26, 00:21:52 IST] {pod.py:909} INFO - Deleting pod: hps-mydata-generation-9lqygp1s
[2024-04-26, 00:21:52 IST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 200, in execute
    result = self.extract_xcom(pod=self.pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 557, in extract_xcom
    result = self.pod_manager.extract_xcom(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 730, in extract_xcom
    result = self.extract_xcom_json(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 765, in extract_xcom_json
    json.loads(result)
  File "/usr/local/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 16385 (char 16384)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 439, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 215, in execute
    self.cleanup(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
    raise AirflowException
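
For readers following the traceback: a decode error at exactly char 16384 is what json.loads reports when the text simply ends at that offset. The sketch below is not taken from Airflow or the client; it uses a made-up payload (an array of ones, chosen so the cut falls right after a comma) to show that cutting valid JSON at 16384 characters makes the parser fail at the cut itself, not at any bad content.

```python
import json

# Assumed stand-in for return.json: a JSON array of 6000 ones.
payload = json.dumps([1] * 6000)

# Simulate a reader that stops after the first 16384 characters.
truncated = payload[:16384]


def decode_error_position(text):
    """Return the character offset where json.loads fails, or None if it parses."""
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        return err.pos
    return None


# The full payload parses; the truncated one fails where the data ends.
print(len(payload), decode_error_position(payload), decode_error_position(truncated))
```

If the reported position equals the length of the received string, the document was cut short and was not malformed at the source.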

roycaihw commented 5 months ago

/help

It is unclear whether the limitation comes from this client or from an underlying library.

k8s-ci-robot commented 5 months ago

@roycaihw: This request has been marked as needing help from a contributor.

paramjeet01 commented 5 months ago

I'm not sure where the data is lost, but I'm trying the code below, as suggested by @paul424 in https://github.com/kubernetes-client/python-base/issues/190. My understanding is that the data is lost while reading the output of the cat command:

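# Imports this excerpt relies on (Airflow paths assumed from the provider layout in the traceback)
import json
from json import JSONDecodeError

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from kubernetes.client import V1Pod
from kubernetes.stream import stream as kubernetes_stream


def _extract_xcom(self, pod: V1Pod):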
    try:
        self.log.info(f'Running command... cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')

        # Can't use _preload_content=True because that would return str(json.load(response))
        client = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=False,
            _request_timeout=10,
        )
        client.run_forever(timeout=10)
        resp = client.read_all()
        # Log the type, length, and both ends of the payload to spot truncation
        self.log.info("Received %s (%s) (%s ... %s)", type(resp), len(resp), resp[:64], resp[-64:])

        # validate it's valid json
        _ = json.loads(resp)

        # Terminate the sidecar
        _ = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                'kill -s SIGINT 1',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=True,
            _request_timeout=10,
        )

        return resp

    except JSONDecodeError as err:
        message = f'Failed to decode json document from pod: {pod.metadata.name}'
        self.log.exception(message)
        # Chain the original error so the decode position stays in the traceback
        raise AirflowException(message) from err

    except Exception as err:
        message = f'Failed to extract xcom from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from err
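
The excerpt above waits in run_forever and then calls read_all once. One alternative worth trying is to drain stdout piece by piece while the stream is still open. The sketch below assumes the object returned by the stream call with _preload_content=False exposes is_open, update, peek_stdout, read_stdout and close, as the client's WSClient does. FakeExecStream is a hypothetical stand-in so the loop can run without a cluster. Whether this avoids the truncation reported here is untested.

```python
import json


def drain_stdout(ws_client, poll_timeout=1.0):
    """Collect stdout chunk by chunk until the exec stream closes."""
    chunks = []
    while ws_client.is_open():
        # Pull whatever frames are available into the client's buffers.
        ws_client.update(timeout=poll_timeout)
        if ws_client.peek_stdout():
            chunks.append(ws_client.read_stdout())
    # Pick up anything still buffered after the stream closed.
    if ws_client.peek_stdout():
        chunks.append(ws_client.read_stdout())
    ws_client.close()
    return "".join(chunks)


class FakeExecStream:
    """Hypothetical stand-in that hands out a payload in fixed-size pieces."""

    def __init__(self, payload, piece_size):
        self._pending = [
            payload[i:i + piece_size] for i in range(0, len(payload), piece_size)
        ]
        self._buffer = ""

    def is_open(self):
        return bool(self._pending)

    def update(self, timeout=0):
        # Move one pending piece into the readable buffer.
        if self._pending:
            self._buffer += self._pending.pop(0)

    def peek_stdout(self, timeout=0):
        return self._buffer

    def read_stdout(self, timeout=None):
        data, self._buffer = self._buffer, ""
        return data

    def close(self):
        self._pending = []


# Example payload larger than the sizes mentioned in this issue.
payload = json.dumps({"values": list(range(10000))})
received = drain_stdout(FakeExecStream(payload, piece_size=16384))
print(len(payload), len(received))
```

Against a real pod, the same function would be called with the client returned by kubernetes_stream in place of the fake. Comparing len(received) with the file size reported inside the container would show on which side the data goes missing.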