PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

`RayTaskRunner` crashes with `pydantic>=2.0.0` #13005

Closed: thatdevsherry closed this issue 4 days ago

thatdevsherry commented 10 months ago

prefect-ray's `RayTaskRunner` crashes on task submission when pydantic>=2.0.0 is installed.

Expectation / Proposal

`RayTaskRunner` is expected to work with both pydantic v1 and pydantic v2, especially now that prefect 2.13.4 is out; prefect itself had to be updated to handle pydantic v2's newer schema.

Prefect 2.13.4 supports pydantic v2, but anyone who wants to use prefect-ray with it cannot use pydantic v2 and has to fall back to pydantic v1.

I'm not certain whether this should be fixed in prefect-ray or whether it is a prefect issue. Guidance appreciated.

Traceback / Example

❯ pip freeze | grep -E 'pydantic|prefect'
prefect==2.14.3
prefect-dask==0.2.4
prefect-ray==0.3.0
prefect-shell==0.2.0
pydantic==2.4.2
pydantic_core==2.10.1

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def _():
    pass

@flow(task_runner=RayTaskRunner())
def main():
    _.submit()

if __name__ == "__main__":
    main()

Crash log with pydantic==2.4.2

23:18:38.369 | DEBUG   | prefect.profiles - Using profile 'default'
23:18:39.286 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/msq/.prefect/prefect.db
23:18:39.346 | INFO    | prefect.engine - Created flow run 'uber-koala' for flow 'main'
23:18:39.346 | DEBUG   | Flow run 'uber-koala' - Starting 'RayTaskRunner'; submitted tasks will be run in parallel...
23:18:39.347 | DEBUG   | prefect.task_runner.ray - Starting task runner...
23:18:39.347 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2023-11-03 23:18:40,978 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
23:18:41.885 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
23:18:41.886 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
23:18:41.889 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/msq/.prefect/prefect.db
23:18:41.923 | DEBUG   | Flow run 'uber-koala' - Executing flow 'main' for flow run 'uber-koala'...
23:18:41.924 | DEBUG   | Flow run 'uber-koala' - Beginning execution...
23:18:41.945 | INFO    | Flow run 'uber-koala' - Created task run '_-0' for task '_'
23:18:41.953 | DEBUG   | prefect.task_runner.ray - Shutting down task runner...
23:18:44.613 | ERROR   | Flow run 'uber-koala' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: remote() argument after ** must be a mapping, not FieldInfo
23:18:44.615 | DEBUG   | Flow run 'uber-koala' - Crash details:
Traceback (most recent call last):
  File "/usr/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/task_runners.py", line 187, in start
    yield self
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 531, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 843, in orchestrate_flow_run
    waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1861, in wait_for_task_runs_and_report_crashes
    states = await gather(*(future._wait for future in task_run_futures))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 417, in gather
    keys.append(tg.start_soon(call))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 393, in __aexit__
    retval = await self._task_group.__aexit__(*tb)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 668, in task_done
    exc = _task.exception()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1944, in report_flow_run_crashes
    yield
  File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1402, in create_task_run_then_submit
    await submit_task_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1463, in submit_task_run
    future = await task_runner.submit(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 172, in submit
    ray_decorator = ray.remote(**remote_options)
TypeError: remote() argument after ** must be a mapping, not FieldInfo
23:18:44.672 | DEBUG   | prefect.engine - Reported crashed flow run 'uber-koala' successfully!
Traceback (most recent call last):
  File "/usr/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/task_runners.py", line 187, in start
    yield self
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 531, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 843, in orchestrate_flow_run
    waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1861, in wait_for_task_runs_and_report_crashes
    states = await gather(*(future._wait for future in task_run_futures))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 417, in gather
    keys.append(tg.start_soon(call))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 393, in __aexit__
    retval = await self._task_group.__aexit__(*tb)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 668, in task_done
    exc = _task.exception()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "prefect_ray_pydantic_v2.py", line 13, in <module>
    main()
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/flows.py", line 1079, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 283, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 531, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/usr/lib/python3.8/contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "/usr/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1944, in report_flow_run_crashes
    yield
  File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1402, in create_task_run_then_submit
    await submit_task_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1463, in submit_task_run
    future = await task_runner.submit(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 172, in submit
    ray_decorator = ray.remote(**remote_options)
TypeError: remote() argument after ** must be a mapping, not FieldInfo
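The final `TypeError` in both tracebacks is Python's own check on `**` unpacking: `ray.remote(**remote_options)` in prefect_ray/task_runners.py received a pydantic `FieldInfo` object where it needed a mapping of Ray options. The sketch below reproduces that class of error without Ray or pydantic installed. `FakeFieldInfo`, `remote` and `safe_remote_call` are hypothetical stand-ins, not prefect-ray or Ray APIs, and the guard in `safe_remote_call` only replaces the crash with a clearer message; it does not fix the underlying pydantic v2 incompatibility.

```python
from collections.abc import Mapping
from typing import Any, Dict


class FakeFieldInfo:
    """Hypothetical stand-in for pydantic's FieldInfo: an object that is not a mapping."""


def remote(**options: Any) -> Dict[str, Any]:
    """Hypothetical stand-in for ray.remote: returns the keyword options it received."""
    return options


def safe_remote_call(remote_options: Any) -> Dict[str, Any]:
    """Check remote_options before unpacking so a bad value fails with a clear message."""
    if remote_options is None:
        return remote()
    if not isinstance(remote_options, Mapping):
        raise TypeError(
            "expected a mapping of remote options, got "
            f"{type(remote_options).__name__}"
        )
    return remote(**remote_options)


if __name__ == "__main__":
    # Same failure pattern as the log: unpacking a non-mapping object with **.
    try:
        remote(**FakeFieldInfo())
    except TypeError as exc:
        # Message contains "argument after ** must be a mapping, not FakeFieldInfo"
        print(exc)

    print(safe_remote_call({"num_cpus": 1}))  # {'num_cpus': 1}
```

Any object without a `keys()` method triggers this error when unpacked with `**`, so a `FieldInfo` reaching that call site fails immediately at task submission. The `isinstance(..., Mapping)` guard is deliberately stricter than Python's duck-typed rule.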

AkeelMedina22 commented 10 months ago

Is there any update on this?

j-tr commented 9 months ago

As Ray does not yet support pydantic 2 (https://github.com/ray-project/ray/blob/dd270c86feaf3b342014f20c3f7559a6e4cb4272/python/setup.py#L256), I don't think this can be fixed in prefect-ray alone.

tekumara commented 9 months ago

See https://github.com/ray-project/ray/issues/38977#issuecomment-1841862512

We've merged the changes to make Ray compatible with Pydantic 2.5+. You can start using Pydantic 2.5+ with Ray 2.9, which should be out at the end of December.

These changes should also be in the Ray nightly, so feel free to try them out!
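Going by the Ray maintainers' comment quoted above (Pydantic 2.5+ works with Ray 2.9) and the earlier note that older Ray releases did not support pydantic 2, a small preflight check can flag an incompatible combination before a flow starts. This is a minimal sketch assuming that rule holds; `parse_version`, `pydantic_ray_compatible` and `check_installed` are hypothetical helpers, and the check covers only the Ray side, not whether a given prefect-ray release handles pydantic v2.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Return the leading numeric parts of a version string, e.g. '2.4.2' -> (2, 4, 2).

    Pre-release or dev suffixes ('rc1', 'dev0', ...) are dropped, so this is
    only a rough comparison; packaging.version.Version is the robust option.
    """
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if digits != piece:
            break  # e.g. '0rc1': keep 0, ignore the rest
    return tuple(parts)


def pydantic_ray_compatible(pydantic_version: str, ray_version: str) -> bool:
    """Assumed rule from this thread: pydantic 1.x works; pydantic 2 needs >= 2.5 and Ray >= 2.9."""
    pyd = parse_version(pydantic_version)
    if pyd < (2,):
        return True
    return pyd >= (2, 5) and parse_version(ray_version) >= (2, 9)


def check_installed() -> Optional[bool]:
    """Check the installed packages; returns None if pydantic or ray is not installed."""
    try:
        return pydantic_ray_compatible(version("pydantic"), version("ray"))
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(pydantic_ray_compatible("2.4.2", "2.9.0"))  # False
    print(pydantic_ray_compatible("2.5.3", "2.9.0"))  # True
    print(check_installed())
```

Ray's own package metadata remains the authoritative source for which pydantic versions a Ray release accepts; the hard-coded rule here only mirrors the comments in this thread.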