PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Dask jobs not executing on Prefect 3.0.1 #15331

Closed · ToxicRoot closed this 2 weeks ago

ToxicRoot commented 2 weeks ago

Bug summary

Unable to submit a Dask job using Prefect 3.0.1. With an async task, I get:

tf = await test.submit()
         ^^^^^^^^^^^^^^^^^^^
TypeError: object PrefectDaskFuture can't be used in 'await' expression

With a sync task, the Dask launch hangs with:

AssertionError: daemonic processes are not allowed to have children

and does not proceed to execute the task code.

How to verify:

import asyncio
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task()
async def test():
    return 'test return'

@flow(name="dask_test", task_runner=DaskTaskRunner())
async def main():
    tf = await test.submit()
    result = await tf.result()
    print(result)

if __name__ == '__main__':
    asyncio.run(main())

Version info (prefect version output)

Version:             3.0.1
API version:         0.8.4
Python version:      3.12.5
Git commit:          c6b2ffe1
Built:               Fri, Sep 6, 2024 10:05 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server
Pydantic version:    2.9.1
Integrations:
  prefect-docker:    0.6.1
  prefect-sqlalchemy: 0.5.0
  prefect-gitlab:    0.3.0
  prefect-dask:      0.3.0

Additional context

No response

zzstoatzz commented 2 weeks ago

hello @ToxicRoot - in Prefect 3, you should not await the submit or result methods on tasks (regardless of whether the task is sync or async, or which task runner you use). You need to remove the await keyword because these are now sync methods.

you can find the docs on this here.
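
For reference, the reproducer above with the await keywords dropped from submit() and result() (a sketch of what this guidance implies; everything else is unchanged from the snippet in the issue) would look like:

import asyncio
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task()
async def test():
    return 'test return'

@flow(name="dask_test", task_runner=DaskTaskRunner())
async def main():
    tf = test.submit()      # submit() is sync in Prefect 3 and returns a PrefectDaskFuture
    result = tf.result()    # result() is sync as well
    print(result)

if __name__ == '__main__':
    asyncio.run(main())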

ToxicRoot commented 2 weeks ago

Yeah, I've tried it without await as well and I get the same result as with fully sync tasks: the flow process hangs indefinitely on
AssertionError: daemonic processes are not allowed to have children

full trace:

17:02:16.593 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:02:16.608 | INFO    | distributed.http.proxy - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
17:02:16.614 | INFO    | distributed.scheduler - State start
17:02:16.617 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/worker-krwendg3', purging
17:02:16.618 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/worker-_d24h3uf', purging
17:02:16.619 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/worker-wac_u29g', purging
17:02:16.620 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-o9x0jbnt', purging
17:02:16.621 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-dvbmire5', purging
17:02:16.622 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/worker-jz4q5sld', purging
17:02:16.623 | INFO    | distributed.diskutils - Found stale lock file and directory '/tmp/dask-scratch-space/scheduler-j4m003xb', purging
17:02:16.629 | INFO    | distributed.scheduler -   Scheduler at:     tcp://127.0.0.1:43957
17:02:16.630 | INFO    | distributed.scheduler -   dashboard at:  http://127.0.0.1:8787/status
17:02:16.630 | INFO    | distributed.scheduler - Registering Worker plugin shuffle
17:02:16.659 | INFO    | distributed.nanny -         Start Nanny at: 'tcp://127.0.0.1:43107'
17:02:16.665 | INFO    | distributed.nanny -         Start Nanny at: 'tcp://127.0.0.1:44149'
17:02:16.669 | INFO    | distributed.nanny -         Start Nanny at: 'tcp://127.0.0.1:36541'
17:02:16.674 | INFO    | distributed.nanny -         Start Nanny at: 'tcp://127.0.0.1:38175'
17:02:20.217 | INFO    | distributed.worker -       Start worker at:      tcp://127.0.0.1:46095
17:02:20.220 | INFO    | distributed.worker -          Listening to:      tcp://127.0.0.1:46095
17:02:20.220 | INFO    | distributed.worker -           Worker name:                          2
17:02:20.221 | INFO    | distributed.worker -          dashboard at:            127.0.0.1:43055
17:02:20.221 | INFO    | distributed.worker - Waiting to connect to:      tcp://127.0.0.1:43957
17:02:20.222 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.223 | INFO    | distributed.worker -               Threads:                          4
17:02:20.223 | INFO    | distributed.worker -                Memory:                   7.85 GiB
17:02:20.224 | INFO    | distributed.worker -       Local Directory: /tmp/dask-scratch-space/worker-ojwt6lpj
17:02:20.224 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.231 | INFO    | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:46095', name: 2, status: init, memory: 0, processing: 0>
17:02:20.235 | INFO    | distributed.worker -       Start worker at:      tcp://127.0.0.1:45999
17:02:20.238 | INFO    | distributed.worker -          Listening to:      tcp://127.0.0.1:45999
17:02:20.239 | INFO    | distributed.worker -           Worker name:                          1
17:02:20.239 | INFO    | distributed.worker -          dashboard at:            127.0.0.1:39977
17:02:20.240 | INFO    | distributed.worker - Waiting to connect to:      tcp://127.0.0.1:43957
17:02:20.240 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.241 | INFO    | distributed.worker -               Threads:                          4
17:02:20.241 | INFO    | distributed.worker -                Memory:                   7.85 GiB
17:02:20.242 | INFO    | distributed.worker -       Local Directory: /tmp/dask-scratch-space/worker-33re7q3h
17:02:20.243 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.266 | INFO    | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:46095
17:02:20.267 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:46434
17:02:20.267 | INFO    | distributed.worker - Starting Worker plugin shuffle
17:02:20.268 | INFO    | distributed.worker -         Registered to:      tcp://127.0.0.1:43957
17:02:20.269 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.271 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:43957
17:02:20.275 | INFO    | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:45999', name: 1, status: init, memory: 0, processing: 0>
17:02:20.276 | INFO    | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:45999
17:02:20.276 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:46442
17:02:20.276 | INFO    | distributed.worker - Starting Worker plugin shuffle
17:02:20.278 | INFO    | distributed.worker -         Registered to:      tcp://127.0.0.1:43957
17:02:20.279 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.280 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:43957
17:02:20.324 | INFO    | distributed.worker -       Start worker at:      tcp://127.0.0.1:39829
17:02:20.327 | INFO    | distributed.worker -          Listening to:      tcp://127.0.0.1:39829
17:02:20.328 | INFO    | distributed.worker -           Worker name:                          3
17:02:20.328 | INFO    | distributed.worker -          dashboard at:            127.0.0.1:34879
17:02:20.329 | INFO    | distributed.worker - Waiting to connect to:      tcp://127.0.0.1:43957
17:02:20.329 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.330 | INFO    | distributed.worker -               Threads:                          4
17:02:20.330 | INFO    | distributed.worker -                Memory:                   7.85 GiB
17:02:20.331 | INFO    | distributed.worker -       Local Directory: /tmp/dask-scratch-space/worker-8wc_eq2l
17:02:20.332 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.339 | INFO    | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:39829', name: 3, status: init, memory: 0, processing: 0>
17:02:20.340 | INFO    | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:39829
17:02:20.341 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:46444
17:02:20.341 | INFO    | distributed.worker - Starting Worker plugin shuffle
17:02:20.342 | INFO    | distributed.worker -         Registered to:      tcp://127.0.0.1:43957
17:02:20.343 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.344 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:43957
17:02:20.543 | INFO    | distributed.worker -       Start worker at:      tcp://127.0.0.1:40327
17:02:20.546 | INFO    | distributed.worker -          Listening to:      tcp://127.0.0.1:40327
17:02:20.547 | INFO    | distributed.worker -           Worker name:                          0
17:02:20.547 | INFO    | distributed.worker -          dashboard at:            127.0.0.1:36177
17:02:20.548 | INFO    | distributed.worker - Waiting to connect to:      tcp://127.0.0.1:43957
17:02:20.548 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.549 | INFO    | distributed.worker -               Threads:                          4
17:02:20.549 | INFO    | distributed.worker -                Memory:                   7.85 GiB
17:02:20.550 | INFO    | distributed.worker -       Local Directory: /tmp/dask-scratch-space/worker-r659be2g
17:02:20.550 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.558 | INFO    | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:40327', name: 0, status: init, memory: 0, processing: 0>
17:02:20.559 | INFO    | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:40327
17:02:20.560 | INFO    | distributed.worker - Starting Worker plugin shuffle
17:02:20.560 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:46456
17:02:20.561 | INFO    | distributed.worker -         Registered to:      tcp://127.0.0.1:43957
17:02:20.561 | INFO    | distributed.worker - -------------------------------------------------
17:02:20.563 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:43957
17:02:20.588 | INFO    | distributed.scheduler - Receive client connection: PrefectDaskClient-7640f999-7046-11ef-82bf-20cf306b6fa2
17:02:20.589 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:46466
17:02:20.592 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
17:02:20.680 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:02:20.696 | INFO    | distributed.http.proxy - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
/data/orion/venv/lib/python3.12/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 39475 instead
  warnings.warn(
17:02:20.703 | INFO    | distributed.scheduler - State start
17:02:20.710 | INFO    | distributed.scheduler -   Scheduler at:     tcp://127.0.0.1:42505
17:02:20.711 | INFO    | distributed.scheduler -   dashboard at:  http://127.0.0.1:39475/status
17:02:20.712 | INFO    | distributed.scheduler - Registering Worker plugin shuffle
17:02:20.781 | ERROR   | distributed.nanny - Failed to start process
Traceback (most recent call last):
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 452, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 748, in start
    await self.process.start()
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: daemonic processes are not allowed to have children
17:02:20.785 | ERROR   | distributed.nanny - Failed to start process
Traceback (most recent call last):
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 452, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 748, in start
    await self.process.start()
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: daemonic processes are not allowed to have children
17:02:20.787 | ERROR   | distributed.nanny - Failed to start process
Traceback (most recent call last):
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 452, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 748, in start
    await self.process.start()
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: daemonic processes are not allowed to have children
17:02:20.789 | ERROR   | distributed.nanny - Failed to start process
Traceback (most recent call last):
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 452, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/nanny.py", line 748, in start
    await self.process.start()
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/data/orion/venv/lib/python3.12/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: daemonic processes are not allowed to have children
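
For context on that assertion: it is raised by Python's multiprocessing module, which does not allow a daemonic process to spawn children of its own. A minimal standalone sketch (no Prefect or Dask involved; names are illustrative) that triggers the same error:

import multiprocessing as mp

def child():
    print("child ran")

def daemonic_parent():
    # Starting a child process from inside a daemon process triggers
    # "AssertionError: daemonic processes are not allowed to have children"
    mp.Process(target=child).start()

if __name__ == '__main__':
    p = mp.Process(target=daemonic_parent, daemon=True)
    p.start()
    p.join()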

zzstoatzz commented 2 weeks ago

thanks! I can reproduce this - we'll take a look.

possibly related to #15260

desertaxle commented 2 weeks ago

Hey! This was fixed in https://github.com/PrefectHQ/prefect/pull/15341 and released in version 0.3.1 of prefect-dask. If you see any more issues like this, please open a new issue!
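
If you want to confirm that the environment actually picked up the fixed release, a quick check (a sketch; assumes the prefect-dask distribution is installed) is:

import importlib.metadata

# The fix above shipped in prefect-dask 0.3.1, so this should print 0.3.1 or later
# once the integration has been upgraded.
print(importlib.metadata.version("prefect-dask"))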