dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.78k stars, 1.48k forks

dagster-embedded-elt - Allow defining group_name and other parameters for source assets #25426

Open gianfrancodemarco opened 1 month ago

gianfrancodemarco commented 1 month ago

What's the use case?

The sling_assets decorator defines assets based on the Sling replication config. It defines two assets for each stream: one representing the source and one representing the materializable asset. However, source assets are only defined as AssetSpec objects, and the only customizable attribute is the asset key. It would be useful to be able to set other parameters as well, such as group_name.
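To make the limitation concrete, here is a small pure-Python model of the behaviour described above. It is not the dagster-embedded-elt implementation: it assumes each stream named like schema.table yields one upstream source key and one materializable target key, and that only the source key goes through an overridable hook, so a group name has nowhere to go. Spec, Translator, deps_asset_key, target_asset_key and build_specs are all hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Spec:
    """Hypothetical stand-in for an asset spec: a key plus an optional group."""
    key: tuple
    group_name: Optional[str] = None


class Translator:
    """Hypothetical translator: the only hook for source assets returns a key."""

    def deps_asset_key(self, stream_name: str) -> tuple:
        # Assumed default: split "schema.table" into key components.
        return tuple(stream_name.split("."))

    def target_asset_key(self, stream_name: str) -> tuple:
        # Assumed default for the materializable (target) asset.
        return ("target",) + tuple(stream_name.split("."))


def build_specs(streams, translator, target_group=None):
    """Return (source_specs, target_specs): two assets per stream."""
    sources, targets = [], []
    for stream in streams:
        # The source spec is built from the key alone; no group can be passed in.
        sources.append(Spec(key=translator.deps_asset_key(stream)))
        targets.append(
            Spec(key=translator.target_asset_key(stream), group_name=target_group)
        )
    return sources, targets


sources, targets = build_specs(
    ["public.users", "public.orders"], Translator(), target_group="sling"
)
for spec in sources:
    print(spec)
```

In this model every source spec ends up with group_name=None, which is the gap the issue asks to close.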

Ideas of implementation

The sling_assets decorator could explicitly define the source assets, allowing deeper customization.

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

cmpadden commented 3 weeks ago

Hi @gianfrancodemarco -

At the moment, to accomplish this, you need to define a SourceAsset whose key matches the upstream source of your Sling assets. On that SourceAsset you can assign properties such as the group name. See this similar discussion for doing the same with dbt assets:

https://github.com/dagster-io/dagster/discussions/21626#discussioncomment-9306374

gianfrancodemarco commented 3 weeks ago

Hi @cmpadden, how would that work?

As I understand it, the sling_assets decorator already creates the source assets, so if I try something like:

from dagster import SourceAsset

my_asset = SourceAsset(
    key="my_source_asset",  # matching the one created by the @sling_assets decorator
    group_name="my_group",
)

I get

dagster._core.errors.DagsterInvalidDefinitionError: Asset key AssetKey('my_source_asset') is defined multiple times. Definitions found in modules: xxx.assets.data_warehouse. 

What am I missing?

nixent commented 3 weeks ago

@gianfrancodemarco you can assign an asset group by redefining the source assets, as is done here:

from dagster import SourceAsset

# The four names below are @sling_assets definitions from the linked example.
# Each upstream dependency key is redefined as a SourceAsset in one group.
cloud_product_main_source_assets = [
    SourceAsset(key, group_name="postgres_main")
    for assets_def in (
        cloud_product_main_low_volume,
        cloud_product_main_high_volume,
        cloud_product_main_event_log,
        cloud_product_main_runs,
    )
    for key in assets_def.dependency_keys
]
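The same pattern generalizes to any number of Sling definitions. Below is a minimal pure-Python sketch, assuming each definition exposes a dependency_keys collection as in the snippet above; FakeAssetsDef and regroup_source_keys are hypothetical stand-ins so the example runs without Dagster. It also deduplicates keys: if two definitions read the same upstream table, creating one SourceAsset per occurrence would likely reproduce the "defined multiple times" error quoted earlier in this thread.

```python
from dataclasses import dataclass, field


@dataclass
class FakeAssetsDef:
    """Hypothetical stand-in for an assets definition with upstream dependency keys."""
    name: str
    dependency_keys: set = field(default_factory=set)


def regroup_source_keys(assets_defs, group_name):
    """Collect upstream keys from several definitions, deduplicated, in first-seen order.

    Returns (key, group_name) pairs; with Dagster installed, each pair would become
    SourceAsset(key, group_name=group_name).
    """
    seen = {}
    for assets_def in assets_defs:
        # sorted() only makes the iteration order deterministic for this demo.
        for key in sorted(assets_def.dependency_keys):
            seen.setdefault(key, group_name)
    return list(seen.items())


low = FakeAssetsDef("low_volume", {("public", "users"), ("public", "orgs")})
runs = FakeAssetsDef("runs", {("public", "runs"), ("public", "users")})
result = regroup_source_keys([low, runs], "postgres_main")
print(result)
```

Here ("public", "users") appears in both definitions but is emitted only once.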

@cmpadden this is a pretty common use case: you want to group source assets together to make the asset graph more readable. dagster_sling_translator already provides get_deps_asset_key() but lacks a get_deps_asset_group() to make it complete. Supporting this would require changes to AssetSpec, and it would be more convenient than patching asset groups after they are declared.
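A minimal plain-Python sketch of how the proposed hook might be used. As discussed in this thread, get_deps_asset_group() does not exist yet; BaseTranslator, PostgresTranslator, source_specs, the stream_definition shape ({"name": ...}) and the way a decorator would consume the hook are all assumptions for illustration, not the real DagsterSlingTranslator API.

```python
from typing import Any, Mapping, Optional


class BaseTranslator:
    """Hypothetical stand-in for a Sling translator (not the real DagsterSlingTranslator)."""

    def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> tuple:
        # Assumed default: "schema.table" -> ("schema", "table").
        return tuple(stream_definition["name"].split("."))

    def get_deps_asset_group(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
        # Proposed hook (hypothetical): None means "use the default group".
        return None


class PostgresTranslator(BaseTranslator):
    """User subclass: group every upstream table by its schema."""

    def get_deps_asset_group(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
        schema = stream_definition["name"].split(".")[0]
        return f"postgres_{schema}"


def source_specs(stream_definitions, translator):
    # A decorator supporting the hook could build each source spec with both
    # fields, so no SourceAsset has to be redefined afterwards.
    return [
        {
            "key": translator.get_deps_asset_key(sd),
            "group_name": translator.get_deps_asset_group(sd),
        }
        for sd in stream_definitions
    ]


streams = [{"name": "main.users"}, {"name": "events.clicks"}]
specs = source_specs(streams, PostgresTranslator())
print(specs)
```

The design mirrors the existing key hook: the group is computed per stream at definition time instead of being patched onto the source assets later.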

gianfrancodemarco commented 2 weeks ago

@nixent that worked, thanks