dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.69k stars, 1.48k forks

Custom mapping between dynamic partitions #13139

Open bennyweise opened 1 year ago

bennyweise commented 1 year ago

What's the use case?

I have a dynamically partitioned asset where each partition is a piece of equipment at a particular location, and a second dynamically partitioned asset where each partition corresponds to the weather forecast for a given location. I would like to map the postcode-based forecasts to each piece of equipment; however, the mapping won't be known until the equipment assets are loaded. I would like to use this information to create a mapping that downstream assets can use.

This was also discussed in Slack.

Ideas of implementation

A custom mapping could be created in an op / asset, that is then used in the partition mapping.

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

clairelin135 commented 1 year ago

Thanks for filing this! We'll track this issue in our backlog.

One idea I had was accepting a user-defined function to map a dynamic partition to another, but this would force us to make a gRPC call every time to fetch upstream/downstream dynamic partitions which would cause dramatic slowdowns to asset reconciliation.

bennyweise commented 1 year ago

My relatively uninformed thoughts on this: for my use case, the relation between partitions in the map changes very slowly; it just isn't known at definition time. It would be relatively straightforward to calculate the partition mapping as its own asset. Would it be possible to store the mapping as an asset and refer to it in the partition definition? That may limit the need to make so many gRPC calls, but that's well beyond my Dagster knowledge.

sryza commented 1 year ago

If the framework had to invoke load_input to load the asset that stores the mapping, that would also require gRPC calls.

If it's very slowly changing, would it possibly make sense to manage it in a YAML file that gets deployed with the code and is used to construct a StaticPartitionMapping?
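A minimal sketch of that suggestion, assuming the deployed file maps each equipment key to its postcode (the names and sample data below are hypothetical). It inverts that table into the upstream-to-downstream dict that StaticPartitionMapping accepts. The Dagster call is left as a comment so the snippet runs without Dagster installed, and this approach presumably implies static partition definitions on both sides.

```python
from collections import defaultdict
from typing import Dict, List, Mapping

# Hypothetical contents of the deployed YAML file, already parsed into a dict
# (e.g. with yaml.safe_load): equipment partition key -> postcode partition key.
EQUIPMENT_TO_POSTCODE: Mapping[str, str] = {
    "pump-1": "2000",
    "pump-2": "2000",
    "fan-7": "3000",
}


def downstream_keys_by_upstream(
    equipment_to_postcode: Mapping[str, str],
) -> Dict[str, List[str]]:
    """Group equipment keys (downstream) under their forecast postcode key (upstream)."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for equipment, postcode in sorted(equipment_to_postcode.items()):
        grouped[postcode].append(equipment)
    return dict(grouped)


mapping = downstream_keys_by_upstream(EQUIPMENT_TO_POSTCODE)

# With Dagster installed, this dict could be passed to the existing class:
#   from dagster import StaticPartitionMapping
#   partition_mapping = StaticPartitionMapping(mapping)
```

Because the table ships with the code, changing it means redeploying, which is the trade-off this suggestion accepts in exchange for avoiding gRPC calls.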

AlexanderVR commented 1 year ago

@clairelin135 I'm looking at this too, as reloading the k8s pod every time our StaticPartitions need to be updated is not ideal. I see three paths, in decreasing order of generality.

(1) With a user-defined function, I expect we can cache the upstream -> downstream and downstream -> upstream mappings just like the StaticPartitionMapping does. That way we only need a remote call at most whenever the dynamic mappings have changed, and then only when making upstream/downstream calls for those partitions that may have been affected.

(2) We serialize the user-defined function with marshal -- disallowing globals, non-builtin packages, etc. This might not solve the OP's use case in an ideal fashion, since a location -> postcode mapper would probably want to use a 3rd-party library, but it can solve the OP's use case in the same way (3) does below. Another downside: it's hard to ensure the unmarshaled function works without running it.

(3) We restrict ourselves to partition mappings that are either an explicit coarsening or refining of a set of partitions. E.g. a location is tied to at most one postcode, so if we change the location partition keys to take the form location|postcode, then their cardinality remains unchanged and the mapping to postcode is just a projection onto the second "dimension". The difference between this and MultiPartitions is that here we allow ragged arrays, instead of enforcing that partitions exist for the full cross product.
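To make option (3) concrete, here is a rough sketch of the two directions over composite keys of the form location|postcode. The function names, the separator constant, and the sample keys are all assumptions made for illustration; nothing here is an existing Dagster API.

```python
from typing import Iterable, List

SEPARATOR = "|"  # assumed delimiter; must not occur inside a location name


def coarsen(location_key: str) -> str:
    """Project a 'location|postcode' key onto its postcode dimension."""
    _location, postcode = location_key.split(SEPARATOR, 1)
    return postcode


def refine(postcode: str, location_keys: Iterable[str]) -> List[str]:
    """Return the existing location keys that belong to one postcode.

    Only keys that actually exist are returned, so the result may be
    ragged: no cross product of locations and postcodes is required.
    """
    return [key for key in location_keys if coarsen(key) == postcode]


# Hypothetical dynamic partition keys.
location_keys = ["depot-a|2000", "depot-b|2000", "depot-c|3000"]
```

Here coarsen gives the downstream-to-upstream direction (one postcode per location) and refine gives the upstream-to-downstream direction (zero or more locations per postcode).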

All three of these would work for the use case I have in mind. I have some time to look at implementing this too. If (1) is the preferred approach I'd appreciate any pointers for how to thread in the remote call. (2) is by far the easiest to implement -- I can put up a PR in short order if this is a decent short-term solution. (3) requires a little more API design -- should it expose just a library of partition key transformations such as str.split('|')[1], or do we need a more formal DynamicRaggedPartitionsDefinition with explicit CoarseningPartitionMapping and RefiningPartitionMapping to or from a DynamicPartitionsDefinition?
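As a rough illustration of the caching idea in (1), the sketch below wraps a user-defined function and rebuilds the upstream-to-downstream table only when the set of downstream keys changes. The class and its call counter are hypothetical, and the remote call is stood in for by a plain Python function. For simplicity it rebuilds the whole table on any change rather than only the affected partitions.

```python
from typing import Callable, Dict, FrozenSet, Iterable, List, Optional


class CachedPartitionMapping:
    """Caches upstream -> downstream keys; recomputes only when the key set changes."""

    def __init__(self, upstream_for: Callable[[str], str]) -> None:
        self._upstream_for = upstream_for  # user-defined function (the costly call)
        self._snapshot: Optional[FrozenSet[str]] = None
        self._downstream_by_upstream: Dict[str, List[str]] = {}
        self.user_fn_calls = 0  # counter kept only to show the cache working

    def downstream_keys(self, upstream_key: str, downstream_keys: Iterable[str]) -> List[str]:
        snapshot = frozenset(downstream_keys)
        if snapshot != self._snapshot:  # dynamic partitions changed: rebuild
            rebuilt: Dict[str, List[str]] = {}
            for key in sorted(snapshot):
                self.user_fn_calls += 1
                rebuilt.setdefault(self._upstream_for(key), []).append(key)
            self._downstream_by_upstream = rebuilt
            self._snapshot = snapshot
        return list(self._downstream_by_upstream.get(upstream_key, []))


cached = CachedPartitionMapping(lambda key: key.split("|")[1])
keys = ["depot-a|2000", "depot-b|3000"]
first = cached.downstream_keys("2000", keys)
second = cached.downstream_keys("3000", keys)  # served from the cache
```

After both lookups the user-defined function has run once per key, not once per lookup; adding a new key would trigger a single rebuild.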

drewsonne commented 12 months ago

I think I'm trying to do something similar. I had a crack at implementing my own partitions definition, but I hit a problem with dagster._core.host_representation.external_data:external_partitions_definition_from_def(...), where Dagster limits the types of partitions that can be used. Conceptually, I was thinking of something like this...

# Note: DynamicTreePartitionsDefinition is a proposed API; it does not exist in Dagster.
from dagster import DynamicTreePartitionsDefinition, sensor, SensorResult, RunRequest
from typing import List, Tuple

# Order matters, so this is a tuple. This defines the first and second levels of the partition.
BrandProduct = Tuple[str, str]
partition_names: BrandProduct = ("brand", "product")

tree_partition_def = DynamicTreePartitionsDefinition(partition_names)

@sensor
def file_sensor(context):
    # Collect (brand, product) pairs that are not yet registered as partitions.
    new_brands_for_product: List[BrandProduct] = []
    for product in context.resources.my_api.get_products():
        for brand in product.brands():
            if not context.instance.has_dynamic_partition(
                    tree_partition_def.name, (brand.name, product.name)  # or {"brand": brand.name, "product": product.name}
            ):
                new_brands_for_product.append((brand.name, product.name))

    # Request a run for each new pair and register the pairs as partitions.
    return SensorResult(
        run_requests=[
            RunRequest(partition_key=product_brand) for product_brand in new_brands_for_product
        ],
        dynamic_partitions_requests=[
            tree_partition_def.build_add_request(new_brands_for_product)
        ],
    )

I don't have insight into the inner workings of Dagster, and I'm assuming that storing tree structures in the DB is where things get complicated, but I would also find this very useful. This feels like quite a core aspect of how partitions operate that we might be pushing up against. I'm happy to try to help contribute, but that depends on how much interest there is from the Dagster team.

Our use case is that we have a structure more or less like a brand which has a product, and those are not a time series, but definitions which change over time. So we want to periodically check for changes (using DataVersion), but be able to re-run an asset job for a specific product/brand combination.

drewsonne commented 10 months ago

@sryza any update on this?

sryza commented 10 months ago

Hey @drewsonne - alas this is not on our near-term roadmap.

For your use case, would you be able to basically use f"{brand},{product}" as your string partition key, and then deserialize it into a NamedTuple in your code? Or is the idea that you want the Dagster UI to represent the hierarchy?
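A small sketch of that workaround, assuming brand names never contain a comma. The class name and the to_key/from_key helpers are illustrative, not part of Dagster.

```python
from typing import NamedTuple


class BrandProduct(NamedTuple):
    brand: str
    product: str

    def to_key(self) -> str:
        """Serialize to the flat string used as the partition key."""
        return f"{self.brand},{self.product}"

    @classmethod
    def from_key(cls, key: str) -> "BrandProduct":
        """Parse a partition key back into its two parts.

        Splits on the first comma only, so a comma in the product name
        survives; a comma in the brand name would break the round trip.
        """
        brand, product = key.split(",", 1)
        return cls(brand, product)


key = BrandProduct("acme", "widget").to_key()
parsed = BrandProduct.from_key(key)
```

Inside an asset, the string from the run's partition key could be passed to from_key to recover the brand and product.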

aoodham commented 10 months ago

I raised a similar request in Slack. The only slight tweak to the above is that my "source of truth" for the relationship between dynamically partitioned assets is an external database. It's a simple parent -> child relationship, and it would be good to be able to tell Dagster how to map between the two with reference to that.

Ultimately, I would like to be able to build diamond-shaped workflows wherein some assets are materialised at the parent level, then dependent properties computed per-child before again materialising some per-parent assets which depend on the set of results from the children. This diamond shape unfortunately makes the tuple approach not work as it would require re-computing the parent level partitions once per child.

RenaudLN commented 10 months ago

I also have a similar use case whereby one of my assets is a directed graph, then some downstream assets will do bottom-up aggregation, while other assets will cascade top-down.

drewsonne commented 9 months ago

@sryza Ideally, the Dagster UI would represent the hierarchy. Concatenation could stand in for any partitioning scheme, including Dagster's existing partitioning functionality, so the case for representing this in the UI is the same as the case for the existing partition functionality.

Daniel-Vetter-Coverwhale commented 6 months ago

I just wanted to throw out another use case/plus 1 comment to this.

We've got a set of connection ids, which each have vehicles associated with them, which then have trips, which then further have safety events.

My idea was to use dynamic partitions such that when we load the connection ids, that creates a dynamic partition which the vehicles use, and so on. This works pretty well in that we get the partitions we expect. However, I was also hoping there would be some way to see the mapping, and trigger the downstream partitions that were related to the upstream when the respective upstreams are refreshed. For instance, if a given connection had data quality issues at the source, then we need to trigger a run for all partitions of the downstream assets from that connection and it would be awesome if we could do that with an auto materialize policy and a custom mapping.

From a UI standpoint the MultiPartitionAsset looks nice, but then I realized it's a full cross product, which we don't want. It would be nice though if we could get that UI without the cross product, because then even if we weren't able to auto materialize the downstream partitions we would at least have a much easier time selecting them for manual update I think.

I am open to other suggestions on how to accomplish this. I think doing the pipe delimited partition keys with an asset sensor that does the downstream partition selection can work, but it feels a bit hacky, and the partition keys are going to get quite unwieldy as we go deeper. I don't think using an asset to make more assets actually works, as I'm not sure how they would get persisted into dagster definitions, but I could be totally wrong. That might be nice as all the mechanics are already in place for auto materializing, graphs, etc, but taking in multiple even as non-argument deps would also get unwieldy.
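For reference, the selection step of the pipe-delimited approach might look roughly like the sketch below. The helper name and the sample keys are made up for illustration; a sensor would still have to turn the selected keys into run requests.

```python
from typing import Iterable, List


def downstream_keys_for(
    upstream_key: str, downstream_keys: Iterable[str], sep: str = "|"
) -> List[str]:
    """Select the downstream keys nested under one upstream key.

    The separator is appended to the prefix so that 'conn1' does not
    also match keys belonging to 'conn10'.
    """
    prefix = upstream_key + sep
    return [key for key in downstream_keys if key.startswith(prefix)]


# Hypothetical keys: connection|vehicle|trip
trip_keys = [
    "conn1|veh1|trip1",
    "conn1|veh2|trip7",
    "conn10|veh3|trip2",
]

affected_by_connection = downstream_keys_for("conn1", trip_keys)
affected_by_vehicle = downstream_keys_for("conn1|veh1", trip_keys)

# In an asset sensor, each selected key could become a run request, e.g.
#   RunRequest(partition_key=key)
```

The same helper works at any depth of the hierarchy, though the keys do grow with each level, as noted above.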