dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Support being downstream of multiple dynamic outputs #4364

Open alangenfeld opened 3 years ago

alangenfeld commented 3 years ago

Use Case

Ideas of Implementation

Currently the system is built around a single mapping_key, which is guaranteed to correspond to the one upstream output.

The crux of this is to support multiple mapping_keys, along with which upstream output each one is associated with, either by explicitly storing a mapping or by using a deterministic ordering scheme.
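
To make the two options concrete, here is a minimal plain-Python sketch. It assumes a downstream step is identified by the mapping_key it received from every dynamic output above it, keyed by a hypothetical "node:output" handle. The function names are illustrative only and are not part of Dagster; only the idea of an explicit mapping versus a deterministic ordering comes from the issue.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical model: each downstream step knows the mapping_key it received
# from every dynamic output above it, keyed by a "node:output" handle.
UpstreamKeys = Dict[str, str]


def ordered_key(upstream_keys: UpstreamKeys) -> Tuple[str, ...]:
    """Deterministic-ordering scheme: keep only the keys, ordered by handle.

    Sorting the handles means every process derives the same tuple, so
    position i always refers to the same upstream output.
    """
    return tuple(upstream_keys[handle] for handle in sorted(upstream_keys))


def from_ordered(handles: Iterable[str], keys: Tuple[str, ...]) -> UpstreamKeys:
    """Recover the explicit upstream -> mapping_key association from the tuple."""
    ordered_handles = sorted(handles)
    if len(ordered_handles) != len(keys):
        raise ValueError("need exactly one mapping_key per upstream output")
    return dict(zip(ordered_handles, keys))


def step_key(node: str, upstream_keys: UpstreamKeys) -> str:
    """Render a readable composite step key such as create_country[europe,ESP]."""
    return f"{node}[{','.join(ordered_key(upstream_keys))}]"
```

Storing the explicit dict is self-describing; the ordered tuple is more compact, but it only works if every component sorts the upstream handles the same way.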


Message from the maintainers:

Excited about this feature? Give it a :thumbsup:. We factor engagement into prioritization.

AlejandroUPC commented 2 years ago

Hello @alangenfeld, is there a way to work around this while it's not implemented?

lgblkb commented 2 years ago

Any news in this direction?

alangenfeld commented 2 years ago

> is there a way to work around this while it's not implemented?

Sorry, I missed this while I was on parental leave. Some ideas for workarounds:

> Any news in this direction?

Nothing at this time.

rubenbriones commented 1 year ago

Any news on that?

Is there any workaround for the following example?

First, I have a file with a graph that, given a pd.DataFrame with country information, does the following:

  1. Checks whether the given countries are already registered in the DDBB
  2. Creates (inserts) in the DDBB the countries that are not recognized
  3. Returns a new pd.DataFrame including the country_id assigned in the DDBB for each country
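# countries.py
import pandas as pd
from dagster import DynamicOut, DynamicOutput, In, graph, op

# is_country_in_ddbb, get_from_ddbb and insert_country_in_ddbb are DDBB
# helpers (not shown).

@op(
    ins={"countries": In(pd.DataFrame)},
    out={
        "matched": DynamicOut(),
        "unmatched": DynamicOut(),
    },
)
def match_countries(countries):
    # The argument name must match the "countries" key declared in `ins`.
    for _, data in countries.iterrows():
        country_iso_code = data['iso_code']
        if is_country_in_ddbb(data):
            country_id = get_from_ddbb(country_iso_code)
            # Emit (iso_code, id) so merge() can align ids with their rows.
            yield DynamicOutput((country_iso_code, country_id), output_name="matched", mapping_key=country_iso_code)
        else:
            yield DynamicOutput(data.to_dict(), output_name="unmatched", mapping_key=country_iso_code)

@op
def create_country(data):
    country_id_created = insert_country_in_ddbb(data)
    # Return (iso_code, id), matching the shape of the "matched" outputs.
    return data['iso_code'], country_id_created

@op
def merge(matched, unmatched, countries) -> pd.DataFrame:
    # The collected lists are not in row order, so map ids by ISO code
    # instead of assigning the concatenated list positionally.
    country_ids = dict(matched + unmatched)
    countries = countries.copy()
    countries['ids'] = countries['iso_code'].map(country_ids)
    return countries

@graph
def match_countries_graph(countries):
    matched, unmatched = match_countries(countries)
    new_country_ids = unmatched.map(create_country).collect()
    return merge(matched.collect(), new_country_ids, countries)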

And then I have a second file with the following logic:

  1. Get all the continents
  2. Read the countries for each continent into a pd.DataFrame
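# continents.py
from typing import List

import pandas as pd
from dagster import DynamicOutput, job, op

# `path` is the directory holding the per-continent CSV files (not shown).
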
@op
def get_continents(context) -> List[DynamicOutput[str]]:
    continents = [...]
    res = [DynamicOutput(continent, mapping_key=continent) for continent in continents]
    return res

@op
def extract(context, continent) -> pd.DataFrame:
    df_countries = pd.read_csv(path + f'/{continent}.csv')
    return df_countries

@job
def my_job():
    get_continents().map(extract)

What I want to do is apply match_countries_graph to each pd.DataFrame mapped in my_job, something like this:

from .countries import match_countries_graph

@job
def my_job():
    get_continents().map(extract).map(match_countries_graph)

But I get the following error: dagster._core.errors.DagsterInvalidDefinitionError: graph 'match_countries_graph' cannot be downstream of dynamic output "get_continents:result" since input "countries" maps to a node that is already downstream of another dynamic output. Nodes cannot be downstream of more than one dynamic output
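
The rule behind this error can be modeled in a few lines. The sketch below is a hypothetical, simplified model of that rule, not how Dagster implements it: each node inherits the dynamic outputs it is mapped under, `.map` opens one level, `.collect` closes one, and any node left under two or more open dynamic outputs is rejected. Node names mirror the example above.

```python
from typing import Dict, List, Tuple

# Hypothetical, simplified model of the rule quoted in the error; it is NOT
# Dagster's implementation. Each node lists its inputs as
# (upstream_node, edge_kind) pairs, where edge_kind is:
#   "map"     - one element of a dynamic output produced by upstream_node
#   "collect" - the gathered results of upstream_node, which must be mapped
#   "plain"   - an ordinary input
Graph = Dict[str, List[Tuple[str, str]]]


def dynamic_scopes(graph: Graph, order: List[str]) -> Dict[str, Tuple[str, ...]]:
    """Return, per node, the dynamic outputs it is mapped under (outermost first).

    `order` must list every node topologically (upstream before downstream).
    """
    scopes: Dict[str, Tuple[str, ...]] = {}
    for node in order:
        scope: Tuple[str, ...] = ()
        for upstream, kind in graph.get(node, []):
            inherited = scopes[upstream]
            if kind == "map":
                inherited = inherited + (upstream,)  # fan out: one level deeper
            elif kind == "collect":
                inherited = inherited[:-1]  # fan in: close the innermost level
            # Union of the scopes arriving through all inputs.
            for source in inherited:
                if source not in scope:
                    scope += (source,)
        scopes[node] = scope
    return scopes


def violations(graph: Graph, order: List[str]) -> List[str]:
    """Nodes that would be downstream of more than one open dynamic output."""
    scopes = dynamic_scopes(graph, order)
    return [node for node in order if len(scopes[node]) > 1]
```

On the example, `create_country` ends up under both `get_continents` and `match_countries`, so it is flagged. The same model shows why inserting a collect stage (gather everything, then fan out again from a new op) satisfies the current rule.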

So, is there any other way to reorganize the code to achieve this? Or could you tell me whether you plan to include this feature in an upcoming release, and whether there is an indicative date?
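
One way to reorganize, sketched under assumptions: since only the nested fan-out is rejected, the per-country steps could run as an ordinary loop inside a single op, which would then be mapped over continents as `get_continents().map(extract).map(match_and_create_countries)`. The sketch uses plain lists of dicts instead of a pd.DataFrame so it runs standalone; `match_and_create_countries`, `lookup_id`, and `insert_country` are hypothetical names standing in for the DDBB helpers, and the Dagster `@op` wrapper is left out.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


def match_and_create_countries(
    rows: List[Row],
    lookup_id: Callable[[str], Optional[int]],  # hypothetical: id or None
    insert_country: Callable[[Row], int],  # hypothetical: inserts, returns id
) -> List[Row]:
    """Return copies of `rows` with a `country_id` field filled in.

    Known countries are looked up and unknown ones are inserted, all in one
    loop, so there is no second dynamic output inside the mapped step.
    Processing rows in order also keeps each id attached to its own row.
    """
    result = []
    for row in rows:
        iso_code = str(row["iso_code"])
        country_id = lookup_id(iso_code)
        if country_id is None:
            country_id = insert_country(row)
        result.append({**row, "country_id": country_id})
    return result
```

The tradeoff is that countries within a continent are no longer separate steps, so they lose per-country parallelism and retries; the fan-out over continents is kept.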

Thank you so much in advance.

clementemmanuel commented 1 year ago

Is there a sense of how complex it would be to implement and support this? This seems like a fairly highly upvoted feature request; I'm curious how that factors into the prioritization process for Dagster.

The implications and complexity of implementing this aren't immediately obvious to me. If it's actually somewhat reasonable, I might be interested in taking a pass, though based on the hesitation I'm guessing it probably isn't 😄.

FWIW, this would make my life insanely nicer for some parameter optimization work I'm doing, so I'm really interested in seeing this feature shipped.

elementljarredc commented 1 year ago

Hey y'all. I've seen a few requests for updates on this issue. We're working on prioritizing these improvements alongside some improvements to how dynamic orchestration works with software-defined assets. We don't have a delivery timeline yet, but may add one soon.

abhinavDhulipala commented 5 months ago

Hi everyone! I'd also like to advocate for supporting one-to-many-to-many relationships. I have a use case where I enumerate configs; each one builds a binary, which in turn enumerates different runs. I'd love to use Dagster, and implementing this would mean I don't have to put spurious collect stages in between.

---
title: Run Regression
---
erDiagram
    enumerate_builds_op ||--o{ build_and_enumerate_run_op : "build bin"
    build_and_enumerate_run_op ||--o{ run_op : "run bin"
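
For the one-to-many-to-many shape above, one possible workaround is to flatten both levels into a single fan-out keyed by a composite mapping key. This assumes the runs for each build can be enumerated before the binary is built, which may not hold here. The helper names are hypothetical, and the sanitization assumes Dagster only accepts letters, digits, and underscores in mapping keys. If run enumeration genuinely needs the built binary, a collect stage between the two levels remains the fallback.

```python
import re
from typing import Callable, List, Tuple

# Assumption: mapping keys may only contain letters, digits and underscores.
_DISALLOWED = re.compile(r"[^A-Za-z0-9_]")


def composite_key(*parts: str) -> str:
    """Join sanitized parts with a double underscore, e.g. cfg_a__r1."""
    return "__".join(_DISALLOWED.sub("_", part) for part in parts)


def flatten(
    builds: List[str], runs_for: Callable[[str], List[str]]
) -> List[Tuple[str, str, str]]:
    """Return (mapping_key, build, run) for every build/run pair.

    Each tuple would become one DynamicOutput of a single fan-out op.
    Sanitizing can map different names to the same key, so collisions raise.
    """
    seen = set()
    pairs = []
    for build in builds:
        for run in runs_for(build):
            key = composite_key(build, run)
            if key in seen:
                raise ValueError(f"mapping_key collision: {key}")
            seen.add(key)
            pairs.append((key, build, run))
    return pairs
```

In Dagster terms, a single op could yield `DynamicOutput((build, run), mapping_key=key)` for each tuple, so every downstream step sits under only one dynamic output.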

AgentK9 commented 6 days ago

Any updates here?

garethbrickman commented 4 days ago

@AgentK9 I don't think this is actively being worked on at this time