LamaAni / KubernetesJobOperator

An Airflow operator that executes a task in a Kubernetes cluster, given a Kubernetes YAML configuration or an image reference.

The job is deleted due to an error triggered by chunk.decode("utf8") #97

Open kuixiang opened 3 months ago

kuixiang commented 3 months ago

Symptom

When a user submits a job through Airflow, it runs for a while and then encounters the following error:

[2024-07-19, 12:46:11 CST] {taskinstance.py:1112} DEBUG - <TaskInstance: asset_map.job_identifier manual__2024-06-01T16:12:23+08:00 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-07-19, 12:46:11 CST] {client.py:170} DEBUG - [DeleteNamespaceResource][264c][/apis/batch/v1/namespaces/user-jobs/jobs/job_identifier] State: Streaming
[2024-07-19, 12:46:11 CST] {operations.py:67} INFO - [user-jobs/jobs/job_identifier] deleted
[2024-07-19, 12:46:11 CST] {client.py:170} DEBUG - [DeleteNamespaceResource][264c][/apis/batch/v1/namespaces/user-jobs/jobs/job_identifier] State: Disconnected
[2024-07-19, 12:46:11 CST] {job_runner.py:286} INFO - {job-runner}: Job deleted
[2024-07-19, 12:46:11 CST] {job_runner.py:286} INFO - {job-runner}: Client stopped, execution aborted.
[2024-07-19, 12:46:11 CST] {taskinstance.py:1824} ERROR - Task failed with exception
airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiException: Error while executing query
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/path/to/airflow_kubernetes_job_operator/kubernetes_job_operator.py", line 463, in execute
    rslt = self.job_runner.execute_job(
  File "/path/to/airflow_kubernetes_job_operator/job_runner.py", line 451, in execute_job
    raise ex
  File "/path/to/zthreading/tasks.py", line 176, in _run_as_thread
    rslt = self.action(*args, **kwargs)
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 261, in _execute_query
    raise ex
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 257, in _execute_query
    self.query_loop(client)
  File "/path/to/airflow_kubernetes_job_operator/kube_api/queries.py", line 466, in query_loop
    raise ex
  File "/path/to/airflow_kubernetes_job_operator/kube_api/queries.py", line 459, in query_loop
    return super().query_loop(client)
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 446, in query_loop
    raise ex from KubeApiException("Error while executing query")
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 386, in query_loop
    for line in self._read_response_stream_lines(response):
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 212, in _read_response_stream_lines
    chunk = chunk.decode("utf8")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe6 in position 16375: unexpected end of data

The job is then deleted, so the failure can no longer be inspected afterwards with commands like kubectl describe job or kubectl describe pod.

Cause of the Issue

The original code that reads the job's output stream is shown below. The response is read in chunks and then reassembled. If the output contains Chinese characters such as “开始” (“start”), they are encoded as \xe5\xbc\x80\xe5\xa7\x8b, but a chunk boundary can split this into \xe5\xbc\x80\xe5 and \xa7\x8b. The trailing \xe5 is the first byte of a multi-byte character, so decoding \xe5\xbc\x80\xe5 as UTF-8 raises a UnicodeDecodeError, and Airflow handles the exception by abruptly deleting the job.
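The truncation can be reproduced in a few lines. This standalone snippet (not part of the operator's code) splits the encoding of “开始” mid-character, just as a chunk boundary can:

```python
# Demonstration of the failure mode: a multi-byte UTF-8 character split
# across two stream chunks cannot be decoded chunk-by-chunk.
text = "开始"                          # "start" in Chinese
data = text.encode("utf8")            # b'\xe5\xbc\x80\xe5\xa7\x8b'

# Simulate the stream cutting the second character in half.
chunk1, chunk2 = data[:4], data[4:]   # b'\xe5\xbc\x80\xe5', b'\xa7\x8b'

try:
    chunk1.decode("utf8")
except UnicodeDecodeError as e:
    # e.start points at the first undecodable byte, e.end at the end of the
    # bad range, and e.reason is "unexpected end of data" for a truncated
    # trailing sequence.
    print(e.start, e.end, e.reason)   # 3 4 unexpected end of data

# Reassembled, the same bytes decode fine.
print((chunk1 + chunk2).decode("utf8"))   # 开始
```

Note that the error position (`e.start`) sits at the very end of the chunk, which is exactly the condition the fix below tests for.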

    @classmethod
    def _read_response_stream_lines(cls, response: HTTPResponse):
        """INTERNAL. Helper yield method. Parses the streaming http response
        to lines (can be async!)

        Yields:
            str: The line
        """
        prev = ""
        for chunk in response.stream(decode_content=False):
            if isinstance(chunk, bytes):
                chunk = chunk.decode("utf8")
            chunk = prev + chunk
            lines = chunk.split("\n")
            if not chunk.endswith("\n"):
                prev = lines[-1]
                lines = lines[:-1]
            else:
                prev = ""
            for line in lines:
                if line:
                    yield line

Solution


# The idea is to catch the UnicodeDecodeError, check whether it occurred at the
# end of the chunk (which indicates truncation rather than corrupt data), store
# the truncated bytes, decode only the complete part, and prepend the stored
# bytes to the next chunk before decoding it.

@classmethod
def _read_response_stream_lines(cls, response: HTTPResponse):
    """INTERNAL. Helper yield method. Parses the streaming http response
    to lines (can be async!)

    Yields:
        str: The line
    """
    prev = ""
    prev_binary_chunk = b""
    for chunk in response.stream(decode_content=False):
        if isinstance(chunk, bytes):
            chunk = prev_binary_chunk + chunk
            prev_binary_chunk = b""
            try:
                chunk = chunk.decode("utf8")
            except UnicodeDecodeError as e:
                if e.end != len(chunk):
                    raise
                prev_binary_chunk = chunk[e.start:]
                chunk = chunk[0:e.start].decode("utf8")
        chunk = prev + chunk
        lines = chunk.split("\n")
        if not chunk.endswith("\n"):
            prev = lines[-1]
            lines = lines[:-1]
        else:
            prev = ""
        for line in lines:
            if line:
                yield line
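For reference, the standard library also provides an incremental decoder that performs exactly this kind of buffering automatically. This is a sketch of an alternative, not part of the proposed fix:

```python
import codecs

def decode_stream_chunks(chunks):
    """Decode an iterable of byte chunks to text, buffering any multi-byte
    UTF-8 sequence that is split across a chunk boundary."""
    decoder = codecs.getincrementaldecoder("utf8")()
    for chunk in chunks:
        # The incremental decoder keeps undecodable trailing bytes internally
        # and prepends them to the next call.
        yield decoder.decode(chunk)
    # Flush; final=True raises if the stream truly ends mid-character.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail

# The split from the bug report now decodes cleanly:
parts = list(decode_stream_chunks([b"\xe5\xbc\x80\xe5", b"\xa7\x8b"]))
print("".join(parts))   # 开始
```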
LamaAni commented 3 months ago

Hi, nice catch!!

I'd definitely accept a PR!

Looks like your solution works! If I understand correctly, the error happens due to missing bytes in the decode, e.g. receiving 3 bytes out of 4 for a character.

One small thing though: please raise a proper error (with text; the error type should be the KubernetesJobOperator error or some ParseError, see the example in errors).

Your code, with the corrections:

        prev = ""
        prev_binary_chunk = b""
        for chunk in response.stream(decode_content=False):
            if isinstance(chunk, bytes):
                chunk = prev_binary_chunk + chunk
                prev_binary_chunk = b""
                try:
                    chunk = chunk.decode("utf8")
                except UnicodeDecodeError as e:
                    # This may happen for the case where
                    # we have split string chars that have more than one byte per char,
                    # (say, char has 4 bytes, but we received 3)

                    # This check needs an explanation as well (thanks!)
                    if e.end != len(chunk):
                        raise KubeApiException(
                            "Error when parsing api response stream"
                        ) from e
                    prev_binary_chunk = chunk[e.start :]
                    chunk = chunk[0 : e.start].decode("utf8")

            chunk = prev + chunk
            lines = chunk.split("\n")

            if not chunk.endswith("\n"):
                prev = lines[-1]
                lines = lines[:-1]
            else:
                prev = ""
            for line in lines:
                if line:
                    yield line
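To sanity-check the buffering logic, the core of the method can be exercised standalone. The wrapper function below is hypothetical, written only for this demonstration; it reproduces the body of the corrected method over a plain list of chunks instead of an HTTP response:

```python
def read_stream_lines(chunks):
    """Split an iterable of byte/str chunks into lines, buffering both
    partial lines and partial multi-byte UTF-8 sequences across chunks."""
    prev = ""
    prev_binary_chunk = b""
    for chunk in chunks:
        if isinstance(chunk, bytes):
            chunk = prev_binary_chunk + chunk
            prev_binary_chunk = b""
            try:
                chunk = chunk.decode("utf8")
            except UnicodeDecodeError as e:
                # Only tolerate truncation at the very end of the chunk.
                if e.end != len(chunk):
                    raise
                prev_binary_chunk = chunk[e.start:]
                chunk = chunk[0:e.start].decode("utf8")
        chunk = prev + chunk
        lines = chunk.split("\n")
        if not chunk.endswith("\n"):
            prev = lines[-1]
            lines = lines[:-1]
        else:
            prev = ""
        for line in lines:
            if line:
                yield line

# A log line containing 开始, split mid-character across two chunks:
chunks = [b"job \xe5\xbc\x80\xe5", b"\xa7\x8b\n"]
print(list(read_stream_lines(chunks)))   # ['job 开始']
```

With the original code, the first chunk would raise UnicodeDecodeError; with the fix, the trailing \xe5 is carried over and the full line comes out intact.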