kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0
84 stars 76 forks source link

[spike] Clarify status of various Delta Table datasets #542

Open astrojuanlu opened 5 months ago

astrojuanlu commented 5 months ago

Description

We have several Delta Table datasets and it's a bit hard to understand the differences and also choose from them. A spike should be done to get clarity on what each of these delta table datasets does and how we could potentially merge the datasets or make them more consistent.

Context

image

(related: https://github.com/kedro-org/kedro/issues/3578)

Halfway through the implementation it was determined to not add a _save method and instead users are supposed to perform write operations inside the node themselves, introducing an undocumented exception to the Kedro philosophy.

This has been an ongoing point of confusion, and the final straw was this Slack thread https://linen-slack.kedro.org/t/16366189/tldr-is-there-a-suggested-pattern-for-converting-vanilla-par#4c007d9d-68c7-46e3-bf86-b4b86755f7ca

The expected outcomes here are unclear, but for starters I hope at least folks agree that the current situation is a bit of a mess. Possible actions are:

julio-cmdr commented 4 months ago

Hi! I've created a custom pandas.DeltaTableDataset, with upsert write mode. I'm sending it in here, just in case it's useful to anyone.

I took the opportunity to also add the pyarrow.Table.from_pandas() function within the _save() method, to fix the problems described in #3666

from deltalake.writer import write_deltalake
from deltalake import DeltaTable
from kedro_datasets.pandas import DeltaTableDataset
from kedro.io.core import DatasetError
from typing import List, Union, Any
import pandas as pd
import pyarrow as pa

# from https://github.com/inigohidalgo/prefect-polygon-etl/blob/main/delta-rs-etl/src/delta_rs_etl/upsert.py
def upsert(new_data: pa.Table, target_table: DeltaTable, primary_key: Union[str, List[str]]) -> dict:    
    predicate = (
        f"target.{primary_key} = source.{primary_key}"
        if type(primary_key) == str
        else " AND ".join([f"target.{col} = source.{col}" for col in primary_key])
    )

    return (
        target_table
        .merge(
            source=new_data,
            predicate=predicate,
            source_alias="source",
            target_alias="target"
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )

class CustomDeltaTableDataset(DeltaTableDataset):
    """
        This is a variation of pandas.DeltaTableDataset with support to upsert write mode
    """

    def __init__(self, primary_key: Union[str, List[str], None] = None, **kargs) -> None:
        self.primary_key = primary_key

        if kargs.get('save_args', {}).get('mode', '') == 'upsert':
            self.upsert_mode = True
            kargs['save_args']['mode'] = 'overwrite'
            if not self.primary_key:
                raise DatasetError(
                    "To use upsert write mode, you need to set the primare_key argument!"
                )
        else:
            self.upsert_mode = False

        super().__init__(**kargs)

    def _save(self, data: pd.DataFrame) -> None:
        data = pa.Table.from_pandas(data, preserve_index=False)

        if self.is_empty_dir:
            # first time creation of delta table
            write_deltalake(
                self._filepath,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )
            self.is_empty_dir = False
            self._delta_table = DeltaTable(
                table_uri=self._filepath,
                storage_options=self.fs_args,
                version=self._version,
            )
        elif self.upsert_mode:
            upsert(
                new_data=data,
                target_table=self._delta_table,
                primary_key=self.primary_key
            )
        else:            
            write_deltalake(
                self._delta_table,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )

    def _describe(self) -> dict[str, Any]:
        desc = super()._describe()
        desc['primary_key'] = self.primary_key

        if self.upsert_mode:
            desc['save_args']['mode'] = 'upsert'

        return desc
astrojuanlu commented 3 months ago

To add on top of that, just realized that months ago I had created my own Polars DeltaDataset: https://github.com/astrojuanlu/talk-kedro-huggingface/blob/b998371/src/social_summarizer/datasets/polars_delta.py

Mostly uninteresting, except for

        # HACK: If the table is empty, return an empty DataFrame
        try:
            return pl.read_delta(
                load_path, storage_options=self._storage_options, **self._load_args
            )
        except TableNotFoundError:
            return pl.DataFrame()

(related: https://github.com/kedro-org/kedro/issues/3578)