PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
16.81k stars 1.61k forks source link

Running the same subflow concurrently multiple times raises `RuntimeError( #5853

Closed marvin-robot closed 2 years ago

marvin-robot commented 2 years ago

Opened from the Prefect Public Slack Community

marcin.grzybowski: Hello again. How can I run the same Flow parallelly? I have used code from https://discourse.prefect.io/t/how-can-i-run-multiple-subflows-or-child-flows-in-parallel/96 and it works.

But when I simplify it and modify to run same Flow (not different 4) then I get

RuntimeError("The task runner is already started!")

Modified code below:

import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_1()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

anna: I can reproduce the issue and I can confirm that it's either a bug or that the tutorial must be updated if there is another way of doing that.

But I'd like to know why would you want to do it this way? Do you really want to run the same subflow twice in parallel with the same parameters? I'm curious what is your use case for that?

As a temporary solution, turning that into a task instead of a subflow will work

marcin.grzybowski: Nah, i would like to run the same flow with different parameters

anna: can you explain your use case a bit more? are those subflows containing a lot of tasks?

if you would build those as tasks, then the issue becomes much simpler, but I can understand why subflows may sometimes be better

marcin.grzybowski: Yep, I'm just checking if we can achieve our goals - and as mentioned in other thread: https://prefect-community.slack.com/archives/C03D12VV4NN/p1654007524220189?thread_ts=1654004474.016759&cid=C03D12VV4NN we will probably need to use combination of nested Flows/Tasks to make it possible that we have reusable code that we can track on graph with desired granularity. If it won't be possible maybe logs will be enough for us, but then there is this problem with logger for DaskTaskRunner :wink:

So I'm just checking what's possible and what is not

marcin.grzybowski: Possibility of seeing details of flow on low level is really nice, but for that, as I understand, I need to use Flow->Taks->Flow->Task... combination

anna: not necessarily - the problem you see here is only when running a single subflow multiple times in parallel - if you don't run those in parallel, it works fine - this works and satisfies your modularity use case:

import asyncio
from prefect import flow

@flow 
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow 
async def main_flow():
    for _ in range(5):
        await subflow_1()

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

anna: <@ULVA73B9P> open "Running the same subflow concurrently multiple times raises RuntimeError("The task runner is already started!")"

Original thread can be found here.

anna-geller commented 2 years ago

The problem raised by the user:

Doing this sequentially works, only doing this concurrently doesn't

anna-geller commented 2 years ago

from the user - deepcopy works:

import asyncio
import copy
from prefect import flow

@flow
async def subflow_1(a):
    print("Subflow 1 started!" + a)
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow_1('a'), copy.deepcopy(subflow_1)('b')]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
thesubneo commented 2 years ago

unfortunately it works only for flows without tasks... Adding task causes error

import asyncio
import copy
from prefect import flow, get_run_logger, task

@task
def task1(a):
    get_run_logger().info("task 1 started!" + a)

@flow
async def subflow_1(a):
    get_run_logger().info("Subflow 1 started!" + a)
    task1(a)
    await asyncio.sleep(1)

@flow
async def main_flow():
    params = ['a', 'b', 'c']
    parallel_subflows = [copy.deepcopy(subflow_1)(param) for param in params]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Traceback (most recent call last):
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 229, in _handle_exception
    raise error
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlite3.OperationalError: database is locked
jacobdanovitch commented 2 years ago

I'm having a similar problem when trying to run a flow that calls the same subflow multiple times, which itself calls tasks. The outer flow generates and loops over a list of URLs, calling a subflow on each one that checks a cache, and if it's not found, fetches the URL and saves the result (each of those steps being a task).

I have Orion connected to Postgres so I don't get the SQLite issue shown above, but I do get errors like:

RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /home/ubuntu/.local/lib/python3.8/site-packages/anyio/from_thread.py:187> cb=[TaskGroup._spawn.<locals>.task_done() at /home/ubuntu/.local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:629]> got Future <Future pending> attached to a different loop

This can be evaded by making all of the tasks within a (sub-)flow async as well, but that's not ideal as the subflow's tasks (check cache => fetch => save) are all dependent so I'm just awaiting repeatedly. Ideally I could just asynchronously run the subflow over all the URLs, while running the tasks in each subflow run synchronously.

zanieb commented 2 years ago

This one is a bit complicated but is on our radar to resolve.

Benoss commented 2 years ago

Had a similar issue with "The task runner is already started!" but in my case that was on retry of a task that failed I think. Not exactly sure what was happening and very hard to pinpoint the exact issue as I had some parallel things running. Was not able to reproduce easily.

NoamGit commented 2 years ago

Getting the same issue. RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at ...> cb=[TaskGroup._spawn.<locals>.task_done() at /Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:726]> got Future <Future pending> attached to a different loop

preceding with File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/ssl.py", line 888, in read v = self._sslobj.read(len) ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2633)

It works locally, but when using prefect cloud it fails

My code looks like

@flow(name="scraping flow")
async def scrape(start_date, future_months_to_scrape, ):
    credentials = get_credentials()
    secrets = get_secrets()
    scraper = Scraper(start_date=start_date
                              , future_months_to_scrape=future_months_to_scrape)
    login_status = await login(scraper, credentials)
    data = await fetch_data(scraper, login_status, wait_for=[login])
    close_state = await close_and_clean_session(scraper, wait_for=[data])
    await load_to_mongo(data, secrets, wait_for=[close_state])
    processed_data = translate_to_mysql_data_model(data)
    await load_to_mysql(processed_data, secrets)

if __name__ == '__main__':
    load_dotenv()
    flow_param = dict(start_date="01/09/2021", future_months_to_scrape=10)
    asyncio.run(scrape(**flow_param))
anna-geller commented 2 years ago

@NoamGit is it the same issue though? Which of those are subflows in your case? when it comes to Cloud, last Friday we deployed a new version, you may try to upgrade your Prefect client version and try running again with the Cloud backend.

NoamGit commented 2 years ago

Tried it in 2.0b8. No subflows, just async code @anna-geller .

I've created a minimum example (again running ok without prefect start or cloud)

import asyncio

from dotenv import load_dotenv
from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1) # yield
        print(value, end=" ")

@task
def get_numbers():
    return [1, 2]

@flow(name='test-flow')
async def dummy_flow():
    numbers = get_numbers()
    await print_values(numbers)  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)

if __name__ == '__main__':
    load_dotenv()
    asyncio.run(dummy_flow())

This fails with

RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /Users/noam.cohen/Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/from_thread.py:219> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/noam.cohen/Library/Caches/pypoetry/virtualenvs/finance-lGLYToYy-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:726]> got Future <Future pending> attached to a different loop
zanieb commented 2 years ago

@NoamGit this is unrelated to the error reported in this thread, can you open a new issue?

jeffcarrico commented 2 years ago

I'm at the same point as @jacobdanovitch in https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-1147655272 only I'm struggling to get my code to work by making everything async and awaiting. That is probably due to my own lack of understanding of asyncio so I'll keep looking at it.

I think what we're probably all looking for is a flow.map operator that "just works" instead of having to do the async plumbing, with the deepcopy(subflow), etc.

anna-geller commented 2 years ago

@jeffcarrico I wonder whether mapping over flows would solve it though since you would still need to concurrently start multiple runs of the same subflow

@madkinsz would mapping over subflows be even feasible? curious

zanieb commented 2 years ago

Yeah this is feasible and something we'll be exploring.

anna-geller commented 2 years ago

View in #show-us-what-you-got on Slack

Chris_L. @Chris_L.: Hello Prefect community. :wave: Wanted to share a small code snippet I've been using to resolve this open issue: https://github.com/PrefectHQ/prefect/issues/5853 It's not technically a "fix" as I am creating new functions in the namespace via currying :curry:, but I think it's a cleaner solution compared to the current hack of using "deepcopy". @Jacob_Danovitch @Jeff_Carrico @Anna_Geller Hope this helps others encountering this issue!

import asyncio
from prefect import task, flow

@task
async def print_x(x):
  print(x)
  await asyncio.sleep(1)

def build_subflow(name):
  @flow(name=f"subflow:{name}")
  async def subflow(x):
    await print_x(x)
  return subflow

@flow
async def parent_flow():
  futures = [build_subflow(name=x)(x) for x in ["a", "b", "c"]]
  await asyncio.gather(*futures)

GitHub: Running the same subflow concurrently multiple times raises `RuntimeError( · Issue #5853 · PrefectHQ/prefect

A interesting finding: this pattern works even if @flow(name=f"subflow:{name}") is replaced with just @flow. Prefect gives a warning "A flow named 'subflow'...conflicts with another flow. Consider specifying a unique name parameters in the flow definition", but this does not seem to prevent the curried subflows from running concurrently.

Anna_Geller @Anna_Geller: Thanks for sharing, nice workaround! I'll add it to the GitHub issue too.

When we rename flows, Prefect treats those as separate flows and will assign a different flow ID - that's why it works.

When doing:

import asyncio
from prefect import task, flow, get_run_logger

@task
async def print_x(x):
    logger = get_run_logger()
    <http://logger.info|logger.info>(x)
    await asyncio.sleep(1)

def build_subflow():
    @flow
    async def subflow(x):
        await print_x(x)

    return subflow

@flow
async def parent_flow():
    futures = [build_subflow()(x) for x in ["x1", "x2", "x3"]]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(parent_flow())

it also works, as you mentioned but with a warning, but this is still nicer as it keeps the same flow ID.

Thanks again, I appreciate that you dive deeper into this topic

milenkobeslic commented 2 years ago

I'm getting the same RuntimeError("The task runner is already started!") when trying to run two different subflows in parallel, but each calling the same databricks function jobs_runs_submit_and_wait_for_completion.

prefect 2.4.5 prefect-databricks 0.1.3

import asyncio
import json
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion

databricks_credentials = DatabricksCredentials.load("databricks")

@flow(name="databricks_job_1")
async def job_1():
    file = open("flows/databricks/job_1.json")
    tasks = json.load(file)["settings"]["tasks"]
    await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job-1",
        tasks=tasks,
    )

@flow(name="databricks_job_2")
async def job_2():
    file = open("flows/databricks/job_2.json")
    tasks = json.load(file)["settings"]["tasks"]
    await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job-2",
        tasks=tasks,
    )

@flow
async def async_flow():
    parallel_subflows = [job_1(), job_2()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(async_flow())

One subflow starts and keeps running but but the second fails immediately.

17:30:22.401 | INFO    | prefect.engine - Created flow run 'handsome-hog' for flow 'async-flow'
17:30:26.023 | INFO    | Flow run 'handsome-hog' - Created subflow run 'olivine-trout' for flow 'databricks_job_2'
17:30:26.406 | INFO    | Flow run 'handsome-hog' - Created subflow run 'hypnotic-bat' for flow 'databricks_job_1'
17:30:27.267 | INFO    | Flow run 'olivine-trout' - Created subflow run 'mysterious-ara' for flow 'Submit jobs runs and wait for completion'
17:30:28.229 | INFO    | Flow run 'mysterious-ara' - Created task run 'jobs_runs_submit-c00eee75-0' for task 'jobs_runs_submit'
17:30:28.230 | INFO    | Flow run 'mysterious-ara' - Submitted task run 'jobs_runs_submit-c00eee75-0' for execution.
17:30:28.910 | INFO    | Flow run 'hypnotic-bat' - Created subflow run 'silver-numbat' for flow 'Submit jobs runs and wait for completion'
17:30:28.913 | ERROR   | Flow run 'silver-numbat' - Crash detected! Execution was interrupted by an unexpected exception.
17:30:29.666 | ERROR   | Flow run 'hypnotic-bat' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/milenko/workspace/rb-prefect-master/venv/lib/python3.10/site-packages/prefect/engine.py", line 589, in orchestrate_flow_run
    result = await flow_call()
  File "/Users/milenko/workspace/rb-prefect-master/flows/async_demo.py", line 14, in job_1
    await jobs_runs_submit_and_wait_for_completion(
  File "/Users/milenko/workspace/rb-prefect-master/venv/lib/python3.10/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/milenko/workspace/rb-prefect-master/venv/lib/python3.10/site-packages/prefect/engine.py", line 489, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/Users/milenko/.pyenv/versions/3.10.5/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/Users/milenko/.pyenv/versions/3.10.5/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/Users/milenko/workspace/rb-prefect-master/venv/lib/python3.10/site-packages/prefect/task_runners.py", line 156, in start
    raise RuntimeError("The task runner is already started!")
RuntimeError: The task runner is already started!
zanieb commented 2 years ago

Replaced by https://github.com/PrefectHQ/prefect/issues/7319

jeffcarrico commented 2 years ago

@madkinsz it looks like #7319 is on track to resolve RuntimeError: The task runner is already started! in a clean way. I've opened another issue (#7322) to capture the discussion around subflow.map that started with the suggestion by @jacobdanovitch . Feel free to close it if it is redundant.

https://github.com/PrefectHQ/prefect/issues/5853#issuecomment-1147655272

Ideally I could just asynchronously run the subflow over all the URLs, while running the tasks in each subflow run synchronously.

captnced commented 1 month ago

hello community ! I'm using prefect==3.0.2 and there seem to be an issue again with running subflows concurrently

this code triggers 5 subflows that are being run sequentially:

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3)
)
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3)
)
async def main_flow():
    for _ in range(5):
        await subflow_1()

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

even when using workpool, workers and ad-hoc deployment, I still cannot have my subflows running in parallel:

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    log_prints=True,
)
async def master(names: list[str]):
    _r = map(main,names)
    r = await asyncio.gather(*_r)   
    print('master > ',r)
    return r

@task(
    log_prints=True
)
async def doit(name):
    print('task <',name)
    await asyncio.sleep(5)
    return f'hello {name} !!'

@flow(
    task_runner=ThreadPoolTaskRunner(max_workers=3),
    log_prints=True,
)
async def main(name):
    print('main <',name)
    _r = doit.submit(name)
    res = _r.result()
    print('main >',res)
    return res

if __name__ == "__main__":

    '''
        CREATE WORKPOOL + WORKERS + DEPLOY
    '''
    # prefect work-pool create --overwrite --type process wp1 --set-as-default
    # prefect worker start --pool wp1 --limit 5 --type process ( << run several of these)
    # prefect deploy /work/path/test_1.py:master --pool wp1 --work-queue default --concurrency-limit 10 --name dep1
    async def run_deployment(dep_id,names):
        async with get_client() as client:            
            flow_run = await client.create_flow_run_from_deployment(
                deployment_id=dp_id,
                parameters=dict(names=names),
                job_variables={'names':names}
            )
            print('flow run',flow_run)

    dp_id='c8cb8a2c-0427-4474-a9fc-583bcc78bf98'
    asyncio.run(run_deployment(dp_id,['charlie', 'bob', 'alice']))

am I doing anything wrong ?