PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Global concurrency not being applied to tasks when using DaskTaskRunner #11014

Open jfrancis5 opened 1 year ago

jfrancis5 commented 1 year ago

Bug summary

The Global Concurrency setting is not being applied to tasks in my flow when using the DaskTaskRunner. Below is my configuration for the global concurrency limit. I set a limit of 2, but it is ignored when my tasks run, and more than 2 tasks run at the same time.

Screenshot 2023-10-23 at 1 26 59 PM

Error screenshot: Screenshot 2023-10-23 at 1 22 41 PM

Reproduction

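from prefect import flow, task
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package

# Tagged "ocr"; the limit of 2 is expected to apply to these task runs.
@task(tags=["ocr"])
def run_table(table):
    print(f"before conn {table}")

@flow(task_runner=DaskTaskRunner())
def upload_to_db():
    tables_to_run = ["table1", "table2", "table3"]

    # Submit one task run per table, named after the table.
    for table in tables_to_run:
        run_table.with_options(name=table).submit(table)

if __name__ == "__main__":
    upload_to_db()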

Error

All of the tasks start in parallel and ignore the global concurrency limit. In this case I set a concurrency limit of 2, but all three tasks start together, as shown in the bug summary above.

Versions

Version:             2.13.6
API version:         0.8.4
Python version:      3.10.0
Git commit:          8996e4a4
Built:               Thu, Oct 12, 2023 3:54 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud

Additional context

No response

serinamarie commented 1 year ago

Hi @jfrancis5, thanks for creating your first issue in Prefect. I'm curious if you are able to replicate the issue using the concurrency context manager?

jfrancis5 commented 1 year ago

Hi @serinamarie, I tried using the concurrency context manager as you suggested, but I got the following error saying that the module was not found. Do you know which module I would need to install?

image

serinamarie commented 1 year ago

Hm, I wasn't able to replicate your issue just using the example from the docs:

from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y

@flow
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)

if __name__ == "__main__":
    my_flow()

Can you share the full traceback and an MRE?
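An MRE is easier to judge when it reports how many task bodies actually ran at once, rather than relying on a screenshot of the timeline. The following is a minimal sketch of that idea using only the standard library; it is not Prefect code. `PeakTracker` and `run_demo` are hypothetical names, and a `threading.Semaphore` stands in for the concurrency limit. The counter lives in one process, so it would only be meaningful for thread-based execution; with Dask workers in separate processes, the measurement would need shared state or timestamped logs instead.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class PeakTracker:
    """Records the highest number of simultaneously active sections."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    @contextmanager
    def track(self):
        # Count this section as active and update the high-water mark.
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        try:
            yield
        finally:
            with self._lock:
                self.active -= 1


def run_demo(limit=2, jobs=6):
    """Stand-in for a flow: a semaphore plays the role of the limit."""
    tracker = PeakTracker()
    slots = threading.Semaphore(limit)

    def job(_):
        # Acquire a slot first, then count the job as active.
        with slots, tracker.track():
            time.sleep(0.05)  # simulated work

    with ThreadPoolExecutor(max_workers=jobs) as pool:
        list(pool.map(job, range(jobs)))
    return tracker.peak


if __name__ == "__main__":
    print("peak concurrency:", run_demo(limit=2, jobs=6))
```

In a real reproduction, the body of each task would be wrapped in `tracker.track()` in place of the simulated work, and a peak above the configured limit would show that the limit was not enforced.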

zzstoatzz commented 8 months ago

hi @jfrancis5 - just bumping this. Is this still an issue for you? if so, could you provide an MRE where you're using the concurrency context manager and a version of prefect new enough to have that module?
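A quick way to tell whether the installed Prefect is new enough to have that module is to check for it in the same environment that runs the flow. This is a small sketch using the standard library's `importlib.util.find_spec`; `has_module` is a hypothetical helper name, and the module path is the one imported in the docs example above.

```python
import importlib.util


def has_module(dotted_name):
    """Return True if `dotted_name` can be found in this environment."""
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. `prefect`) is itself missing.
        return False


if __name__ == "__main__":
    print(has_module("prefect.concurrency.sync"))
```

If this prints `False` even though `prefect version` reports a recent release, the flow may be running under a different Python environment than the CLI.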