PrefectHQ / prefect-ray

Prefect integrations with Ray
https://prefecthq.github.io/prefect-ray/
Apache License 2.0

Subflows with RayTaskManager #32

Closed robbert-harms closed 2 years ago

robbert-harms commented 2 years ago

Prefect 2.0 supports subflows, which is a great new addition. However, there is a bug when a flow and its subflow both use a RayTaskRunner(): the subflow run crashes with an error about ray.init() being called twice.

It would be nice if this use case were supported, since it would allow combining different parallel flows.

To reproduce, use:

from prefect import task, flow
from prefect_ray import RayTaskRunner

@task
def some_task(input):
    return input

@flow(task_runner=RayTaskRunner())
def my_subflow():
    some_task.submit(1)
    some_task.submit(2)
    some_task.submit(3)

@flow(task_runner=RayTaskRunner())
def some_flow():
    my_subflow()
    some_task.submit(4)

some_flow()

The error is:

09:15:38.789 | INFO    | prefect.engine - Created flow run 'terrestrial-bird' for flow 'some-flow'
09:15:38.789 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-08-17 09:15:40,455 INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265
09:15:42.168 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
09:15:42.168 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
09:15:42.355 | INFO    | Flow run 'terrestrial-bird' - Created subflow run 'small-marten' for flow 'my-subflow'
09:15:42.356 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
09:15:42.357 | ERROR   | Flow run 'small-marten' - Crash detected! Execution was interrupted by an unexpected exception.
09:15:42.390 | ERROR   | Flow run 'terrestrial-bird' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
    my_subflow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
    await self._start(exit_stack)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
    context = ray.init(*init_args, **self.init_kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
    raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
09:15:45.181 | ERROR   | Flow run 'terrestrial-bird' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 29, in <module>
    some_flow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
    my_subflow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
    await self._start(exit_stack)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
    context = ray.init(*init_args, **self.init_kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
    raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.

I solved the problem in my private package by using a modified RayTaskRunner. In particular, I modified the _start method to begin with:

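if self.address:
    self.logger.info(
        f"Connecting to an existing Ray instance at {self.address}"
    )
    init_args = (self.address,)
elif not ray.is_initialized():
    # Only create a local instance when Ray is not already running
    # in this process (e.g. started by the parent flow's task runner).
    self.logger.info("Creating a local Ray instance")
    init_args = ()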

Note the ray.is_initialized() call, which skips creating a local Ray instance when one is already running.
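
For readers who want to see the guard in isolation, here is a minimal sketch of the idea. It is not the actual prefect-ray code: FakeRay and GuardedRunner are hypothetical stand-ins so the example runs without Ray installed, and the owns_backend flag (which decides who is allowed to shut the backend down) is an assumption added for illustration, not part of the snippet above.

```python
class FakeRay:
    """Hypothetical stand-in for the few `ray` calls involved (init,
    is_initialized, shutdown). It only mimics the "init twice" error."""

    def __init__(self):
        self._initialized = False
        self.init_calls = 0

    def is_initialized(self):
        return self._initialized

    def init(self, *args, **kwargs):
        if self._initialized:
            raise RuntimeError("Maybe you called ray.init twice by accident?")
        self._initialized = True
        self.init_calls += 1

    def shutdown(self):
        self._initialized = False


class GuardedRunner:
    """Hypothetical, heavily simplified task runner (not RayTaskRunner)."""

    def __init__(self, backend, address=None):
        self.backend = backend
        self.address = address
        self.owns_backend = False  # assumption: only the starter shuts down

    def start(self):
        if self.address:
            # Connect to an existing instance at the given address.
            self.backend.init(self.address)
            self.owns_backend = True
        elif not self.backend.is_initialized():
            # Create a local instance only if none is running yet.
            self.backend.init()
            self.owns_backend = True
        # Otherwise: reuse the instance started by an outer runner.

    def stop(self):
        if self.owns_backend:
            self.backend.shutdown()
            self.owns_backend = False


# A parent flow and a subflow, each with its own runner on one backend.
ray_like = FakeRay()
parent, child = GuardedRunner(ray_like), GuardedRunner(ray_like)
parent.start()
child.start()  # no RuntimeError: the guard skips the second init
child.stop()   # leaves the parent's instance running
parent.stop()
```

Without the is_initialized() check, the second start() would call init() again and raise, which matches the crash in the traceback above. Whether a nested runner should also skip shutdown on exit is a design question this sketch only guesses at.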

ahuang11 commented 2 years ago

Thanks for reporting this! Would you like to try making a PR to fix this?

robbert-harms commented 2 years ago

Maybe in the future; I'm currently very busy trying to get work done. The problem is simple enough to just fix the next time you dive into the code, no?

ahuang11 commented 2 years ago

Yep, just wanted to see if you were interested first. Thanks again for the report!

robbert-harms commented 2 years ago

Of course, happy to help. Many thanks for making this package in the first place.