PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Add `Flow.submit` interface for subflows #6689

Open zanieb opened 2 years ago

zanieb commented 2 years ago

First check

Prefect Version

2.x

Describe the current behavior

Currently, subflows can only be run in the main process and only tasks can be submitted to other infrastructure and run in the background.

Describe the proposed behavior

Flows should be submittable to run in the background. Calling Flow.submit should return a future that can be used to wait for the flow run's completion.

As a stretch and perhaps future goal: Flows should be submittable to external infrastructure without a deployment.

All of the features available for flow calls should be available for submission. We may limit this in the first iteration to get the feature into users' hands faster.

Example Use

from prefect import flow

@flow
def bar():
    return 1

@flow
def foo():
    future = bar.submit()  # run in a worker thread
    x = future.result()
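The proposed semantics (submit a flow, get a future back, block on `.result()`) can be modeled outside Prefect with the standard library. In this minimal sketch, the `submittable` decorator and its `.submit` attribute are hypothetical stand-ins, not Prefect's API. A shared `ThreadPoolExecutor` plays the role of the worker thread.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from functools import wraps
from typing import Any, Callable

# Shared worker pool standing in for "run in a worker thread" (an assumption of this sketch)
_POOL = ThreadPoolExecutor(max_workers=4)


def submittable(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: fn(...) runs inline, fn.submit(...) returns a Future."""

    @wraps(fn)
    def call(*args: Any, **kwargs: Any) -> Any:
        return fn(*args, **kwargs)

    def submit(*args: Any, **kwargs: Any) -> Future:
        return _POOL.submit(fn, *args, **kwargs)

    call.submit = submit  # background entry point next to the direct call
    return call


@submittable
def bar() -> int:
    return 1


def foo() -> int:
    future = bar.submit()  # runs bar in a worker thread
    return future.result()  # blocks until bar finishes


print(foo())  # → 1
```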

As a stretch goal:

from prefect import flow
from prefect.infrastructure import KubernetesJob

@flow(infrastructure=KubernetesJob(...))  # proposed keyword (stretch goal)
def bar():
    return 1

@flow
def foo():
    future = bar.submit()
    x = future.result()

Additional context

An extension of https://github.com/PrefectHQ/prefect/issues/6688

This feature will require expert changes to Prefect internals.

For running in a background thread:

For external infrastructure:

polivbr commented 2 years ago

100% yes.

We have use cases where part of a flow needs to be run on Windows. Ideally we could set up a separate work queue and submit the subflow to this queue and have it be picked up by agents running on the Windows boxes, but there currently isn't a way to do this (at least not that I've found).

zanieb commented 2 years ago

@polivbr This interface would likely avoid the use of agents; I believe your use case is addressed by run_deployment (#7047).

anna-geller commented 1 year ago

cc @anticorrelator - tagging you because I know you wanted to write a raft doc about it

use cases we should consider:

from datetime import date
from prefect import flow
from flows.transformation.jaffle_shop.dbt_run_from_manifest import dbt_jaffle_shop
from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop
from flows.analytics.dashboards import dashboards
from flows.ml.sales_forecast import sales_forecast, run_critical_work

@flow
def jaffle_shop_ingest_transform(
    start_date: date = date(2022, 11, 1),  # parametrized for backfills
    end_date: date = date.today(),
    dataset_size: int = 10_000,
):
    raw_data_jaffle_shop(start_date, end_date, dataset_size)
    dbt_jaffle_shop()
    dashboards()
    sales_forecast()
    run_critical_work()

I want to run the dashboards and sales_forecast subflows concurrently, and I want to run run_critical_work only when those two subflows are complete, even if they fail. Currently, if any subflow run fails, run_critical_work cannot be executed. This would be possible with the allow_failure annotation added in PR 7120, but it's only applicable to tasks.
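The dependency shape described here (two upstream runs in parallel, plus a downstream step that runs once both finish regardless of outcome) can be sketched with `concurrent.futures`. This is a stdlib model of the desired behavior, not Prefect code. The function bodies are hypothetical placeholders, and `sales_forecast` deliberately raises to show that the downstream step still runs.

```python
from concurrent.futures import ThreadPoolExecutor, wait


# Hypothetical stand-ins for the subflows in the example above
def dashboards() -> str:
    return "dashboards ok"


def sales_forecast() -> str:
    raise RuntimeError("forecast failed")


def run_critical_work(outcomes: dict) -> str:
    failed = sorted(name for name, exc in outcomes.items() if exc is not None)
    return f"critical work ran; failed upstream: {failed}"


def pipeline() -> str:
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Start both "subflows" concurrently
        futures = {
            "dashboards": pool.submit(dashboards),
            "sales_forecast": pool.submit(sales_forecast),
        }
        # Wait for both to finish, whether they succeed or fail
        wait(futures.values())
    # future.exception() returns None on success and never re-raises
    outcomes = {name: f.exception() for name, f in futures.items()}
    return run_critical_work(outcomes)


print(pipeline())
```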

anna-geller commented 1 year ago

another user reporting a similar request: https://discourse.prefect.io/t/running-subflows-in-parallel-with-downstream-dependency/1903/2

for this user, this is something that makes migration difficult

zanieb commented 1 year ago

@anna-geller

Currently, if any subflow run fails, run_critical_work cannot be executed.

If you call your subflows with return_state=True, failed states will not be raised and downstream work can still run.
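A minimal model of the `return_state=True` idea: the caller receives an object describing the outcome instead of an exception, so the parent keeps going and downstream code still runs. `RunState` and `call_with_state` are hypothetical names for illustration; Prefect's own `State` objects carry more information than this. Note that the calls here are still sequential.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RunState:
    """Hypothetical, simplified stand-in for a run's final state."""
    result: Any = None
    error: Optional[BaseException] = None

    def is_failed(self) -> bool:
        return self.error is not None


def call_with_state(fn: Callable[[], Any]) -> RunState:
    # Capture the outcome instead of letting the exception propagate
    try:
        return RunState(result=fn())
    except Exception as exc:
        return RunState(error=exc)


def failing_subflow() -> None:
    raise ValueError("boom")


def ok_subflow() -> int:
    return 42


states = [call_with_state(failing_subflow), call_with_state(ok_subflow)]
# Downstream work still runs because nothing was raised
summary = [s.is_failed() for s in states]
print(summary)  # → [True, False]
```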

I agree we should definitely mirror all the task futures here though.

kpweiler commented 1 year ago

If/when this is implemented, will it require the use of async def on the subflow functions (and then, transitively, the task functions)?

zanieb commented 1 year ago

@kpweiler no, this would be available as a sync and async interface.
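One way a single submission mechanism can serve both sync and async callers, sketched with the standard library: the work is a plain `def`, and only the caller decides whether to block on `.result()` or `await` the future. The `submit` and `subflow` functions are hypothetical; this is not a description of how Prefect implements the interface.

```python
import asyncio
from concurrent.futures import Future, ThreadPoolExecutor

_POOL = ThreadPoolExecutor(max_workers=2)


def subflow(x: int) -> int:
    # Ordinary synchronous function: no `async def` required
    return x * 2


def submit(x: int) -> Future:
    # Hypothetical submit helper: starts the work in a worker thread
    return _POOL.submit(subflow, x)


def sync_parent() -> int:
    return submit(10).result()  # blocks the calling thread


async def async_parent() -> int:
    # Bridge the same concurrent.futures.Future into asyncio without blocking the loop
    return await asyncio.wrap_future(submit(10))


print(sync_parent())  # → 20
print(asyncio.run(async_parent()))  # → 20
```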

jmesterh commented 1 year ago

FWIW, I am trying to build a POC of a 10,000-table ETL using Prefect and just ran into this issue. The first attempt looked like this (using task tags to set concurrency limits):

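from prefect import flow, task

@task
def extract(table): ...
@task
def transform(data): ...
@task
def load(data): ...

@flow
def main():
    # `tables` holds the 10,000 table names (defined elsewhere)
    load_futures = []
    # ETL 10,000 tables in parallel
    for table in tables:
        extracted = extract.submit(table)
        # distinct variable names so the `transform` and `load` tasks are not shadowed
        transformed = transform.submit(extracted)
        load_futures.append(load.submit(transformed))
    for load_future in load_futures:
        result = load_future.result()
        print("load complete for " + result)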

This works, but it creates 30,000 tasks waiting for their dependencies to complete. What I would like to do is call each task sequentially (per table) while processing the tables in parallel:

@task
def extract(table): ...
@task
def transform(data): ...
@task
def load(data): ...

@flow
def etl(table):
    return load(transform(extract(table)))

@flow
def main():
    load_futures = []
    # ETL 10,000 tables in parallel
    for table in tables:
        load_futures.append(etl.submit(table))  # proposed Flow.submit
    for load_future in load_futures:
        result = load_future.result()
        print("load complete for " + result)

I see there is an async workaround, but I can't use async because there is no async driver for Oracle or MSSQL in SQLAlchemy. There is also the issue of not being able to run multiple instances of the same subflow. If these could be fixed so that the example above works, that would be awesome.
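Because the database drivers here are synchronous, threads rather than `async` are the natural way to model the requested shape: each table's extract, transform, and load run sequentially, while a bounded pool processes tables in parallel. A stdlib sketch under those assumptions; the step bodies are hypothetical placeholders, not Prefect tasks.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical placeholder steps; real ones would use a synchronous DB driver
def extract(table: str) -> str:
    return f"rows({table})"


def transform(data: str) -> str:
    return data.upper()


def load(data: str) -> str:
    return f"loaded {data}"


def etl(table: str) -> str:
    # The three steps run sequentially for one table
    return load(transform(extract(table)))


def main(tables: list[str], max_parallel: int = 8) -> list[str]:
    # At most `max_parallel` etl calls execute at once; map returns results in input order
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        return list(pool.map(etl, tables))


results = main([f"table_{i}" for i in range(5)], max_parallel=2)
for result in results:
    print("load complete for " + result)
```

This creates one unit of work per table instead of three chained ones, and the pool size acts as the concurrency limit.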

rsampaths16 commented 1 year ago

only tasks can be submitted to external infrastructure and run in the background

Hi @madkinsz, can you please provide an example of this? I tried looking in the docs but couldn't find a way to do this. Thanks.

daniel-oh-sunpower commented 1 year ago

Just a gentle question: is this feature still in active development? If not, anyone else interested in seeing this implemented? (please up-vote if so!)

nicolasiltis commented 1 year ago

I'm very interested in seeing this feature implemented!! It would be awesome for orchestration. Just adding an idea here to open the discussion: we have TaskRunner, so why can't we have FlowRunner in the same way? (SequentialFlowRunner [default], ConcurrentFlowRunner, DaskFlowRunner, RayFlowRunner, ...)
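To make the suggestion concrete: a runner object, analogous to a task runner, could own the decision of how subflow calls execute. The sketch below is purely hypothetical (none of these classes exist in Prefect) and only shows the interface shape, with a sequential and a thread-based implementation.

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Sequence


class FlowRunner(ABC):
    """Hypothetical interface: decides how a batch of subflow calls executes."""

    @abstractmethod
    def run_all(self, fn: Callable[[Any], Any], args: Sequence[Any]) -> list:
        ...


class SequentialFlowRunner(FlowRunner):
    def run_all(self, fn, args):
        # One call at a time, in order, like blocking subflow calls
        return [fn(a) for a in args]


class ConcurrentFlowRunner(FlowRunner):
    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers

    def run_all(self, fn, args):
        # Calls overlap in worker threads; results come back in input order
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            return list(pool.map(fn, args))


def square(x: int) -> int:
    return x * x


for runner in (SequentialFlowRunner(), ConcurrentFlowRunner()):
    print(type(runner).__name__, runner.run_all(square, [1, 2, 3]))
```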

ricosaurus commented 1 year ago

Chiming in that it would be great to know if this is still under development -- for me it is currently a major obstacle for using prefect. I had naively assumed I could run subflows in parallel (without async) and am considering next steps.

williamjamir commented 12 months ago

I'm excited about this feature too! It would make the Prefect API more consistent, in my view, since flows and tasks will share common functionalities; big win for simplicity.

My use case is running flows with different parameters, such as hyperparameters. It would make experiment execution simpler :)
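A parameter sweep is the kind of workload described here: the same flow body, launched once per combination of settings. Modeled with the standard library (the `experiment` function and the grid values are hypothetical), each combination becomes one submitted run, and results are keyed by their parameters:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product


def experiment(learning_rate: float, depth: int) -> float:
    # Hypothetical placeholder score; a real experiment would train and evaluate a model
    return round(learning_rate * depth, 3)


grid = list(product([0.01, 0.1], [2, 4]))  # every (learning_rate, depth) pair

with ThreadPoolExecutor(max_workers=4) as pool:
    # One submitted run per parameter combination
    futures = {params: pool.submit(experiment, *params) for params in grid}
    scores = {params: f.result() for params, f in futures.items()}

best = max(scores, key=scores.get)
print(best, scores[best])  # → (0.1, 4) 0.4
```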

wakatara commented 9 months ago

Have to echo @ricosaurus here. I had assumed that concurrently running a subflow (mine just takes a list of files to be processed and then submits them to a flow), up to the concurrency limit for the "subflow" it submits, would be fairly trivial. Right now this is completely blocking me from migrating to Prefect from Airflow (which I'd like to migrate away from, but which supports concurrency within DAGs).
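The "up to the concurrency limit" part of this request is a bounded fan-out. For async code it can be sketched with `asyncio.Semaphore`. This is a stdlib illustration with a hypothetical `process_file` body, not a Prefect feature, and it only helps when the work itself awaits rather than blocks.

```python
import asyncio


async def run_limited(paths: list[str], max_concurrency: int) -> tuple[list[str], int]:
    limit = asyncio.Semaphore(max_concurrency)
    active = 0
    peak = 0

    async def process_file(path: str) -> str:
        nonlocal active, peak
        async with limit:  # at most max_concurrency bodies run at once
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # placeholder for non-blocking I/O
            active -= 1
        return f"processed {path}"

    # gather starts every call; the semaphore holds the extras back
    results = await asyncio.gather(*(process_file(p) for p in paths))
    return list(results), peak


results, peak = asyncio.run(run_limited([f"file_{i}.csv" for i in range(10)], max_concurrency=3))
print(len(results), peak)  # → 10 3
```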

Can we understand when we may see this? I feel Prefect is the superior product and like the way it works much better than AF (I also feel you can code more modularly), but no parallel execution of sub/flows is kinda a deal killer.

Can we get an update on where this is as a feature release? Or if it is already supported but just not clearly documented?

snowdrop4 commented 8 months ago

The docs on flows, I feel, are quite misleading. They make it seem like parallel execution of sub-flows is already supported:

Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately

The above makes it sound like subflows behave like normal flows (literally). But really, they aren't comparable, as you can launch multiple normal flows and have them run in parallel, but this isn't the case for subflows.

Subflows will block execution of the parent flow until completion. However, asynchronous subflows can be run in parallel by using AnyIO task groups or asyncio.gather.

And then the above makes it sound like all you need to do is use asyncio.gather, and then the sub-flows will run in parallel. This also isn't true.

kevingrismore commented 8 months ago

@snowdrop4 I think this is a fair point. Docs will be updated in the next release to word that section slightly differently. In-process subflow runs can be concurrent and take advantage of async execution, but are not parallel in the way separate, simultaneous deployment runs are.
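The concurrency-versus-parallelism distinction drawn here can be seen with plain asyncio: `asyncio.gather` only overlaps coroutines that actually yield to the event loop, so synchronous work inside an `async def` still runs one call at a time. A minimal stdlib timing sketch; the step functions are hypothetical and the timings are approximate.

```python
import asyncio
import time


async def blocking_step() -> None:
    time.sleep(0.2)  # synchronous call: holds the event loop, nothing else runs


async def offloaded_step() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # runs in a worker thread; the loop stays free


async def timed(step) -> float:
    start = time.perf_counter()
    await asyncio.gather(step(), step(), step())
    return time.perf_counter() - start


blocking = asyncio.run(timed(blocking_step))
offloaded = asyncio.run(timed(offloaded_step))
# expect roughly 0.6s for blocking vs 0.2s for offloaded
print(f"blocking: {blocking:.2f}s, offloaded: {offloaded:.2f}s")
```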

snowdrop4 commented 8 months ago

Thank you :pray:

thomasfrederikhoeck commented 7 months ago

Hi @kevingrismore . Is this something which is actively worked on?

daviddwlee84 commented 6 months ago

In my scenario, I have to process data for different dates/items. Each date/item is independent and can be run in parallel, and the work for a single day is too complex to be a single task (there are sequential dependencies, parallel calculations, caching of intermediate results, etc.). In this scenario, submitting subflows is required to run them in parallel. https://github.com/PrefectHQ/prefect/issues/7322 https://github.com/PrefectHQ/prefect/discussions/12563

glesperance commented 3 months ago

This feature would go a long way toward making Prefect more intuitive to adopt. Moreover, since we can't run multiple flows at the same time, one has to wrap them in dummy tasks to achieve parallelism.