LamaAni / KubernetesJobOperator

An Airflow operator that executes a task in a Kubernetes cluster, given a Kubernetes YAML configuration or an image reference.

ConnectionResetError: [Errno 104] Connection reset by peer #86

Closed lqsantos closed 1 year ago

lqsantos commented 1 year ago

Describe the bug: I am getting this error in our DAGs running KubernetesJobOperator.

To Reproduce Steps to reproduce the behavior:

  1. Create an Airflow DAG with the following code:
    
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
    from airflow.utils.dates import days_ago

    dag_name = 'logs-job-operator'

    default_args = {"owner": "leandro", "start_date": days_ago(2), "retries": 0}

    with DAG(
        dag_name,
        default_args=default_args,
        description='dag_anomaly',
        schedule_interval=None,
        start_date=days_ago(1),
        tags=['ml'],
    ) as dag:

        start = EmptyOperator(
            task_id="start",
            dag=dag
        )
        complete = EmptyOperator(
            task_id="complete",
            dag=dag
        )

        manifest_job = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": "job-operator-example",
                "namespace": "default"
            },
            "spec": {
                "completions": 10,
                "parallelism": 10,
                "backoffLimit": 10,
                "template": {
                    "spec": {
                        "nodeSelector": {
                            "agentpool": "userpool"
                        },
                        "containers": [
                            {
                                "name": "job-operator-example",
                                "image": "sikwan/random-json-logger:latest"
                            }
                        ],
                        "restartPolicy": "OnFailure"
                    }
                }
            }
        }

        k8sJobOperator = KubernetesJobOperator(task_id="test-job-success", body=manifest_job, dag=dag)

        start >> k8sJobOperator >> complete

  2. Run the DAG.
  3. In our environment, the error occurs after about 5 minutes of execution.

**Expected behavior**
No error is raised during job execution.

**Environment**
Airflow is deployed on Azure Kubernetes Service with KubernetesExecutor, which spawns the worker pods in the same AKS cluster.
Kubernetes Version 1.24.9
Airflow Version 2.4.3
airflow_kubernetes_job_operator-2.0.12

**Log**

[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-q8rwm]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "DEBUG", "message": "first loop completed."}
[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-vvn6v]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "ERROR", "message": "something happened in this execution."}
[2023-02-23, 14:40:56 UTC] {client.py:485} ERROR - Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 443, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 815, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 745, in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.7/ssl.py", line 929, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _execute_query
    self.query_loop(client)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 346, in query_loop
    raise ex
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 339, in query_loop
    return super().query_loop(client)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 390, in query_loop
    raise ex
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 348, in query_loop
    for line in self._read_response_stream_lines(response):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 183, in _read_response_stream_lines
    for chunk in response.stream(decode_content=False):
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 623, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 844, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 460, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))


Complete log:
[dag_id=logs-job-operator_run_id=manual__2023-02-23T14_31_39.933191+00_00_task_id=test-job-success_attempt=1.log](https://github.com/LamaAni/KubernetesJobOperator/files/10814868/dag_id.logs-job-operator_run_id.manual__2023-02-23T14_31_39.933191%2B00_00_task_id.test-job-success_attempt.1.log)

LamaAni commented 1 year ago

Hi, this looks like a kind of timeout.

Note that the restart policy should be Never.

What is your system? Also, this looks like a timeout that is known to happen on Amazon Kubernetes. See the other open conversation.

Can you run this locally with docker desktop kubernetes or some other local cluster? Can you try this with 1 completion? And 1 fails? Can you try running the example in the repo? Or the test in the repo?

I can only get to this mid next week. Apologies about that.

lqsantos commented 1 year ago

Hi @LamaAni, thanks for the reply.

I've changed the restartPolicy to Never and the problem keeps happening. I've also tried running with 1 completion, and it doesn't work either.

Sorry, but I don't know what you meant about running the example in the repo. I'm new to this. Could you explain to me how to do it? In the meantime, I'll set up and test locally as you suggested.

Thanks

LamaAni commented 1 year ago

Can you please test the example here:

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "tester",
    "start_date": days_ago(2),
    "retries": 0,
}

dag = DAG(
    "job-operator-simple-test",
    default_args=default_args,
    description="Test base job operator",
    schedule_interval=None,
)

KubernetesJobOperator(
    task_id="very-simple-job",
    dag=dag,
    image="ubuntu",
    command=[
        "bash",
        "-c",
        "echo start; sleep 5; echo end",
    ],
)

The above should run very quickly, so you can check whether the timeout is the issue. If it passes, just increase the sleep time in there and you will get a test of the timeout limit on the cluster.

I forced the restart policy to Never in the code, but the YAML should be correct anyway, since Airflow should control the restarts. Completions should work, but in this case I would set that in Airflow rather than in the job; it would give you more proper logs.

The error sometimes comes from a timeout on the open connection with the Kubernetes cluster (a connection timeout forced by the server), hence testing locally may shed light on the issue.

lqsantos commented 1 year ago

I've just tested your proposed DAG, and the task ended successfully when the sleep value was 5 seconds. After increasing it to 300 seconds, the task ended with Connection reset by peer. I think the issue is related to a known issue on AKS: https://learn.microsoft.com/en-us/answers/questions/467642/no-response-from-api-for-long-running-jobs-in-aci

Since this problem is critical to the delivery of a project, we had to create our own custom operator to submit Kubernetes jobs. During the creation and testing process, we came across a similar problem when we tried to stream the logs generated by Kubernetes using the official Python library for k8s. We got around it by abandoning the log streaming strategy and making periodic requests instead.
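For readers who want to try the same workaround, here is a minimal sketch of the periodic-request idea. It is not the custom operator mentioned above (that code is not shown in this thread). `poll_log_lines`, `fetch_log`, and `is_finished` are hypothetical names, and the sketch assumes each request returns the pod's full log so far, which may not hold if the log has been rotated.

```python
import time
from typing import Callable, Iterator


def poll_log_lines(
    fetch_log: Callable[[], str],
    is_finished: Callable[[], bool],
    interval_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Yield new log lines by re-requesting the log periodically.

    No connection is held open between requests, so an idle timeout
    enforced by the server cannot reset a long-lived stream.
    """
    seen = 0  # number of complete lines already yielded
    while True:
        # Check completion before fetching, so the last fetch still
        # includes lines written just before the pod finished.
        finished = is_finished()
        lines = fetch_log().splitlines(keepends=True)
        # Hold back a trailing partial line until it is complete,
        # unless the pod is done and nothing more will arrive.
        if not finished and lines and not lines[-1].endswith("\n"):
            lines.pop()
        for line in lines[seen:]:
            yield line.rstrip("\n")
        seen = max(seen, len(lines))
        if finished:
            return
        sleep(interval_seconds)


# Possible wiring with the official `kubernetes` client (an assumption,
# not executed here; `pod` and `ns` are placeholders):
#   from kubernetes import client, config
#   config.load_incluster_config()
#   core = client.CoreV1Api()
#   fetch = lambda: core.read_namespaced_pod_log(name=pod, namespace=ns)
#   done = lambda: core.read_namespaced_pod(name=pod, namespace=ns).status.phase in ("Succeeded", "Failed")
#   for line in poll_log_lines(fetch, done):
#       print(line)
```

Re-reading the whole log on every request is wasteful for chatty pods; passing `since_seconds` to `read_namespaced_pod_log` and de-duplicating on timestamps would be the next refinement.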

LamaAni commented 1 year ago

Yeah, this issue was already mentioned here: https://github.com/LamaAni/KubernetesJobOperator/issues/54

I need to fix that but have not had the time. If you can add a reconnect methodology, then I will definitely accept a PR. If I get some time I'll fix that up, but currently it is an open issue.

Feel free to close this issue when you are done.
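For anyone picking up that PR, a reconnect approach could look roughly like the sketch below. It is not the operator's actual code. `stream_with_reconnect` and `open_stream` are hypothetical names, and `open_stream(n)` is assumed to reopen the stream and skip the first `n` lines already received. With the Kubernetes log API, the resume point would more likely be a timestamp than a line count.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple, Type


def stream_with_reconnect(
    open_stream: Callable[[int], Iterable[str]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_reconnects: int = 5,
    backoff_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Yield lines from a stream, reopening it when the connection drops."""
    received = 0    # lines delivered so far; used as the resume offset
    reconnects = 0  # consecutive failed attempts without any progress
    while True:
        try:
            for line in open_stream(received):
                received += 1
                reconnects = 0  # progress was made, reset the retry budget
                yield line
            return  # the stream ended normally
        except retry_on:
            reconnects += 1
            if reconnects > max_reconnects:
                raise
            # Linear backoff before reopening the stream.
            sleep(backoff_seconds * reconnects)
```

`ConnectionResetError` is a subclass of the built-in `ConnectionError`, so the default catches the first error in the traceback above. The outer error there is `urllib3.exceptions.ProtocolError`, which is not a `ConnectionError`, so a caller streaming through urllib3 would need to add it to `retry_on`.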