coiled / feedback

A place to provide Coiled feedback
14 stars 3 forks source link

Prefect integration page is out of date #201

Closed tonycombocurve closed 2 years ago

tonycombocurve commented 2 years ago

Your page on using Prefect for Workflow automation is out of date:

https://docs.coiled.io/user_guide/examples/prefect.html

The code examples are based on Prefect 1.0, which is no longer the current release. In particular, they have changed the API for invoking flows. In particular, there is no longer a run() method on the flow class. Instead, the flow is callable so you invoke it like this: flow(). They are also encouraging the use of the @flow on functions that implement the flow logic. Please consider updating your docs accordingly.

Thanks and keep up the great work!

tonycombocurve commented 2 years ago

See the Prefect migration guide for further details:

https://docs.prefect.io/migration-guide/?h=executor#conceptual-and-syntax-changes

jrbourbeau commented 2 years ago

Thanks for reporting @tonycombocurve. I believe updating this example is on @scharlottej13's radar

phobson commented 2 years ago

Tony, Sarah, and I chatted a bit in Slack. One confounding factor with even prefect's documentation is that it's example of connecting to a cluster with a TCP address and not a TSL address, for which distributed requires additional info (via ssl.SSLContext) to connect to. I opened an issue over at prefect_dask here: https://github.com/PrefectHQ/prefect-dask/issues/23

We also tried connecting the DaskTaskRunner to the coiled cluster by name, but here was no luck there:

import time
from prefect import task, flow
from coiled import Cluster

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

cluster = Cluster(name="quick-test", package_sync=True)
runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": cluster.name}
)

@task(name="shouter")
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=runner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)
Log + Traceback 10:22:20.374 | INFO | prefect.engine - Created flow run 'small-mantis' for flow 'count-to' 10:22:20.375 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `coiled._beta.cluster.ClusterBeta` 10:22:26.642 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://54.209.185.4:8787 10:22:26.741 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-0' for task 'shouter' 10:22:27.021 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-0' for execution. 10:22:27.037 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-1' for task 'shouter' 10:22:27.041 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-1' for execution. 10:22:27.055 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-2' for task 'shouter' 10:22:27.059 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-2' for execution. 10:22:27.082 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-3' for task 'shouter' 10:22:27.087 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-3' for execution. 10:22:27.102 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-4' for task 'shouter' 10:22:27.106 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-4' for execution. 10:22:27.121 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-5' for task 'shouter' 10:22:27.125 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-5' for execution. 10:22:27.143 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-6' for task 'shouter' 10:22:27.149 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-6' for execution. 10:22:27.167 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-7' for task 'shouter' 10:22:27.171 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-7' for execution. 10:22:27.187 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-8' for task 'shouter' 10:22:27.191 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-8' for execution. 10:22:27.205 | INFO | Flow run 'small-mantis' - Created task run 'shouter-58a68b34-9' for task 'shouter' 10:22:27.215 | INFO | Flow run 'small-mantis' - Submitted task run 'shouter-58a68b34-9' for execution. 10:22:37.757 | INFO | Task run 'shouter-58a68b34-0' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.777 | INFO | Task run 'shouter-58a68b34-1' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.791 | INFO | Task run 'shouter-58a68b34-2' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.804 | INFO | Task run 'shouter-58a68b34-3' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.816 | INFO | Task run 'shouter-58a68b34-4' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.833 | INFO | Task run 'shouter-58a68b34-5' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.847 | INFO | Task run 'shouter-58a68b34-6' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.862 | INFO | Task run 'shouter-58a68b34-7' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.878 | INFO | Task run 'shouter-58a68b34-8' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:37.890 | INFO | Task run 'shouter-58a68b34-9' - Crash detected! Execution was interrupted by an unexpected exception. 10:22:38.885 | ERROR | Flow run 'small-mantis' - Finished in state Failed('10/10 states failed.') Traceback (most recent call last): File "/Users/paul/work/office-hours/support-metrostar/test_scratch.py", line 29, in count_to(10) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__ return enter_flow_run_engine_from_flow_call( File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call return anyio.run(begin_run) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run return asynclib.run(func, *args, **backend_options) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run return native_run(wrapper(), debug=debug) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete return future.result() File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper return await func(*args) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client return await fn(*args, **kwargs) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run return state.result() File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result state.result() File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result raise data File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/prefect_dask/task_runners.py", line 236, in wait return await future.result(timeout=timeout) File "/Users/paul/mambaforge/envs/hydro/lib/python3.9/site-packages/distributed/client.py", line 292, in _result raise exc.with_traceback(tb) File "/opt/conda/lib/python3.9/site-packages/prefect/engine.py", line 947, in begin_task_run File "/opt/conda/lib/python3.9/site-packages/prefect/engine.py", line 1029, in orchestrate_task_run File "/opt/conda/lib/python3.9/site-packages/prefect/client.py", line 1767, in propose_state File "/opt/conda/lib/python3.9/site-packages/prefect/client.py", line 1850, in set_task_run_state File "/opt/conda/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post File "/opt/conda/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request File "/opt/conda/lib/python3.9/site-packages/prefect/client.py", line 280, in send File "/opt/conda/lib/python3.9/site-packages/prefect/client.py", line 226, in raise_for_status Exception: PrefectHTTPStatusError("Client error '404 Not Found' for url 'http://ephemeral-orion/api/task_runs/d88a7ed4-0a12-495e-a243-017465b02dc2/set_state'\nResponse: {'exception_message': 'Task run with id d88a7ed4-0a12-495e-a243-017465b02dc2 not found'}\nFor more information check: https://httpstatuses.com/404")
ntabris commented 2 years ago
cluster = Cluster(...)
runner = DaskTaskRunner(
  address=cluster.scheduler_address,
  client_kwargs={'security': cluster.security}
)

Ideally DaskTaskRunner would have API more like Client, as I suggested in https://github.com/PrefectHQ/prefect-dask/issues/23#issuecomment-1241207527

scharlottej13 commented 2 years ago

@tonycombocurve wanted to let you know we now have a Prefect 2 Coiled docs page! As discussed in our call the other week, I think the section on using Prefect and Coiled with Dask collections is probably what you're looking for.

You can navigate to the Prefect 2 page from our Prefect 1 example. As more folks start transitioning to Prefect 2, we'll point people directly to Prefect 2 instead.

Thanks!