kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

Unable to save and load from versioned ManagedTableDatasets #920

Open jstammers opened 3 weeks ago

jstammers commented 3 weeks ago

Description

I am trying to use a versioned ManagedTableDataset so that I can load and save different versions of a delta table. I'm encountering an error when loading from a catalog, because the version is configured incorrectly for a delta table.

Context

How has this bug affected you? What were you trying to accomplish?

Steps to Reproduce

from kedro.io import DataCatalog

# Two catalog entries that point at the same underlying delta table
conf = {'ds': {'type': 'kedro_datasets.databricks.ManagedTableDataset',
  'versioned': True,
  'database': 'dev',
  'table': 'table',
  'write_mode': 'overwrite'},
 'ds_2': {'type': 'kedro_datasets.databricks.ManagedTableDataset',
  'versioned': True,
  'database': 'dev',
  'table': 'table',
  'write_mode': 'overwrite'}}
catalog = DataCatalog.from_config(conf)
catalog.load("ds")

Expected Result

When creating these datasets, I expect that the load version numbers should be resolved from the current version, the specified version number or 0 if the table doesn't exist.

When calling ManagedTableDataset.save, the load and save version numbers should be incremented accordingly.

ds = catalog.ds
ds_2 = catalog.ds_2

#check initial versions
assert ds.resolve_load_version() == 0
assert ds.resolve_save_version() == 1
assert ds_2.resolve_load_version() == 0
assert ds_2.resolve_save_version() == 1

ds.save(spark.createDataFrame(...))

#check ds has updated versions
assert ds.resolve_load_version() == 1
assert ds.resolve_save_version() == 2

#check load version of ds_2 is fixed and save version is updated
assert ds_2.resolve_load_version() == 0
assert ds_2.resolve_save_version() == 2

Actual Result

DatasetError: Failed while loading data from dataset ManagedTableDataset(database=dev, dataframe_type=spark, table=table, version=Version(load=None, save='2024-10-30T10.17.55.613Z'), write_mode=overwrite).
'>=' not supported between instances of 'NoneType' and 'int'

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-74f91739-2f89-4042-8eb7-77d385bb6dde/lib/python3.10/site-packages/kedro_datasets/databricks/_base_table_dataset.py:359, in BaseTableDataset._load(self)
    348 """Loads the version of data in the format defined in the init
    349 (spark|pandas dataframe).
    350 
   (...)
    357         in the format defined in the init.
    358 """
--> 359 if self._version and self._version.load >= 0:
    360     try:

TypeError: '>=' not supported between instances of 'NoneType' and 'int'

Your Environment

Include as many relevant details about the environment in which you experienced the bug:

lrcouto commented 3 weeks ago

Hey @jstammers, thanks for raising this issue! The _load method used by ManagedTableDataset (inherited from BaseTableDataset) compares the load version with 0, and because the load version defaults to None, that comparison raises the TypeError you're seeing. Until this is addressed, you can try explicitly setting the initial version of your datasets to prevent it from defaulting to None.
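
The failing comparison can be reproduced without Spark or Kedro. The sketch below uses a stand-in namedtuple with the same load and save fields that appear in the error message. The helper name has_explicit_load_version is hypothetical and not part of kedro-datasets; it only illustrates one way the check could guard against None.

```python
from collections import namedtuple
from typing import Optional

# Stand-in for the Version object shown in the traceback (load/save fields).
Version = namedtuple("Version", ["load", "save"])


def has_explicit_load_version(version: Optional[Version]) -> bool:
    """Hypothetical guard: True only when a non-negative load version is set."""
    return version is not None and version.load is not None and version.load >= 0


# This mirrors the state in the error: versioned, but no load version pinned.
unpinned = Version(load=None, save="2024-10-30T10.17.55.613Z")

# The unguarded check fails because None cannot be compared with an int.
error_message = ""
try:
    unpinned.load >= 0
except TypeError as exc:
    error_message = str(exc)

print(error_message)
print(has_explicit_load_version(unpinned))
print(has_explicit_load_version(Version(load=0, save=None)))
```

With a guard like this, an unset load version would fall through to the "load the latest table state" branch instead of raising.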

noklam commented 3 weeks ago

@MinuraPunchihewa Do you have an idea?

I am actually confused that ManagedTableDataset is an AbstractVersionedDataset, which has resolve_load_version() etc. That class is designed for file-based data, not for tables. Table versioning is possible with Delta, but it is a separate versioning scheme that comes with Delta and is not compatible with Kedro's native one (a specially formatted timestamp).

We can probably implement this for a specific dataset, but before that, can I understand the use case a bit more? Do you plan to use this as in the example you gave, or are you just checking whether it's versioned?

MinuraPunchihewa commented 3 weeks ago

@noklam I am happy to take a look at this.

About your comment on AbstractVersionedDataset being used for file-based data: is that how it is meant to be used? I was under the impression that it can be used for any dataset (or rather, any data source) that implements versioning. I would love an explanation of how exactly it is designed to be used.

jstammers commented 3 weeks ago

@noklam yes, the use case I have in mind is to be able to load the previous version of a delta table, so that I can perform some validation of the changes to the table after updating it.

As a pipeline, it would look something like

from kedro.pipeline import Pipeline, node

pipeline = Pipeline([
    node(update_table, inputs=["table", "staging_table"], outputs="updated_table"),
    node(validate_changes, inputs=["table", "updated_table"], outputs="changes"),
])

where "table" and "updated_table" reference the same underlying delta table. When calling validate_changes, I expect "updated_table" to load from version n and "table" to load from version n-1.

As for inferring the version number, I think the simplest way to do that is to use the following spark SQL statement

current_version = spark.sql("DESCRIBE HISTORY <catalog>.<database>.<table>").select("version").first()[0]
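
To get both n and n-1 from that history, a small helper could work on the list of version numbers. This is only a sketch: the function name current_and_previous is hypothetical, and it assumes Delta versions are consecutive integers starting at 0. It does not check that version n-1 is still readable.

```python
from typing import Optional, Sequence, Tuple


def current_and_previous(versions: Sequence[int]) -> Tuple[Optional[int], Optional[int]]:
    """Return (n, n-1) for a table history, or None where a version does not exist."""
    if not versions:
        # Table has no history yet.
        return None, None
    current = max(versions)  # do not rely on the order of the history rows
    previous = current - 1 if current > 0 else None
    return current, previous


# On Databricks the input could be collected like this (requires a Spark session):
# versions = [row["version"] for row in
#             spark.sql("DESCRIBE HISTORY <catalog>.<database>.<table>").select("version").collect()]
print(current_and_previous([2, 1, 0]))
print(current_and_previous([0]))
```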

noklam commented 3 weeks ago

I'll be looking at a PoC to play with Iceberg and versioning, and may come back to this a little later.

@jstammers Another option is to do this validation with a hook instead of a node (there is nothing wrong with the current approach either). How does the node generate the delta change? I see that the node has two inputs and produces "changes" as its output. Is this some kind of incremental pipeline?

noklam commented 3 weeks ago

About your comment on AbstractVersionedDataset being used for file-based data. Is that how it is meant to be used? I was under the impression that it can be used for any dataset (or data source rather) that implements versioning?

From my understanding, it was designed for file-based data. Version is a class that holds a load version and a save version.

There are a couple of requirements here:

  1. Version numbers need to be monotonic, because Kedro relies on this to ensure the pipeline runs with the correct data, i.e. "resolve_load_version" should pick the latest file; right now it assumes the latest timestamp.
  2. Optionally, this "load_version" can be specified per dataset with the --load-versions argument of kedro run.
    • This is a less important requirement, because in the worst case we can choose not to support arbitrary versions via the CLI for this specific dataset.
  3. save_version comes from the session_id, which is the timestamp Kedro generates.
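
To illustrate requirements 1 and 3, here is a minimal sketch of timestamp-based versions, assuming the millisecond format seen in the error above (2024-10-30T10.17.55.613Z). The helper names format_version and latest_version are hypothetical, not Kedro's API. The point is that such strings sort lexicographically in chronological order, so "latest" is simply the maximum.

```python
from datetime import datetime, timezone

# Assumed format, matching the save version shown in the error message.
VERSION_FORMAT = "%Y-%m-%dT%H.%M.%S.%fZ"


def format_version(moment: datetime) -> str:
    """Render a datetime as a version string with millisecond precision."""
    stamp = moment.strftime(VERSION_FORMAT)
    # Drop the last three microsecond digits and re-attach the trailing "Z".
    return stamp[:-4] + "Z"


def latest_version(versions):
    """Pick the most recent version; fixed-width timestamps sort chronologically."""
    return max(versions)


first = format_version(datetime(2024, 10, 30, 10, 17, 55, 613000, tzinfo=timezone.utc))
second = format_version(datetime(2024, 10, 30, 10, 18, 1, 5000, tzinfo=timezone.utc))
print(first, second, latest_version([second, first]))
```

Delta's integer versions are also monotonic, but they are not timestamps, which is where the two schemes diverge.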

Take this example:

my_data:
   type: some.FileDataset
   path: my_folder/abc.file
   versioned: true

This is expected to save files as

my_folder/abc.file/timestamp1/abc.file
my_folder/abc.file/timestamp2/abc.file
my_folder/abc.file/timestamp3/abc.file

Note that abc.file is both a folder name (slightly awkward) and the file name. So the assumption here is that a file needs to have a path (not necessarily true for a database table), and most table formats have their own versioning schemes designed with much stronger feature sets (ACID for Delta/Iceberg).
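
The layout above can be sketched in a few lines. This is an illustration of the path convention only; versioned_path is a hypothetical helper, not Kedro's internal function.

```python
from pathlib import PurePosixPath


def versioned_path(filepath: str, version: str) -> PurePosixPath:
    """Build <filepath>/<version>/<filename>, reusing the file name as a folder name."""
    path = PurePosixPath(filepath)
    return path / version / path.name


for version in ("timestamp1", "timestamp2", "timestamp3"):
    print(versioned_path("my_folder/abc.file", version))
```

A database table has no filepath to nest versions under, which is why this convention does not map directly onto ManagedTableDataset.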

Cc @merelcht