PrefectHQ / prefect-ray

Prefect integrations with Ray
https://prefecthq.github.io/prefect-ray/
Apache License 2.0

Failed to unpickle serialized exception when running on remote cluster #40

Status: Closed (tekumara closed this issue 2 years ago)

tekumara commented 2 years ago
09:59:53.784 | INFO    | prefect.engine - Created flow run 'glittering-bull' for flow 'greetings'
09:59:53.784 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://127.0.0.1:10001
09:59:57.618 | INFO    | prefect.task_runner.ray - Using Ray cluster with 2 nodes.
09:59:57.618 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.42.1.9:8265
09:59:57.890 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
09:59:59.292 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-0' for execution.
09:59:59.318 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
09:59:59.326 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-2' for execution.
09:59:59.409 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
09:59:59.421 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-1' for execution.
09:59:59.462 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-0' for task 'say_goodbye'
09:59:59.476 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-0' for execution.
09:59:59.527 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-2' for task 'say_goodbye'
09:59:59.539 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-2' for execution.
09:59:59.623 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-1' for task 'say_goodbye'
09:59:59.636 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-1' for execution.
09:59:59.733 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-3' for task 'say_goodbye'
09:59:59.745 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-3' for execution.
09:59:59.839 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
09:59:59.850 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-3' for execution.
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
10:00:05.415 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.443 | INFO    | Task run 'say_goodbye-261e56a8-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.469 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.500 | INFO    | Task run 'say_goodbye-261e56a8-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.528 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.554 | INFO    | Task run 'say_goodbye-261e56a8-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.579 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.607 | INFO    | Task run 'say_goodbye-261e56a8-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:06.767 | ERROR   | Flow run 'glittering-bull' - Finished in state Failed('8/8 states failed.')
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/compute/code/orion-demo/flows/ray_flow.py", line 49, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 158, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 238, in create_then_begin_flow_run
    return state.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 157, in result
    state.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
ray.exceptions.RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 46, in from_ray_exception
    return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 2 required keyword-only arguments: 'request' and 'response'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 352, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 264, in _deserialize_object
    return RayError.from_bytes(obj)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 40, in from_bytes
    return RayError.from_ray_exception(ray_exception)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 49, in from_ray_exception
    raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception
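
The `TypeError` in the inner traceback is what you get when an exception class with required keyword-only arguments is unpickled: by default only the positional `args` are replayed into `__init__`. The sketch below reproduces that with a hypothetical `StatusError` class. It is a stand-in, not a Prefect or Ray class, and it is only an assumption that the exception raised on the worker had this shape (the `request` and `response` names suggest an HTTP client error).

```python
import pickle


class StatusError(Exception):
    """Hypothetical stand-in for an error with keyword-only constructor arguments."""

    def __init__(self, message, *, request, response):
        super().__init__(message)  # only `message` ends up in self.args
        self.request = request
        self.response = response


def try_round_trip(exc):
    """Pickle and unpickle `exc`; return the restored exception or the TypeError raised."""
    payload = pickle.dumps(exc)  # serializing succeeds
    try:
        return pickle.loads(payload)  # calls StatusError("boom") without the keywords
    except TypeError as err:
        return err


result = try_round_trip(StatusError("boom", request="req", response="resp"))
print(type(result).__name__, result)
```

Because the failure happens while deserializing, the original error message is lost and only "Failed to unpickle serialized exception" reaches the caller.
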
import sys
from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner

@task
def say_hello(name: str) -> None:
    print(f"say_hello {name}", file=sys.stderr)
    logger = get_run_logger()
    logger.info(f"hello {name}")

@task
def say_goodbye(name: str) -> None:
    # Logging does not currently work here; see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")

# Run on an existing Ray cluster
@flow(
    task_runner=RayTaskRunner(
        address="ray://10.97.36.20:10001",
        init_kwargs={
            "runtime_env": {
                "pip": ["prefect==2.3.2"],
                "env_vars": {
                    "PREFECT_API_URL": "xxx",
                    "PREFECT_API_KEY": "yyy"
                }
            }
        },
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        # Tasks must be submitted with .submit() to run on Ray.
        # Called directly, they are still tracked but run
        # immediately and locally rather than asynchronously on Ray.
        say_hello.submit(name)
        say_goodbye.submit(name)

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

Versions: prefect 2.3.2, ray 2.0.0

tekumara commented 2 years ago

This occurred because I had not set PREFECT_API_URL in the process running the flow, so the Prefect API could not be reached.
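
A minimal pre-flight check along these lines could catch the problem before anything is submitted to Ray. The helper name `missing_settings` is hypothetical, and the sketch assumes the settings are supplied through environment variables. Prefect can also read them from a profile, which this check would not see.

```python
import os
from typing import List, Mapping, Sequence


def missing_settings(
    env: Mapping[str, str],
    required: Sequence[str] = ("PREFECT_API_URL",),
) -> List[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


# Check the local process (the one that calls the flow), not the Ray workers.
missing = missing_settings(os.environ)
if missing:
    print(f"Not set in this process: {', '.join(missing)}")
else:
    print("PREFECT_API_URL is set")
```

Running this in the same shell as the flow, just before calling `greetings(...)`, shows whether the local process has the variable, independently of the `env_vars` passed to the workers through `runtime_env`.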