dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.3k stars 1.43k forks

multi-threaded / async executor #4041

Open yuhan opened 3 years ago

yuhan commented 3 years ago

Originated from https://github.com/dagster-io/dagster/issues/2268

Execute multiple ops simultaneously in the same process.
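As a rough illustration of what running several steps concurrently in one process could look like, here is a minimal sketch in plain Python using a thread pool. It is not Dagster's executor API: `run_graph`, `steps`, and `deps` are hypothetical names invented for this example, and the sketch ignores retries, logging, and the thread safety of any shared state.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List


def run_graph(
    steps: Dict[str, Callable[[Dict[str, Any]], Any]],
    deps: Dict[str, List[str]],
    max_workers: int = 4,
) -> Dict[str, Any]:
    """Run each step in a thread as soon as all of its upstream steps finish.

    `steps` maps a step name to a callable that receives a dict of upstream
    results. `deps` maps a step name to the names it depends on.
    (Hypothetical structure for illustration, not a Dagster type.)
    """
    results: Dict[str, Any] = {}
    pending = set(steps)
    running = {}  # future -> step name

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or running:
            # Submit every step whose dependencies are already satisfied.
            ready = [
                name
                for name in pending
                if all(d in results for d in deps.get(name, []))
            ]
            for name in ready:
                pending.remove(name)
                inputs = {d: results[d] for d in deps.get(name, [])}
                running[pool.submit(steps[name], inputs)] = name

            if not running:
                raise ValueError("cycle or missing dependency in graph")

            # Block until at least one running step completes.
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                results[running.pop(future)] = future.result()

    return results


if __name__ == "__main__":
    example_steps = {
        "a": lambda inputs: 1,
        "b": lambda inputs: 2,
        "c": lambda inputs: inputs["a"] + inputs["b"],
    }
    print(run_graph(example_steps, {"c": ["a", "b"]}))
```

Threads only help here when the steps are I/O-bound or release the GIL; CPU-bound pure-Python steps would still run one at a time.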

Prerequisites:


Message from the maintainers:

Excited about this feature? Give it a :thumbsup:. We factor engagement into prioritization.

markfickett commented 2 years ago

Lighter-weight ops sound appealing in the context of this Slack discussion. I have a batch job that was generating 10-20k lightweight dynamic outputs and executing ~30 of them concurrently. Right now I'm executing on the multiprocessing backend, but I'm eventually planning to move to k8s to scale beyond one machine's capacity. Dagster was getting bogged down with that many outputs. Switching to chunking (~30 ops, each of which takes a list of inputs and runs a for loop) is a workaround for now, but I lose some efficiency because batch execution durations are unpredictable. I'm not exactly sure what a multi-threaded / async executor would mean, but if it allowed lower per-op overhead, or some lightweight parallelism (async) within more heavyweight ops (k8s), that could be great!
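The chunking workaround described above might look roughly like the following plain-Python sketch. The names `split_into_chunks`, `process_chunk`, and `handle_item` are hypothetical, and the Dagster wiring (dynamic outputs, op definitions) is deliberately left out; each chunk would be handed to one op that loops over it.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def split_into_chunks(items: Sequence[T], n_chunks: int) -> List[List[T]]:
    """Distribute items round-robin into at most `n_chunks` non-empty lists."""
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    chunks = [list(items[i::n_chunks]) for i in range(n_chunks)]
    # Drop empty chunks when there are fewer items than chunks.
    return [chunk for chunk in chunks if chunk]


def process_chunk(chunk: Sequence[T], handle_item: Callable[[T], R]) -> List[R]:
    """Body of one coarse-grained op: a plain for loop over its inputs."""
    return [handle_item(item) for item in chunk]


if __name__ == "__main__":
    inputs = list(range(20_000))
    chunks = split_into_chunks(inputs, 30)  # ~30 ops instead of 20k
    print(len(chunks), "chunks; first chunk has", len(chunks[0]), "items")
```

Round-robin assignment spreads neighboring items across chunks, but it does not remove the inefficiency mentioned above: a chunk that happens to receive slow items still finishes late while other workers sit idle.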

rmg55 commented 2 years ago

Having an async and/or threaded executor would be really helpful for some of our use cases. We need to download files from external FTP / HTTP servers, and would like to have ~100 (or more) ops executing concurrently.
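For this kind of I/O-bound workload, the pattern would presumably be many concurrent downloads with a cap on how many are in flight at once. Below is a minimal asyncio sketch under that assumption. `fetch_one` and `fetch_all` are hypothetical names, the URLs are placeholders, and the download itself is simulated with `asyncio.sleep` rather than a real FTP/HTTP client.

```python
import asyncio
from typing import List


async def fetch_one(url: str, limiter: asyncio.Semaphore) -> str:
    """Simulated download; a real version would call an async FTP/HTTP client."""
    async with limiter:  # at most `max_concurrency` downloads run at once
        await asyncio.sleep(0.01)  # stand-in for network latency
        return f"contents of {url}"


async def fetch_all(urls: List[str], max_concurrency: int = 100) -> List[str]:
    """Download all URLs concurrently, returning results in input order."""
    limiter = asyncio.Semaphore(max_concurrency)
    return await asyncio.gather(*(fetch_one(url, limiter) for url in urls))


if __name__ == "__main__":
    # Placeholder URLs; `.invalid` is a reserved TLD that never resolves.
    example_urls = [f"ftp://example.invalid/file-{i}" for i in range(250)]
    results = asyncio.run(fetch_all(example_urls))
    print(len(results), "files fetched")
```

All of this runs in a single thread and process, which is why an async-capable executor would suit a workload of roughly 100 concurrent, mostly idle downloads.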

motion-x86 commented 1 year ago

This would also be extremely helpful when you want to share resources with all ops inside a given run. Think of establishing connections with external systems to fetch resources: these can have substantial overhead, and it would be a great help to be able to share the connections or resources with all ops inside a given run.
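To make the resource-sharing point concrete, here is a small plain-Python sketch of several worker threads reusing one expensive connection within a single process. `FakeConnection` and `run_ops_with_shared_connection` are hypothetical stand-ins, not Dagster resources, and the lock is an assumption about how a connection that is not thread-safe would need to be guarded.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


class FakeConnection:
    """Stand-in for a connection that is costly to establish."""

    instances = 0  # counts how many connections were ever opened

    def __init__(self) -> None:
        FakeConnection.instances += 1
        # Serialize access, assuming the underlying client is not thread-safe.
        self._lock = threading.Lock()

    def query(self, key: str) -> str:
        with self._lock:
            return f"value-for-{key}"


def run_ops_with_shared_connection(keys: List[str], max_workers: int = 8) -> List[str]:
    """Open one connection and let every worker thread reuse it."""
    connection = FakeConnection()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as `keys`.
        return list(pool.map(connection.query, keys))


if __name__ == "__main__":
    print(run_ops_with_shared_connection(["a", "b", "c"]))
    print("connections opened:", FakeConnection.instances)
```

With a multi-process executor, each step process would have to open its own connection, which is the overhead described above.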

aaaaahaaaaa commented 7 months ago

We've been using Dagster in production for a while now, and we regularly find the lack of a multi-threaded executor very painful. The multi-process executor can result in really long cold-start delays between steps, as well as large memory consumption when the max concurrency is high. The in-process executor, on the other hand, essentially prevents concurrent orchestration. It is really surprising to me that this has never been higher on the Dagster team's roadmap.

riziles commented 7 months ago

Just realized I can run async code inside a multi_asset. This is pretty useful. Might be worthwhile to add to the docs?

```python
import asyncio

from dagster import (
    AssetExecutionContext,
    AssetOut,
    Definitions,
    asset,
    define_asset_job,
    in_process_executor,
    multi_asset,
)


async def slowreturn(var: str) -> str:
    # Simulate a slow I/O-bound call, such as a network request.
    await asyncio.sleep(5)
    return var


@multi_asset(
    outs={
        "x": AssetOut(group_name="g1"),
        "y": AssetOut(group_name="g1"),
    }
)
async def x_and_y():
    # Schedule both coroutines as tasks so their sleeps overlap,
    # then await each result.
    x1 = asyncio.create_task(slowreturn("something"))
    y1 = asyncio.create_task(slowreturn("else"))
    x2 = await x1
    y2 = await y1

    # One value per declared output, in the order of `outs`.
    return x2, y2


@asset
async def x_plus_y(context: AssetExecutionContext, x, y):
    out = x + " " + y
    context.log.info(out)


# Job that selects every asset.
r = define_asset_job("async_test", "*")

defs = Definitions(
    assets=[x_and_y, x_plus_y],
    jobs=[r],
    executor=in_process_executor,
)
```

brentshulman-silkline commented 6 months ago

Would anyone be interested in collaborating on this together?

erenz14 commented 6 months ago

This issue contains a custom multi-threaded executor that someone created a while ago: https://github.com/dagster-io/dagster/issues/3177

Curious if this has any fundamental issues or could still be utilized?

brentshulman-silkline commented 6 months ago

> This issue contains a custom multi-threaded executor that someone created a while ago: #3177
>
> Curious if this has any fundamental issues or could still be utilized?

I actually tried to adapt this briefly. It wasn't super straightforward, for two reasons:

  1. Specific modifications for their lib
  2. Uses a pretty old version of dagster (still has "solids")

Definitely possible, I would imagine, but beware: it's not plug and play.

aaaaahaaaaa commented 6 months ago

The multiprocess executor spins off... processes, each of which, AFAICT, always loads the entire project definition, hence the issues mentioned above. It doesn't seem to me that anything based on the multiprocess executor will help.

OneCyrus commented 3 months ago

I just asked about support for a multi-threaded in-process executor in the Dagster Slack. Unfortunately, it isn't on the Dagster roadmap at the moment.

https://dagster.slack.com/archives/C01U5LFUZJS/p1719904935167549

Also, I had a brief look through the in-process executor codebase, but it's hard to tell how difficult this would be to implement, especially identifying all the thread-unsafe parts.