apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

[GkeStartPodOperator] - Kubernetes client request can hang indefinitely #36802

Open IKholopov opened 10 months ago

IKholopov commented 10 months ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.12.0

Apache Airflow version

2.6.3

Operating System

Ubuntu 20.04.6

Deployment

Other

Deployment details

N/A

What happened

In a DAG with ~500 GkeStartPodOperator tasks (running pods on another cluster, hosted on GKE), we discovered that operator execution hangs while polling logs in ~0.2% of task instances. Based on the logs, execution halts inside a Kubernetes client call (read_namespaced_pod_log, to be exact).

Only after the DAG run timeout (hours later), when SIGTERM is dispatched to the task run process, does execution resume; it then attempts to re-fetch the logs and pod status, but those have already been garbage collected.
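For context, the generated Kubernetes client methods accept a `_request_timeout` keyword (a float, or a `(connect, read)` tuple) that bounds how long a call like `read_namespaced_pod_log` may block; that is a per-call workaround rather than the keepalive fix discussed below. The underlying failure mode can be sketched with a plain socket pair standing in for the HTTPS connection:

```python
import socket

# The stuck read_namespaced_pod_log call ultimately reduces to a blocking
# recv() on a TCP socket with no timeout: if the peer silently disappears
# (e.g. a spot VM is preempted), recv() never returns.
a, b = socket.socketpair()

# Setting a timeout turns the indefinite hang into a catchable exception,
# which is what passing _request_timeout to the client call achieves.
a.settimeout(0.1)
try:
    a.recv(1)          # no data ever arrives on this end
    timed_out = False
except socket.timeout:
    timed_out = True

a.close()
b.close()
print(timed_out)
```

Without the `settimeout` call, `recv` would block forever, matching the observed behavior.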

This looks exactly like https://github.com/kubernetes-client/python/issues/1234#issuecomment-695801558. After running the same deployment in deferred mode, one task also ended up locked in a similar way, this time in another call (pod creation):

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 310, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 168, in request
    r = self.pool_manager.request(
  File "/opt/python3.8/lib/python3.8/site-packages/urllib3/request.py", line 81, in request
    return self.request_encode_body(
  File "/opt/python3.8/lib/python3.8/site-packages/urllib3/request.py", line 173, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "/opt/python3.8/lib/python3.8/site-packages/urllib3/poolmanager.py", line 376, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/opt/python3.8/lib/python3.8/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/opt/python3.8/lib/python3.8/site-packages/urllib3/connectionpool.py", line 467, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/opt/python3.8/lib/python3.8/site-packages/urllib3/connectionpool.py", line 462, in _make_request
    httplib_response = conn.getresponse()
  File "/opt/python3.8/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/opt/python3.8/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/opt/python3.8/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/opt/python3.8/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/opt/python3.8/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/opt/python3.8/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1521, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

I believe this is specific to GkeStartPodOperator: KubernetesHook has a mechanism ensuring TCP keepalive is configured by default (https://github.com/apache/airflow/blob/1d5d5022b8fc92f23f9fdc3b61269e5c7acfaf39/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L216), but GKEPodHook does not (https://github.com/apache/airflow/blob/1d5d5022b8fc92f23f9fdc3b61269e5c7acfaf39/airflow/providers/google/cloud/hooks/kubernetes_engine.py#L390).
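For reference, the keepalive mechanism in question boils down to installing TCP keepalive socket options on every connection the client's urllib3 pool opens, so a dead peer is detected by kernel probes instead of blocking forever. A minimal sketch (the interval/count values are illustrative, not necessarily the provider's exact defaults, and the `TCP_KEEP*` constants are platform-dependent, hence the guards):

```python
import socket

# Socket options analogous to what KubernetesHook's keepalive setup installs.
socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
if hasattr(socket, "TCP_KEEPIDLE"):
    socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120))
if hasattr(socket, "TCP_KEEPINTVL"):
    socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30))
if hasattr(socket, "TCP_KEEPCNT"):
    socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6))

# KubernetesHook appends such a list to urllib3's
# HTTPSConnection.default_socket_options, so every pooled connection gets
# keepalive probes; GKEPodHook skips that step. Applying the options to a
# raw socket shows the effect:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
for level, option, value in socket_options:
    s.setsockopt(level, option, value)
keepalive_enabled = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
s.close()
print(keepalive_enabled)
```

With `SO_KEEPALIVE` set, the kernel eventually tears down a connection whose peer has vanished, so the blocked `recv` in the traceback above would fail instead of hanging.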

What you think should happen instead

GKEPodHook should reuse the same socket configuration as KubernetesHook and enable TCP keepalive by default (unless explicitly disabled).

How to reproduce

Run ~500 tasks on GKE with spot VMs. There is no reliable repro, but the problem has been clearly documented before and was fixed for the CNCF Kubernetes provider: https://github.com/apache/airflow/pull/11406.

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 10 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

dirrao commented 10 months ago

@IKholopov Thanks for reporting the issue. Looks like it's a bug; it requires further investigation.

MaksYermak commented 9 months ago

Hello Team! I am currently investigating this issue and will try to prepare a fix for it.