PrefectHQ / prefect-ray

Prefect integrations with Ray
https://prefecthq.github.io/prefect-ray/
Apache License 2.0

Flow crashes after multiple hanged tasks with "Prefect Task run TASK_RUN_ID already finished" #58

Closed vholmer closed 1 year ago

vholmer commented 1 year ago

Prefect Version

Version: 2.6.7
API version: 0.8.3
Python version: 3.8.9
Git commit: 9074c158
Built: Thu, Nov 10, 2022 1:07 PM
OS/Arch: linux/x86_64
Profile: cloud
Server type: cloud

Prefect Ray Version

0.2.1

Ray Version

2.1.0

Describe the current behavior

I'm running a Ray cluster in azure (cluster yaml attached below). I created a small flow, using the Azure Container Instance Job infrastructure block, that looks like this:

import time

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner

# FLOW_NAME and setup_logging are defined elsewhere and are not shown in this report.

@task
async def my_task(id: int):
    logger = get_run_logger()
    logger.info(f"Hello from my_task #{id}! Sleeping for 30 seconds...")
    time.sleep(30)
    logger.info(f"my_task #{id} done!")

@flow(
    name=FLOW_NAME,
    task_runner=RayTaskRunner(address="ray://10.0.0.4:10001")
)
def my_flow(loglevel: str = "DEBUG"):
    logconfig = setup_logging.submit(loglevel=loglevel)
    logger = get_run_logger()
    logger.info("Starting")
    # Spam ray tasks here!
    for i in range(50):
        my_task.submit(i + 1, wait_for=[logconfig])
        time.sleep(5)

After running this flow for about 5 minutes, I start to see a lot of tasks that are stuck as 'running' indefinitely. Furthermore, these tasks print the following message to the log:

Hello from my_task #42! Sleeping for 30 seconds...                    05:15:10 PM
Task run 'b4a5a5f5-1d72-4146-9476-cf157e686df3' already finished.     05:15:39 PM

After seeing a handful of tasks exhibiting this behavior, the flow promptly crashes with no further error message. The agent logs the following:

16:11:52.674 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Running command...                   
16:11:52.676 | INFO    | prefect.agent - Completed submission of flow run '87eba3cb-f09f-4b21-90f5-e0d7923801e4'                                          
16:16:55.294 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Completed command run.               
16:16:55.296 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Deleting container...    

The Ray cluster doesn't log anything of note, as far as I can tell. I'm not entirely sure what's happening here, but judging by this discussion, I'm not the only person affected: https://discourse.prefect.io/t/edgecase-error-prefect-task-run-task-run-id-already-finished/1836

Describe the proposed behavior

The tasks shouldn't hang after logging "already finished". They should either exit and be reported as crashed, or continue running properly.

### Ray Dockerfile

```Dockerfile
FROM rayproject/ray:2.1.0-py38-cpu

# Don't buffer console output
ENV PYTHONUNBUFFERED=1
# Don't compile bytecode, we're only running once
ENV PYTHONDONTWRITEBYTECODE=1

ENV CODE_PATH=/home/ray/anaconda3/lib/python3.8/site-packages/

COPY ./pipeline/requirements.txt ${CODE_PATH}/pipeline/requirements.txt
RUN pip install pip --upgrade && \
    pip install -r ${CODE_PATH}/pipeline/requirements.txt

COPY ./logging.yml ${CODE_PATH}/logging.yml
COPY ./flow_audit ${CODE_PATH}/flow_audit
COPY ./orchestration ${CODE_PATH}/orchestration
COPY ./pipeline ${CODE_PATH}/pipeline
```

### Ray Cluster YAML

Please ignore the comments here, they're mostly from the default cluster YAML. We run a custom docker image as seen below from censoredregistry.azurecr.io/ray-cluster:latest (not the actual URL). The Dockerfile is attached above.

```YAML
# An unique identifier for the head node and workers of this cluster.
cluster_name: test

# The maximum number of workers nodes to launch in addition to the head
# node.
max_workers: 25

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 100.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty object means disabled.
docker:
    # image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    image: censoredregistry.azurecr.io/ray-cluster:latest # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options: # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536

    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs

    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 30 # TODO: Should probably be something like 5 minutes

# Cloud-provider specific configuration.
provider:
    type: azure
    # https://azure.microsoft.com/en-us/global-infrastructure/locations
    location: westeurope
    cache_stopped_nodes: False
    resource_group: rg-ray-test
    # Below settings only applicable for AWS, not Azure.
    # security_group:
    #     GroupName: ray-nsg
    #     IpPermissions:
    #         - FromPort: 10001
    #           ToPort: 10001
    #           IpProtocol: TCP
    #           IpRanges:
    #               # This will enable inbound access from ALL IPv4 addresses.
    #               - CidrIp: 194.218.21.196
    # set subscription id otherwise the default from az cli will be used
    subscription_id: CENSORED_SUBSCRIPTION_ID
    # set unique subnet mask or a random mask will be used
    subnet_mask: 10.0.0.0/24
    # set unique id for resources in this cluster
    # if not set a default id will be generated based on the resource group and cluster name
    unique_id: test

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ray
    # you must specify paths to matching private and public key pair files
    # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
    ssh_private_key: ~/.ssh/ray-test
    # changes to this should match what is specified in file_mounts
    ssh_public_key: ~/.ssh/ray-test.pub

# More specific customization to node configurations can be made using the ARM template azure-vm-template.json file
# See documentation here: https://docs.microsoft.com/en-us/azure/templates/microsoft.compute/2019-03-01/virtualmachines
# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs
# on the head node, so changes to the template must be included in the wheel file used in setup_commands section below

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The resources provided by this node type.
        resources: {"CPU": 0}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest

    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 25
        # The resources provided by this node type.
        resources: {"CPU": 2}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest
                # optionally set priority to use Spot instances
                # priority: Spot
                # set a maximum price for spot instances if desired
                # billingProfile:
                #     maxPrice: -1

# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
     "~/.ssh/ray-test.pub": "~/.ssh/ray-test.pub"
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
    # add /home/ray to path
    - export PATH=$PATH:/home/ray
    # enable docker setup
    - sudo usermod -aG docker $USER || true
    - sleep 10 # delay to avoid docker permission denied errors
    # get rid of annoying Ubuntu message
    - touch ~/.sudo_as_admin_successful
    # Wait for auto upgrade that might run in the background.
    - bash -c $'ps -e | grep apt | awk \'{print $1}\' | xargs tail -f --pid || true'
    # Log in to azure container registry (by ray-msi-test)
    - az login --identity
    - docker login censoredregistry.azurecr.io -u censoreduser -p censoredpassword # Exactly how the password is retrieved is censored.

# List of shell commands to run to set up nodes.
# NOTE: rayproject/ray-ml:latest has ray latest bundled
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
# NOTE: rayproject/ray-ml:latest has azure packages bundled
head_setup_commands: []
    # - pip install -U azure-cli-core==2.22.0 azure-mgmt-compute==14.0.0 azure-mgmt-msi==1.0.0 azure-mgmt-network==10.2.0 azure-mgmt-resource==13.0.0

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - export AUTOSCALER_MAX_NUM_FAILURES=inf;
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
```

zanieb commented 1 year ago

Thanks for the issue!

For this to be happening, I think Ray would need to be executing task orchestration multiple times. When we start orchestrating a task run, we check its state. If the task run is already finished, we just return the existing finished state. There's no place for it to hang in the code there. Could you share some debug-level logs from the flow? They may be helpful. I'm a little stumped on this one though.
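
The check described above can be sketched roughly as follows. This is a simplified illustration, not Prefect's actual engine code: `RunState`, `TaskRun`, and `orchestrate` are hypothetical stand-ins, and the only assumption taken from this thread is that a second orchestration attempt on a finished run logs "already finished" and returns the existing state without running the task again.

```python
from dataclasses import dataclass
from enum import Enum


class RunState(Enum):
    """Hypothetical stand-in for a task run's state type."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"


FINAL_STATES = {RunState.COMPLETED, RunState.FAILED, RunState.CRASHED}


@dataclass
class TaskRun:
    id: str
    state: RunState


def orchestrate(task_run, fn, log):
    """Run fn for task_run unless the run has already reached a final state."""
    if task_run.state in FINAL_STATES:
        # A duplicate orchestration attempt: report it and return the
        # existing state instead of executing the task a second time.
        log.append(f"Task run '{task_run.id}' already finished.")
        return task_run.state
    task_run.state = RunState.RUNNING
    try:
        fn()
        task_run.state = RunState.COMPLETED
    except Exception:
        task_run.state = RunState.FAILED
    return task_run.state


log = []
calls = []
run = TaskRun(id="b4a5a5f5", state=RunState.PENDING)
first = orchestrate(run, lambda: calls.append(1), log)   # executes the task
second = orchestrate(run, lambda: calls.append(1), log)  # duplicate attempt
```

Under this model, seeing the "already finished" message means the same task run was orchestrated more than once, which is why duplicated execution on the Ray side is the first suspect.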

anna-geller commented 1 year ago

Great writeup @vholmer!

Michael gave a very good suggestion to add more logs to figure out when this is happening. Perhaps you could run this?

@task
async def my_task(id: int):
    logger = get_run_logger()
    logger.info(f"Hello from my_task #{id}! Sleeping for 30 seconds...")
    time.sleep(30)
    logger.info(f"my_task #{id} done!")

@flow(
    name=FLOW_NAME,
    task_runner=RayTaskRunner(address="ray://10.0.0.4:10001")
)
def my_flow(loglevel: str = "DEBUG"):
    logconfig = setup_logging.submit(loglevel=loglevel)
    logger = get_run_logger()
    logger.info("Starting")
    # Spam ray tasks here!
    for i in range(1, 51):
        logger.debug("Starting a task run #%d", i)
        my_task.submit(i, wait_for=[logconfig])
        logger.debug("Sleeping after task run submission for 5 sec")
        time.sleep(5)

Also, can you try the same with mapping instead of a for loop? Curious if this would result in a different behavior.

vholmer commented 1 year ago

Hi @anna-geller, unfortunately I didn't manage to get any further useful information out of this. I ended up switching from Ray clusters to ACI jobs (managed outside of Prefect) instead.

anna-geller commented 1 year ago

Thanks so much for the update! I'll ping our integrations team to see if they have an idea.

ahuang11 commented 1 year ago

Based on the error in the Discourse page:

  File "/home/tobias/.cache/pypoetry/virtualenvs/ray-toby-tests-hzXs9SHv-py3.9/lib/python3.9/site-packages/prefect/engine.py", line 1254, in wait_for_task_runs_and_report_crashes
    if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'

We believe we may need to call await exception_to_crashed_state in prefect-ray here, like in prefect-dask.

So we are planning to first close out the PR in prefect-dask, then implement the same in prefect-ray.
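
For illustration, the `AttributeError` in the traceback above is what Python raises when the result of an async function is used without `await`. The sketch below is a minimal, self-contained reproduction under that assumption; the `State` class and this `exception_to_crashed_state` are hypothetical stand-ins that only mimic the shape of the real helper, not prefect-ray or Prefect code.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class State:
    """Hypothetical stand-in for a state object with a `type` attribute."""
    type: str


async def exception_to_crashed_state(exc):
    # Stand-in for an async helper that converts an exception into a crashed state.
    return State(type="CRASHED")


async def without_await(exc):
    # Missing `await`: `state` is a coroutine object, not a State.
    state = exception_to_crashed_state(exc)
    try:
        return state.type
    except AttributeError as err:
        return str(err)
    finally:
        state.close()  # avoid a "coroutine was never awaited" warning


async def with_await(exc):
    # With `await`, the coroutine runs and yields the State it returns.
    state = await exception_to_crashed_state(exc)
    return state.type


broken = asyncio.run(without_await(RuntimeError("worker died")))
fixed = asyncio.run(with_await(RuntimeError("worker died")))
```

Here `broken` holds the same "'coroutine' object has no attribute 'type'" message seen in the Discourse traceback, while `fixed` holds the state type, which is the behavior the planned change is meant to restore.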