dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.65k stars 1.47k forks

Setting DataVersion in IO Managers #15584

Open roeap opened 1 year ago

roeap commented 1 year ago

What's the use case?

We are currently developing an IO manager for Delta Lake. As such, the table version is available once the data has been written to the table, and we would like to record that version for the output.

After searching a bit through the docs and code, I found no straightforward way to provide the DataVersion in that scenario. I tried using the set_data_version method on the step context, but it did not seem to yield the desired result, unless the version still gets hashed with the code version for display in the UI.

Ideas of implementation

Using set_data_version as described above would be fine, and maybe this is already the way to go :).

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

OwenKephart commented 1 year ago

Hi @roeap! It looks like, currently, the data version cannot be set manually in any way other than by setting data_version on the Output() object: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/_core/execution/plan/execute_step.py?L508

This is naturally at odds with your goal here, because the IOManager is only invoked after the Output event has been created. In short, I think a change would be necessary to use a data version set via set_data_version (cc @smackesey).

roeap commented 1 year ago

Thanks for the clarification @OwenKephart! Would this be something that may eventually land, or is this not an intended use?

On a related note, I have opened #14764, which contains the current state of the IO manager mentioned above. Do you know whom to ask whether this would be a "worthy" contribution?

Disclaimer: I am one of the maintainers of delta-rs ...

sryza commented 1 year ago

@roeap thanks for the reminder about that PR. I just replied on it.

sryza commented 1 year ago

> Thanks for the clarification @OwenKephart! Would this be something that may eventually land, or is this not an intended use?

We're eventually interested in supporting this functionality, but it's not high-priority for us at the moment. "Runtime data versions" – i.e. setting a data version at the time that an asset is materialized, instead of just based on its code_version and upstream data – is not the smoothest experience right now in Dagster. E.g. there's not yet a way to say "stop materializing downstream assets in this run, because the data version of the upstream asset is the same as its previous data version".
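The "stop materializing downstream assets when the upstream data version is unchanged" behaviour described above comes down to comparing data versions. Below is a minimal, framework-agnostic sketch. It assumes that the upstream data versions recorded at the downstream asset's last materialization are already available as a plain dict; how to retrieve them is out of scope. changed_upstreams and should_materialize are hypothetical helpers, not Dagster APIs.

```python
from typing import Dict, Set


def changed_upstreams(last_seen: Dict[str, str], current: Dict[str, str]) -> Set[str]:
    """Return the upstream asset keys whose data version differs from the
    version recorded when the downstream asset was last materialized.

    An upstream key missing from `last_seen` counts as changed.
    """
    return {key for key, version in current.items() if last_seen.get(key) != version}


def should_materialize(last_seen: Dict[str, str], current: Dict[str, str]) -> bool:
    """Skip the downstream asset only when no upstream data version changed."""
    return bool(changed_upstreams(last_seen, current))


# Example: upstream "sales" was rewritten (new table version), "regions" was not.
last_seen = {"sales": "12", "regions": "3"}
current = {"sales": "13", "regions": "3"}
print(changed_upstreams(last_seen, current))   # {'sales'}
print(should_materialize(last_seen, current))  # True
```

Using a string comparison keeps the check agnostic to how a version is produced: a Delta table version, a content hash, or a code-version-derived value all work the same way.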

roeap commented 1 year ago

Thanks @sryza, that makes sense, given that these are also fairly new mechanics.

For now, I think we can build a nice experience by providing some helper functions that generate sensors leveraging the data versions known by Delta.
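A sensor built on Delta table versions mostly needs cursor bookkeeping. Dagster sensors persist a string cursor (context.cursor, updated with context.update_cursor(...)), and the deltalake Python package reports a table's current version via DeltaTable(uri).version(). The sketch below keeps the decision logic in a pure function so it can be tested without either library; check_new_delta_version is a hypothetical helper, not part of dagster-deltalake.

```python
from typing import Optional, Tuple


def check_new_delta_version(cursor: Optional[str], current_version: int) -> Tuple[bool, str]:
    """Decide whether a new Delta table version should trigger downstream work.

    `cursor` is the last table version the sensor acted on, stored as a string
    (None on the first evaluation). Returns (should_trigger, new_cursor).
    """
    last_seen = int(cursor) if cursor is not None else -1  # Delta versions start at 0
    if current_version > last_seen:
        return True, str(current_version)
    return False, str(last_seen)


# First evaluation: no cursor yet, table is at version 0 -> trigger.
print(check_new_delta_version(None, 0))   # (True, '0')
# Table unchanged since the last evaluation -> no trigger, cursor kept.
print(check_new_delta_version("4", 4))    # (False, '4')
```

Inside a @sensor body, you would pass context.cursor and the current table version, call context.update_cursor(new_cursor), and yield a RunRequest only when should_trigger is true.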

There is a considerable caveat anyhow: ideally, I'd also like to narrow version updates to the affected partitions, but there are some ideas for doing that with manual log pruning ...
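Narrowing versions to partitions could start from the transaction log. In the Delta protocol, each commit's "add" actions carry a partitionValues map, so every commit can be reduced to the set of partitions it touched. The minimal sketch below assumes that extraction has already happened (log parsing is omitted) and derives, for each partition, the latest table version that wrote to it. That number could then serve as a per-partition data version. The Commit alias and latest_version_per_partition are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

# (table version, partition keys written by that commit); assumed to be
# extracted beforehand from the partitionValues of each commit's "add" actions.
Commit = Tuple[int, List[str]]


def latest_version_per_partition(commits: Iterable[Commit]) -> Dict[str, int]:
    """Map each partition key to the most recent table version that wrote to it."""
    latest: Dict[str, int] = {}
    for version, partitions in commits:
        for partition in partitions:
            if version > latest.get(partition, -1):
                latest[partition] = version
    return latest


commits = [
    (0, ["2023-07-01", "2023-07-02"]),  # initial load of two partitions
    (1, ["2023-07-02"]),                # rewrite of one partition
    (2, ["2023-07-03"]),                # append of a new partition
]
print(latest_version_per_partition(commits))
# {'2023-07-01': 0, '2023-07-02': 1, '2023-07-03': 2}
```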

ion-elgreco commented 9 months ago

@sryza I'm curious about this as well. I'm working on a feature for dagster-deltalake now and wanted to start linking the new table versions as Dagster "data versions".

Any chance this could be picked up?

mjclarke94 commented 8 months ago

+1 to this. We have to violate DRY by writing a little hashing lambda for every asset if we want to decouple the data version from the code version.
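One way to avoid repeating a hashing lambda per asset is to centralize it in a decorator. Below is a minimal sketch, assuming each asset's value can be serialized deterministically. The repr-based default serializer is only a placeholder, and content_version is a hypothetical helper. In Dagster, the wrapper would instead return Output(value, data_version=DataVersion(digest)), the Output-based mechanism mentioned earlier in this thread, rather than a plain tuple.

```python
import functools
import hashlib
from typing import Any, Callable, Tuple


def _default_serialize(value: Any) -> bytes:
    # Placeholder only: repr() is stable for lists/tuples of strings and numbers,
    # but not for e.g. sets of strings; real tables need a stable serialization.
    return repr(value).encode("utf-8")


def content_version(serialize: Callable[[Any], bytes] = _default_serialize, length: int = 16):
    """Decorator factory: the wrapped function returns (value, version), where
    version is a truncated SHA-256 hex digest of the serialized value."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Tuple[Any, str]]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Tuple[Any, str]:
            value = fn(*args, **kwargs)
            digest = hashlib.sha256(serialize(value)).hexdigest()
            return value, digest[:length]
        return wrapper
    return decorator


@content_version()
def daily_totals():
    return [("2023-07-01", 10), ("2023-07-02", 12)]


value, version = daily_totals()
print(value)          # [('2023-07-01', 10), ('2023-07-02', 12)]
print(len(version))   # 16
```

The version depends only on the output's content, so it stays the same when the code changes without changing the data. This is the decoupling from code_version that the comment above asks for.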