PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.56k stars 1.52k forks source link

Trouble using `get_dask_client` with a Coiled cluster #12971

Open scharlottej13 opened 1 year ago

scharlottej13 commented 1 year ago

Hi! I noticed recently that get_dask_client (added in https://github.com/PrefectHQ/prefect-dask/pull/33) is not working with a Coiled cluster. I'm using Prefect Cloud.

Problem

The following snippet does not work using a Coiled cluster (but does work with dask.distributed.LocalCluster):

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "get-dask-client",
    },
)

@task
def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df

@flow(task_runner=coiled_runner)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

if __name__ == "__main__":
    dask_flow()

When I run the above snippet I get a TypeError: TLS expects assl_contextargument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got None.

Full traceback ```python-traceback Traceback (most recent call last): File "/Users/sarahj/Downloads/prefect-dask-client.py", line 31, in dask_flow() File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/flows.py", line 468, in __call__ return enter_flow_run_engine_from_flow_call( File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/engine.py", line 175, in enter_flow_run_engine_from_flow_call return anyio.run(begin_run) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run return asynclib.run(func, *args, **backend_options) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run return native_run(wrapper(), debug=debug) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete return future.result() File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper return await func(*args) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client return await fn(*args, **kwargs) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/engine.py", line 256, in create_then_begin_flow_run return await state.result(fetch=True) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result raise await get_state_exception(state) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/engine.py", line 665, in orchestrate_flow_run result = await run_sync(flow_call) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread tg.start_soon( File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__ raise exceptions[0] File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync return await get_asynclib().run_sync_in_worker_thread( File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread return await future File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run result = context.run(func, *args) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result result = __fn(*args, **kwargs) File "/Users/sarahj/Downloads/prefect-dask-client.py", line 27, in dask_flow return prefect_future.result() File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/futures.py", line 226, in result return sync( File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 267, in sync return run_async_from_worker_thread(__async_fn, *args, **kwargs) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread return anyio.from_thread.run(call) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run return asynclib.run_async_from_thread(func, *args) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread return f.result() File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/concurrent/futures/_base.py", line 446, in result return self.__get_result() File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result raise self._exception File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result return await final_state.result(raise_on_failure=raise_on_failure, fetch=True) File "/Users/sarahj/mambaforge/envs/prefect2-coiled2/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result raise await get_state_exception(state) File "/opt/coiled/env/lib/python3.9/site-packages/prefect/engine.py", line 1533, in orchestrate_task_run File "/opt/coiled/env/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread File "/opt/coiled/env/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync File "/opt/coiled/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread File "/opt/coiled/env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run File "/Users/sarahj/Downloads/prefect-dask-client.py", line 18, in compute_task with get_dask_client() as client: # noqa File "/opt/coiled/env/lib/python3.9/contextlib.py", line 119, in __enter__ return next(self.gen) File "/opt/coiled/env/lib/python3.9/site-packages/prefect_dask/utils.py", line 103, in get_dask_client File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 988, in __init__ File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 1185, in start File "/opt/coiled/env/lib/python3.9/site-packages/distributed/utils.py", line 405, in sync File "/opt/coiled/env/lib/python3.9/site-packages/distributed/utils.py", line 378, in f File "/opt/coiled/env/lib/python3.9/site-packages/tornado/gen.py", line 769, in run File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 1265, in _start File "/opt/coiled/env/lib/python3.9/site-packages/distributed/client.py", line 1328, in _ensure_connected File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/core.py", line 291, in connect File "/opt/coiled/env/lib/python3.9/asyncio/tasks.py", line 479, in wait_for File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 488, in connect File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 543, in _get_connect_args File "/opt/coiled/env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 401, in _expect_tls_context TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got None ```

Looking at the full traceback, it seems _expect_tls_context in https://github.com/dask/distributed/blob/main/distributed/comm/tcp.py expects TLS arguments, but is not getting the information needed.

Since get_dask_client is a utility around worker_client, I checked to see if using worker_client works with a Coiled cluster, and it does:

import dask
from dask.distributed import worker_client
from prefect import flow, task
from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-worker-client",
    },
)

@task
def calling_compute_in_a_task():
    with worker_client(separate_thread=False) as client:
        df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
        summary_df = df.describe()
        client.compute(summary_df)

@flow(task_runner=coiled_runner)
def test_flow():
    calling_compute_in_a_task.submit()

if __name__ == "__main__":
    test_flow()

Potential solution?

If I get the client the way dask.distributed.worker_client does, then the aforementioned failing snippet works. ie changing (lines 103-104 in get_dask_client) from:

with Client(**client_kwargs) as client:
    yield client

to:

client = get_client(timeout=timeout)
yield client

It seems like there are other arguments get_dask_client may need, so not saying this exactly will work, but thought it'd be a helpful start.

ghislainp commented 1 year ago

the get_dask_client function seems not to work when using an external dask cluster declared with DaskTaskRunner(address=data_cluster_address). I get an error "No global client found and no address provided". Below is one of the examples in README.md slightly adapted to run on an external dask cluster.

I investigated a bit and neither get_worker nor get_client work. It seems that the task 'process_data' is unable to know that it's running in a dask worker. I suspect this is related to other problems with external dask cluster, such as PrefectHQ/prefect-dask#47

I hacked prefect_dask.utils._generate_client_kwargs to use client_kwargs['address'] instead of calling get_client().scheduler.address, and it works now. I can make a PR. However, this is still not satisfactorily, because I need to secede and rejoin my task as explained here https://distributed.dask.org/en/stable/task-launch.html because I deadlock my cluster very easily.

I think that scheduling task with prefect_dask on a cluster, and running calculations with the same cluster is a useful pattern to deal with memory problems, and to monitor what happens on the cluster with the dask dashboard. Though, I'm open to other options, because I've been stuck on this for months...

import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

# client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address="tcp://127.0.0.1:8786"))
def dask_pipeline():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

# dask_pipeline()
fjetter commented 1 year ago

I briefly peeked into the implementation and noticed that the get_dask_client method is initializing a new client every time.

Dask offers an API that tries to use already existing clients and ensure the task cannot deadlock, see https://distributed.dask.org/en/stable/api.html#distributed.worker_client

and https://distributed.dask.org/en/stable/task-launch.html

It's strongly recommend to use the worker_client contextmanager for this application. I briefly skimmed https://github.com/PrefectHQ/prefect-dask/pull/33 but couldn't find an explanation why this was not used

zanieb commented 1 year ago

Hey @fjetter thanks for taking a look!

I think we weren't using the worker_client context manager because it did not support retrieval of an asynchronous client which we need for asynchronous tasks.

dchudz commented 1 year ago

FWIW I think there's an issue here for async worker_client: https://github.com/dask/distributed/issues/5513.

jrbourbeau commented 1 year ago

I tried running this example again using worker_client and the main branch of distributed (which contains https://github.com/dask/distributed/pull/7844) and things seemed to work as expected

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
from distributed import worker_client

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "get-dask-client",
    },
)

@task
def compute_task():
    # with get_dask_client() as client:
    with worker_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df

@flow(task_runner=coiled_runner)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

if __name__ == "__main__":
    dask_flow()

@madkinsz do you see something similar? Is there a reason to still prefer get_dask_client over worker_client?

zanieb commented 1 year ago

@jrbourbeau I think we're just providing get_dask_client for symmetry with get_async_dask_client which we need to expose for users writing asynchronous tasks e.g.

from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client
import distributed

@task
async def atest_task():

    async with get_async_dask_client() as client:
        await client.submit(lambda x: x, 1)

    with distributed.worker_client() as client:
        await client.submit(lambda x: x, 1)
        # AttributeError: 'int' object has no attribute '__await__'

@flow(task_runner=DaskTaskRunner())
def test_flow():
    atest_task.submit()

if __name__ == "__main__":
    test_flow()
giorgiobasile commented 1 year ago

Hi, I just wanted to mention that I am hitting a similar problem while trying to instantiate a DaskTaskRunner connecting to a remote cluster preemptively instantiated with the Dask Gateway instance of the Microsoft Planetary Computer. Unfortunately, my code doesn't add much to what has been shared so far, except that I hit the ssl_context-related exception as soon as the flow is started, instead of waiting for any task submission or get_dask_client() call.

Here is my code:


import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner(address="gateway://pccompute-dask.westeurope.cloudapp.azure.com:80/prod.530b7bd7e4164670bac990661b5fbdc4"))
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

if __name__ == "__main__":
    dask_flow()

Output:


16:55:35.269 | INFO    | prefect.engine - Created flow run 'papaya-mouflon' for flow 'dask-flow'
16:55:35.270 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at gateway://pccompute-dask.westeurope.cloudapp.azure.com:80/prod.530b7bd7e4164670bac990661b5fbdc4
16:55:35.271 | ERROR   | Flow run 'papaya-mouflon' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/test_dask_runner.py", line 43, in <module>
    dask_flow()
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/flows.py", line 540, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/engine.py", line 272, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/engine.py", line 364, in create_then_begin_flow_run
    state = await begin_flow_run(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/engine.py", line 509, in begin_flow_run
    flow_run_context.task_runner = await stack.enter_async_context(
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 638, in enter_async_context
    result = await _enter(cm)
             ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 204, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect/task_runners.py", line 185, in start
    await self._start(exit_stack)
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 331, in _start
    self._client = await exit_stack.enter_async_context(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.4/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 638, in enter_async_context
    result = await _enter(cm)
             ^^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/client.py", line 1457, in __aenter__
    await self
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/client.py", line 1268, in _start
    await self._ensure_connected(timeout=timeout)
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/client.py", line 1331, in _ensure_connected
    comm = await connect(
           ^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/comm/core.py", line 292, in connect
    comm = await wait_for(
           ^^^^^^^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/distributed/utils.py", line 1807, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/giorgio/Projects/personal/prefect-planetary-computer/.venv/lib/python3.11/site-packages/dask_gateway/comm.py", line 39, in connect
    raise TypeError(
TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None

EDIT: In my case the error was due to SSL info not being passed to the task runner, as explained here. Setting the security parameter solves the issue.