PrefectHQ / prefect-ray

Prefect integrations with Ray
https://prefecthq.github.io/prefect-ray/
Apache License 2.0

Update `RayTaskRunner` for new `TaskRunner` interface #35

Closed anticorrelator closed 2 years ago

anticorrelator commented 2 years ago

Summary

This updates `RayTaskRunner` to conform to the new `TaskRunner` interface, which allows faster task submission: https://github.com/PrefectHQ/prefect/pull/6527.

ahuang11 commented 2 years ago

Tests are failing due to:

E   TypeError: Could not serialize the argument {'b': PrefectFuture('task_b-8e3be95f-0')} for a task or actor prefect.engine.begin_task_run. Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.

python/ray/_raylet.pyx:427: TypeError

self = <ray.util.client.client_pickler.ClientPickler object at 0x7fb5798624c0>
obj = ((), {'parameters': {'b': PrefectFuture('task_b-8e3be95f-0')}, 'result_filesystem': LocalFileSystem(basepath='/home/ru...FECT_ORION_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED=True)), 'task': <prefect.tasks.Task object at 0x7fb57270fc10>, ...})

    def dump(self, obj):
        try:
>           return Pickler.dump(self, obj)
E           TypeError: cannot pickle '_queue.SimpleQueue' object

From:


    def test_failing_flow_run(self, task_runner):
        @task
        def task_a():
            raise RuntimeError("This task fails!")

        @task
        def task_b():
            raise ValueError("This task fails and passes data downstream!")

        @task
        def task_c(b):
            # This task attempts to use the upstream data and should fail too
            return b + "c"

        @flow(version="test", task_runner=task_runner)
        def test_flow():
            a = task_a.submit()
            b = task_b.submit()
            c = task_c.submit(b)
            d = task_c.submit(c)

            return a, b, c, d

>       state = test_flow._run()

zanieb commented 2 years ago

Prefect futures should be exchanged for Ray futures. The Dask changes should have an example.

Edit: Oops, I see we don't have an existing implementation for that here. Does Ray automatically resolve futures like Dask does?
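
To illustrate the serialization failure and the "exchange" idea in isolation, here is a minimal standard-library sketch. It uses neither Prefect nor Ray: `StandInFuture` and `exchange_futures` are hypothetical names invented for this example, and the traceback above does not say which object actually owns the `SimpleQueue`, so placing it on the future is an assumption.

```python
import pickle
import queue


class StandInFuture:
    """Hypothetical stand-in for a future that owns a thread-safe queue."""

    def __init__(self, key):
        self.key = key
        # Assumption for illustration: the future holds a SimpleQueue,
        # which the pickle module cannot serialize.
        self._final_state = queue.SimpleQueue()


def exchange_futures(parameters):
    """Swap each StandInFuture for its picklable key (top-level values only)."""
    return {
        name: value.key if isinstance(value, StandInFuture) else value
        for name, value in parameters.items()
    }


parameters = {"b": StandInFuture("task_b-0")}

# Pickling the parameters as-is fails because of the queue inside the future.
try:
    pickle.dumps(parameters)
    pickled_directly = True
except TypeError as exc:
    pickled_directly = False
    error_message = str(exc)

# After the exchange, only plain strings remain, so pickling succeeds.
exchanged = exchange_futures(parameters)
round_tripped = pickle.loads(pickle.dumps(exchanged))
```

In a real task runner the replacement would presumably be the backend's own future type rather than a string key; the string here only keeps the sketch self-contained.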

ahuang11 commented 2 years ago

The test is now failing on:

task_runner = <prefect_ray.task_runners.RayTaskRunner object at 0x7fe21c589350>

    def test_failing_flow_run(self, task_runner):
        @task
        def task_a():
            raise RuntimeError("This task fails!")

        @task
        def task_b():
            raise ValueError("This task fails and passes data downstream!")

        @task
        def task_c(b):
            # This task attempts to use the upstream data and should fail too
            return b + "c"

        @flow(version="test", task_runner=task_runner)
        def test_flow():
            a = task_a.submit()
            b = task_b.submit()
            c = task_c.submit(b)
            d = task_c.submit(c)

            return a, b, c, d

        state = test_flow._run()

        assert state.is_failed()
        a, b, c, d = state.result(raise_on_failure=False)
        with pytest.raises(RuntimeError, match="This task fails!"):
            a.result()
        with pytest.raises(
            ValueError, match="This task fails and passes data downstream"
        ):
            b.result()

>       assert c.is_pending()
E       AssertionError

ahuang11 commented 2 years ago

The interrogate coverage is failing because RayTaskRunner._optimize_futures.visit_fn does not have a docstring.
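
As a rough illustration of why a nested helper can trip a docstring-coverage check, the sketch below walks a module's syntax tree with the standard `ast` module and reports every function or class without a docstring. This is not how interrogate is implemented; `RunnerSketch` and `missing_docstrings` are hypothetical names, and the body of `visit_fn` is a placeholder rather than the code in this PR.

```python
import ast

# Hypothetical source: an outer method with a docstring that defines a
# nested helper without one.
SOURCE = '''
class RunnerSketch:
    """Hypothetical runner used only for this illustration."""

    def _optimize_futures(self, expr):
        """Exchange futures found in expr."""

        def visit_fn(value):
            return value

        return visit_fn(expr)
'''


def missing_docstrings(source):
    """Return names of functions and classes in source that lack a docstring."""
    tree = ast.parse(source)
    return [
        node.name
        for node in ast.walk(tree)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))
        and ast.get_docstring(node) is None
    ]


undocumented = missing_docstrings(SOURCE)
```

Because `ast.walk` visits nested definitions as well as top-level ones, the inner `visit_fn` is reported even though its enclosing method is documented.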

anticorrelator commented 2 years ago

I'm not sure this is ready to merge yet. To make this work, we're resolving the Ray futures directly, and I think that might defeat the purpose of using Ray for concurrency.

ahuang11 commented 2 years ago

I checked out this branch and tried the basic example from the docs. It still seems to run concurrently.

12:06:48.713 | INFO    | prefect.engine - Created flow run 'russet-caracara' for flow 'count-to'
12:06:48.714 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-09-08 12:06:50,427 INFO services.py:1456 -- View the Ray dashboard at http://127.0.0.1:8265
12:06:52.281 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
12:06:52.281 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
12:06:53.381 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-0' for task 'shout'
12:06:53.392 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-0' for execution.
12:06:53.494 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-9' for task 'shout'
12:06:53.504 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-9' for execution.
12:06:53.508 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-4' for task 'shout'
12:06:53.521 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-4' for execution.
12:06:53.525 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-5' for task 'shout'
12:06:53.535 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-5' for execution.
12:06:53.538 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-8' for task 'shout'
12:06:53.547 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-8' for execution.
12:06:53.551 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-7' for task 'shout'
12:06:53.558 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-7' for execution.
12:06:53.562 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-2' for task 'shout'
12:06:53.570 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-2' for execution.
12:06:53.574 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-6' for task 'shout'
12:06:53.583 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-6' for execution.
12:06:53.589 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-3' for task 'shout'
12:06:53.596 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-3' for execution.
12:06:53.601 | INFO    | Flow run 'russet-caracara' - Created task run 'shout-58a68b34-1' for task 'shout'
12:06:53.609 | INFO    | Flow run 'russet-caracara' - Submitted task run 'shout-58a68b34-1' for execution.
(begin_task_run pid=16486) #2
(begin_task_run pid=16488) #0
(begin_task_run pid=16489) #5
(begin_task_run pid=16491) #7
(begin_task_run pid=16485) #6
(begin_task_run pid=16487) #9
(begin_task_run pid=16492) #8
(begin_task_run pid=16490) #4
(begin_task_run pid=16489) #1
(begin_task_run pid=16488) #3
12:06:59.430 | INFO    | Flow run 'russet-caracara' - Finished in state Completed('All states completed.')

anticorrelator commented 2 years ago

I believe it will retain concurrency for independent tasks, but if tasks depend on each other, they will pass around resolved results instead of futures.
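
The distinction can be sketched with the standard library's `concurrent.futures` as an analogy. This is not Ray or Prefect code, and `resolve` is a hypothetical helper: independent tasks are submitted without waiting on each other, while a dependent task only receives its input once the upstream future has been resolved to a plain value in the submitting thread.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve(value):
    """Return a future's result, or the value itself if it is not a future."""
    return value.result() if isinstance(value, Future) else value


def task_b():
    return "b"


def task_c(b):
    return b + "c"


with ThreadPoolExecutor(max_workers=4) as pool:
    # Independent tasks: both are submitted immediately and may overlap.
    independent = [pool.submit(task_b) for _ in range(2)]

    # Dependent task: the upstream future is resolved in the submitting
    # thread, so the worker receives the plain string "b", not a future.
    upstream = pool.submit(task_b)
    downstream = pool.submit(task_c, resolve(upstream))

    independent_results = [f.result() for f in independent]
    downstream_result = downstream.result()
```

The call to `resolve(upstream)` blocks the submitting thread until the upstream task finishes, which is the cost being discussed here: the dependency is enforced by waiting on the caller's side rather than by handing a future to the backend.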

desertaxle commented 2 years ago

Here's the issue for tracking: https://github.com/PrefectHQ/prefect-ray/issues/37