dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Allow specifying PartitionMapping when non-asset jobs depend on partitioned source assets #13940

Open cleboo opened 1 year ago

cleboo commented 1 year ago

What's the use case?

I want to use the last partition of an asset as an input to a non-partitioned Job.

This is currently possible with an intermediate asset, but that way a (potentially big) asset has to be materialized a second time.
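To make "last partition" concrete, here is a minimal, Dagster-free sketch of the selection the job would need. It assumes daily partition keys formatted as `YYYY-MM-DD` and that a partition only exists once its day has fully elapsed. The helper names `daily_partition_keys` and `last_partition_key` are hypothetical and not part of the Dagster API.

```python
from datetime import date, timedelta
from typing import List, Optional


def daily_partition_keys(start: date, today: date) -> List[str]:
    """Hypothetical helper: one key per fully elapsed day in [start, today)."""
    days = (today - start).days
    # max(..., 0) yields an empty list when `today` is on or before `start`.
    return [(start + timedelta(days=i)).isoformat() for i in range(max(days, 0))]


def last_partition_key(keys: List[str]) -> Optional[str]:
    """Return the most recent key, or None when no partition exists yet."""
    return keys[-1] if keys else None


# Assumed dates, for illustration only.
keys = daily_partition_keys(date(2023, 4, 19), today=date(2023, 4, 22))
latest = last_partition_key(keys)
```

A non-partitioned job would only ever need the value stored under `latest`, which is why materializing a second copy of the asset just to expose it feels wasteful.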

Ideas of implementation

My rough idea of what this would look like from the user's side is something like this (using partition mappings):

from dagster import (
    asset,
    op,
    graph,
    DailyPartitionsDefinition,
    OpExecutionContext,
    In,
    AssetIn,
    LastPartitionMapping,
    Definitions
)

partition = DailyPartitionsDefinition(start_date="2023-04-19")

@asset(partitions_def=partition)
def val(context: OpExecutionContext) -> str:
    return context.asset_partition_key_for_output()

@op(ins={'s': In()})
def add_string(context, s):
    """add --added to the string and log it"""
    context.log.info(str(s))
    s = s + "--added"
    context.log.info(s)
    return s

@graph
def generate_val_string():
    return add_string(
        val.to_source_asset(partition_mapping=LastPartitionMapping())
    )

# Build the job from the graph defined above (generate_val_string).
job = generate_val_string.to_job(
    "generate_last_date_string"
)

defs = Definitions(
    jobs=[job, ],
    assets=[val, ]
)

Additional information

Original discussion: https://github.com/dagster-io/dagster/discussions/13918

Related to https://github.com/dagster-io/dagster/issues/13357

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sryza commented 1 year ago

I think the main question here is how exactly the PartitionMapping should be specified. I don't think it makes sense to make it an argument to AssetsDefinition.to_source_asset, because source assets themselves don't have a PartitionMapping. Also, what if you wanted to depend on something that's already a SourceAsset, not an AssetsDefinition?

Maybe something like:

@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def emails():
    ...

@job
def my_job():
    send_emails(OpAssetInput(emails, partition_mapping=LastPartitionMapping()))

cleboo commented 1 year ago

Makes sense to me 🙂 That could also be the manual way to specify the partition mapping for that issue: https://github.com/dagster-io/dagster/issues/13357

dmsfabiano commented 10 months ago

@sryza @cleboo This would be awesome. It would also allow us to dynamically trigger jobs that depend on a specific partition via parameters (#16524).