astronomer / astro-provider-ray

This provider contains operators, decorators, and triggers to send a Ray job from an Airflow task
https://astronomer.github.io/astro-provider-ray/
Apache License 2.0

[Bug] Ray task freezes #78

Open tatiana opened 4 days ago

tatiana commented 4 days ago

Context

When attempting to run the following DAG in Astro within a GKE cluster, the task freezes.

"""Example DAG using Astro Ray Provider."""

from airflow.decorators import dag, task
from ray_provider.decorators.ray import ray
from common.airflow_defaults import GLOBAL_DEFAULTS
from common.dag_name import DagName
from common.dag_tags import tag_strings_for_dag
from utils.ray.ray_helpers_new import get_ray_task_config

@dag(**GLOBAL_DEFAULTS, tags=tag_strings_for_dag(DagName.EXAMPLE_DAG_RAY_PROVIDER))
def example_dag_ray_provider():
    """Example using Ray Provider."""
    @task
    def generate_data():
        return [1, 2, 3]  # list(range(50))
    @ray.task(config=get_ray_task_config())
    def process_data_with_ray(data):
        import logging
        import numpy as np
        import ray
        @ray.remote
        def square(x):
            """Example function that will be run on a Ray worker."""
            # TODO (dukarc): Eventually remove this testing code
        # fill an array to use up 10 GiB of memory
            # large_array = [x] * 10_000_000
            # logging.info("Sleeping for 2 minutes")
            # logging.info(f"Size of large array is {len(large_array)}")
            # sleep(60)
            return x**2
        data = np.array(data)
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        logging.info(f"Mean of this population is {mean}")
    data = generate_data()
    process_data_with_ray(data)
ray_example_dag = example_dag_ray_provider()

Issue

It seems that SubmitRayJob does a lot of work inside a block that catches a generic Python Exception, and the error is hidden because the original exception is only logged after the operator attempts to delete the cluster, a step that can itself fail:

https://github.com/astronomer/astro-provider-ray/blob/a900d439d02e1b59f980d5a2275f70ef0a05be93/ray_provider/operators/ray.py#L264-L312

By swapping lines 311 and 312, we (and the customer) will be able to see the original problem.
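As a rough illustration of the fix, here is a minimal sketch of the intended ordering; submit_ray_job and delete_cluster are hypothetical stand-ins for the operator's real methods, not the actual code at the link above:

import logging

def run_job_with_cleanup(submit_ray_job, delete_cluster):
    """Sketch only: surface the original exception before tearing down the cluster."""
    try:
        submit_ray_job()
    except Exception as exc:
        # Logging the original error first means a failure inside
        # delete_cluster() can no longer hide it.
        logging.error("SubmitRayJob failed: %s", exc)
        delete_cluster()
        raise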

tatiana commented 3 days ago

I released a new version of the Ray provider, swapping the lines and adding additional logs so we can understand what the original exception is: https://github.com/astronomer/astro-provider-ray/releases/tag/v0.3.0a6 https://pypi.org/project/astro-provider-ray/0.3.0a6/
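For reference, a deployment can pick up this pre-release by pinning it explicitly; the exact file layout depends on the project, but a requirements.txt entry would look like this:

# requirements.txt: pin the pre-release that swaps the lines and adds extra logging
astro-provider-ray==0.3.0a6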

tatiana commented 1 day ago

After upgrading, the customer faced this issue in their Astro deployment:

example-dag-ray-provider-process-data-with-ray-p16as5bx
*** No logs found on s3 for ti=<TaskInstance: example_dag_ray_provider.process_data_with_ray manual__2024-10-10T03:05:51.997591+00:00 [running]>
*** Attempting to fetch logs from pod example-dag-ray-provider-process-data-with-ray-p16as5bx through kube API
*** Reading from k8s pod logs failed: ('Cannot find pod for ti %s', <TaskInstance: example_dag_ray_provider.process_data_with_ray manual__2024-10-10T03:05:51.997591+00:00 [running]>)
and this from the scheduler logs:
[2024-10-09T21:19:46.001-0700] {kubernetes_executor_utils.py:98} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 85, in run
    self.resource_version = self._run(
                            ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 153, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 195, in stream
    raise client.rest.ApiException(
kubernetes.client.exceptions.ApiException: (410)
Reason: Expired: too old resource version: 332505703 (332546384)

Process KubernetesJobWatcher-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.11/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 85, in run
    self.resource_version = self._run(
                            ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/astronomer/kubernetes/executors/kubernetes_executor_utils.py", line 153, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 195, in stream
    raise client.rest.ApiException(
kubernetes.client.exceptions.ApiException: (410)
Reason: Expired: too old resource version: 332505703 (332546384)
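For context, the 410 "Expired: too old resource version" ApiException means the resource version the watcher resumed from has been compacted away by the API server; watchers normally recover by re-listing and restarting the watch. Below is a minimal sketch of that general pattern with the kubernetes Python client, as an assumption about the mechanism rather than the executor's actual code:

from kubernetes import client, config, watch
from kubernetes.client.exceptions import ApiException

config.load_incluster_config()          # assumes this runs inside the cluster
v1 = client.CoreV1Api()
resource_version = ""                   # empty string = start from a fresh LIST

while True:
    w = watch.Watch()
    try:
        for event in w.stream(
            v1.list_namespaced_pod,
            namespace="default",
            resource_version=resource_version,
        ):
            # Remember where we are so a reconnect can resume from this point.
            resource_version = event["object"].metadata.resource_version
    except ApiException as exc:
        if exc.status == 410:
            # The stored version was compacted away: reset and re-list.
            resource_version = ""
            continue
        raise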

The CRE team will continue support in https://astronomer.zendesk.com/agent/tickets/65669/