from typing import List
from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
from prefect.settings import temporary_settings, PREFECT_LOCAL_STORAGE_PATH
# from prefect_dask.task_runners import DaskTaskRunner
@task
def say_hello(name: str) -> None:
    """Log a ``hello`` greeting for ``name`` at INFO level.

    The logger is obtained from ``get_run_logger()`` on each call.
    """
    run_logger = get_run_logger()
    greeting = f"hello {name}"
    run_logger.info(greeting)
@task
def say_goodbye(name: str) -> None:
    """Print a marker line to stdout, then log a ``goodbye`` for ``name``."""
    # Task logging is reported as not currently working; tracked upstream at
    # https://github.com/PrefectHQ/prefect-ray/issues/25
    run_logger = get_run_logger()
    # The plain print accompanies the logger call (see the issue above).
    print("print goodbye")
    farewell = f"goodbye {name}"
    run_logger.info(farewell)
# Execute the flow's tasks on an existing Ray cluster.
@flow(
    task_runner=RayTaskRunner(
        # Original note: 127.0.0.1:10001 is port-forwarded to the remote
        # Ray cluster.
        address="ray://myaddress",
    ),
)
def greetings(names: List[str]) -> None:
    """Submit one ``say_hello`` task run for every entry in ``names``.

    Args:
        names: The names to greet, submitted in iteration order.
    """
    for person in names:
        say_hello.submit(person)
if __name__ == "__main__":
    # Script entry point: run the flow once with a fixed set of names.
    greetings(
        names=[
            "arthur",
            "trillian",
            "ford",
            "marvin",
        ],
    )
Results in:
15:29:39.838 | INFO | prefect.engine - Created flow run 'wakeful-condor' for flow 'greetings'
15:29:39.839 | INFO | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://3.95.199.184:10001
15:29:43.498 | INFO | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
15:29:44.509 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
15:29:46.272 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-0' for execution.
15:29:46.366 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
15:29:46.458 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-2' for execution.
15:29:46.461 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
15:29:46.553 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-3' for execution.
15:29:46.554 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
15:29:46.647 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-1' for execution.
15:29:46.728 | INFO | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:46.849 | INFO | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.005 | INFO | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.134 | INFO | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:48.747 | ERROR | Flow run 'wakeful-condor' - Finished in state Failed('4/4 states failed.')
Traceback (most recent call last):
File "/Users/andrew/test2.py", line 40, in <module>
greetings(["arthur", "trillian", "ford", "marvin"])
File "/Users/andrew/Applications/python/prefect/src/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/andrew/Applications/python/prefect/src/prefect/client/orion.py", line 86, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 232, in create_then_begin_flow_run
return state.result()
File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 116, in result
state.result()
File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 102, in result
raise data
TypeError: missing a required argument: 'result_factory'
I checked out the branch from https://github.com/PrefectHQ/prefect/pull/6908 on both the local and the remote machine, then ran this code. (If `address` is omitted, it works.) Any ideas, @madkinsz?
Results in: