PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.38k stars 1.64k forks source link

LocalDaskExecutor(scheduler='threads') not running concurrently #2127

Closed Marlin-Na closed 4 years ago

Marlin-Na commented 4 years ago

Description

I suppose LocalDaskExecutor(scheduler='threads') should be running concurrently. However this does not seem true according to my example below.

Not sure why it is the case, probably the task starts when compute is called in the wait? https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/executors/dask.py#L287

Reproduction

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def task_sleep(seconds):
    import time
    print("start sleeping {}".format(seconds))
    time.sleep(seconds)
    print("end sleeping {}".format(seconds))
    return seconds

with Flow("dummy_sleep") as flow:
    task_sleep(seconds=10)
    task_sleep(seconds=12)

state = flow.run(executor=LocalDaskExecutor(scheduler='threads'))

which produces

[2020-03-07 05:34:35,250] INFO - prefect.FlowRunner | Beginning Flow run for 'dummy_sleep'
[2020-03-07 05:34:35,253] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-07 05:34:35,301] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 10
end sleeping 10
[2020-03-07 05:34:45,316] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-03-07 05:34:45,325] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 12
end sleeping 12
[2020-03-07 05:34:57,342] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-03-07 05:34:57,343] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Environment

prefect diagnostics
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.3.0-1011-gcp-x86_64-with-Ubuntu-19.10-eoan",
    "prefect_version": "0.9.7",
    "python_version": "3.7.5"
  }
}
jlowin commented 4 years ago

This is a great callout, thank you. I believe what's happening is that we implicitly wait on each terminal task in sequence, evaluating all of its upstream tasks concurrently. However, a task only gets evaluated when a terminal task downstream of it is computed. Therefore, in your flow, the two terminal tasks are evaluated in sequence.

Below, I can get concurrency as expected by adding a single (dummy) terminal task that comes after both tasks.

We should think of ways to ensure the entire graph is computed at once (possibly automatically adding dummy nodes like the one I'm adding here at runtime) - cc @cicdw


import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def task_sleep(seconds):
    import time
    print("start sleeping {}".format(seconds))
    time.sleep(seconds)
    print("end sleeping {}".format(seconds))
    return seconds

with Flow("dummy_sleep") as flow:
    t1 = task_sleep(seconds=10)
    t2 = task_sleep(seconds=12)

    t3 = prefect.Task()
    t3.set_upstream(t1)
    t3.set_upstream(t2)

state = flow.run(executor=LocalDaskExecutor(scheduler='threads'))

Logs:


[2020-03-07 19:46:15,142] INFO - prefect.FlowRunner | Beginning Flow run for 'dummy_sleep'
[2020-03-07 19:46:15,144] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-07 19:46:15,235] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 10
[2020-03-07 19:46:15,240] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 12
end sleeping 10
[2020-03-07 19:46:25,254] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
end sleeping 12
[2020-03-07 19:46:27,248] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-03-07 19:46:27,254] INFO - prefect.TaskRunner | Task 'Task': Starting task run...
[2020-03-07 19:46:27,257] INFO - prefect.TaskRunner | Task 'Task': finished task run for task with final state: 'Success'
[2020-03-07 19:46:27,258] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Marlin-Na commented 4 years ago

@jlowin Thanks for your insights and tips. I have been trying to implement a custom executor and thought it might be helpful to share some experience I got (e.g. for improving the doc or for who wants to implement one). I did the following as a simple asynchronous executor.

from prefect import task, Flow
from prefect.engine.executors.base import Executor
from prefect.engine.state import State
from contextlib import contextmanager

class ThreadExecutor(Executor):
    "An asynchronous executor for Prefect"
    @contextmanager
    def start(self):
        import concurrent
        try:
            with concurrent.futures.ThreadPoolExecutor() as pool_executor:
                self.pool_executor = pool_executor
                yield
        finally:
            self.pool_executor = None
    def submit(self, fn, *args, **kwargs):
        ## The upstream_states can be either future or state.
        ## wait and convert results here??
        kwargs['upstream_states'] = { edge:(value if isinstance(value, State) else value.result())
                                            for edge, value in kwargs['upstream_states'].items() }
        future = self.pool_executor.submit(fn, *args, **kwargs)
        return future
    def map(self, fn, *args):
        raise NotImplementedError
    def wait(self, futures):
        return {task: future.result() for task, future in futures.items()}
  1. The arguments of wait is a dictionary of task to future instead of a list of futures which I originally assumed.

  2. Note in submit, I have modify the kwargs['upstream_states'] and do something like value if isinstance(value, State) else value.result() for it to work. I am not sure I am on the right way. It sounds to me that the upstream 'future' should be waited and resolved to State prior calling submit. Is it expected for submit to modify kwargs['upstream_states']?

Thanks.

jcrist commented 4 years ago

Trying your example from above, I'm unable to reproduce on master (0.10.0+297.g1babcb7f currently). I get the expected concurrency as is:

$ python test.py
[2020-04-15 20:38:50,189] INFO - prefect.FlowRunner | Beginning Flow run for 'dummy_sleep'
[2020-04-15 20:38:50,193] INFO - prefect.FlowRunner | Starting flow run.
[2020-04-15 20:38:50,296] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 10
[2020-04-15 20:38:50,303] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 12
end sleeping 10
[2020-04-15 20:39:00,323] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
end sleeping 12
[2020-04-15 20:39:02,329] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-04-15 20:39:02,329] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

@Marlin-Na, could you try rerunning on master to see if the issue still persists for you?

Marlin-Na commented 4 years ago

Hi @jcrist , the issue still persists from my side. I tested three versions: 0.10.2+47.g13977484 (current master), 0.10.2+21.g1babcb7f (your version) and 0.10.2 (pypi release). I also attached here versions of other python packages:

certifi==2020.4.5.1
chardet==3.0.4
click==7.1.1
cloudpickle==1.3.0
croniter==0.3.31
dask==2.14.0
distributed==2.14.0
docker==4.2.0
fsspec==0.7.2
HeapDict==1.0.1
idna==2.9
locket==0.2.0
marshmallow==3.5.1
marshmallow-oneofschema==2.0.1
msgpack==1.0.0
mypy-extensions==0.4.3
partd==1.1.0
pendulum==2.1.0
pkg-resources==0.0.0
prefect==0.10.2+21.g1babcb7f
psutil==5.7.0
python-box==4.2.2
python-dateutil==2.8.1
python-slugify==4.0.0
pytz==2019.3
pytzdata==2019.3
PyYAML==5.3.1
requests==2.23.0
ruamel.yaml==0.16.10
ruamel.yaml.clib==0.2.0
six==1.14.0
sortedcontainers==2.1.0
supervisor==4.1.0
tabulate==0.8.7
tblib==1.6.0
text-unidecode==1.3
toml==0.10.0
toolz==0.10.0
tornado==6.0.4
typing==3.7.4.1
typing-extensions==3.7.4.2
urllib3==1.25.9
websocket-client==0.57.0
zict==2.0.0
jcrist commented 4 years ago

Hmmm. Can you run the following?

$ python -c "import dask.system;print(dask.system.cpu_count())"
Marlin-Na commented 4 years ago

@jcrist ah, that's 1 (I am running on a Google VM). So that should explain the difference. However I don't think concurrency should depend on number of cpus, e.g. for IO-bounded tasks or for tasks calling external tools.

jcrist commented 4 years ago

However I don't think concurrency should depend on number of cpus, e.g. for IO-bounded tasks or for tasks calling external tools.

When starting a local dask cluster, by default it determines the worker configuration based on the cpus available. Since you're using the local scheduler (threads only), you can configure this by passing num_workers=... to LocalDaskExecutor.

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def task_sleep(seconds):
    import time
    print("start sleeping {}".format(seconds))
    time.sleep(seconds)
    print("end sleeping {}".format(seconds))
    return seconds

with Flow("dummy_sleep") as flow:
    task_sleep(seconds=10)
    task_sleep(seconds=12)

state = flow.run(executor=LocalDaskExecutor(scheduler='threads', num_workers=8))

If you were using the distributed scheduler (which also runs fine locally) these keywords would be n_workers (number of worker processes) and threads_per_worker (number of threads per worker).

Marlin-Na commented 4 years ago

@jcrist Thanks! This looks great.

jcrist commented 4 years ago

Glad to help! Ok to close?