pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Support merge operation for Delta tables #11983

Closed edgBR closed 5 months ago

edgBR commented 7 months ago

Description

Hi,

Let's ditch Spark once and for all. Just kidding; now seriously, the new delta-rs release allows for update, delete, and merge operations in the Python bindings.

I have checked that polars.DataFrame.write_delta() supports different write modes (append, overwrite, etc.), but there is no way to merge data incrementally. It would be amazing to have this.
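
For context, a minimal sketch of what write_delta covers today versus the gap being requested (the table path is a placeholder, and the merge mode shown in the comment is hypothetical):

import polars as pl

df = pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# What exists today: whole-table appends or overwrites.
df.write_delta("./my_delta_table", mode="append")
df.write_delta("./my_delta_table", mode="overwrite")

# What this issue asks for: an incremental upsert, conceptually something like
# df.write_delta("./my_delta_table", mode="merge", ...)   # hypothetical, not available at the time of writing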

stinodego commented 7 months ago

@ion-elgreco is working on this. We are blocked by https://github.com/delta-io/delta-rs/pull/1602

ion-elgreco commented 7 months ago

> @ion-elgreco is working on this. We are blocked by delta-io/delta-rs#1602

I think this is a different request. @edgBR wants to see support for delta operations inside Polars, but I don't see how this would work. The only thing you need is to convert your polars df to pyarrow format.

Also important to note: MERGE currently does not work with large arrow types. This should probably work once logical plans are built instead of going directly to a physical plan for the MERGE operation. So I think you will always need to manually convert your polars df to arrow.

Regarding PyArrow 13, I have looked at it but haven't figured out yet how to fix it quickly. So I think we'll have to wait until @wjones127 has some time to look at it.

In the meantime, if you want to write polars dataframes with arrow large_dtypes, you can do it like this:

import polars as pl
import polars.selectors as cs
from deltalake import write_deltalake

df = pl.DataFrame()  # your df

table = df.with_columns(cs.datetime().cast(pl.Datetime(time_unit='us'))).to_arrow()

write_deltalake('path/to/delta_table', table, mode='append', large_dtypes=True)  # first argument is the table path

edgBR commented 7 months ago

Hi @ion-elgreco so your suggestion is:

Is this correct?

At the end of the day, I am basically trying to refactor some pyspark code into polars + deltalake. I have a library that works for both streaming data and batch files, but in my company I am the only one with basic pyspark expertise, so I have decided to go with an option with higher chances of adoption. The code is as follows:

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, row_number


class DataLoader:
    ...

    def read_data(self) -> DataFrame:
        df = (self.spark.readStream
              .format(self.format)         # delta format
              .options(**self.options))    # ignoreChanges: true is needed
        return df.load()

    def merge_delta_data(self,
                         input_df: DataFrame,
                         batchId: int,
                         catalog_name: str,
                         db_name: str,
                         table_name: str,
                         merge_condition: str,
                         partition_column: str,
                         row_number_partition_by_cols: list,
                         row_number_ordered_column):
        self.spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
        self.spark.conf.set("spark.microsoft.delta.merge.lowShuffle.enabled", "true")
        fqn_table = f"{catalog_name}.{db_name}.{table_name}"
        if self.spark.catalog.tableExists(fqn_table):
            delta_table = DeltaTable.forName(self.spark, fqn_table)
            deduplicated_source = (
                input_df
                .withColumn("rownum", row_number().over(
                    Window.partitionBy(row_number_partition_by_cols)
                          .orderBy(row_number_ordered_column)))
                .where("rownum == 1")
                .alias("src")
            )
            (delta_table.alias("tgt")
             .merge(deduplicated_source, merge_condition)
             .whenMatchedUpdateAll()       # if the condition matches we update
             .whenNotMatchedInsertAll()    # if these are new rows we just insert
             .execute())
        else:
            ### we create the delta table here if it doesn't exist
            ...

    def process_silver_data(self):
        df = self.read_data()
        return (df.writeStream.format("delta")
                .outputMode("append")
                .queryName("Incremental Merge")
                .foreachBatch(lambda dfx, epochId: self.merge_delta_data(...))
                .trigger(availableNow=True)   # when the process finishes, the cluster shuts down
                ### this is equivalent to trigger(once=True) in older spark versions
                .option("checkpointLocation", self.checkpoint_location))

....
merge_condition = """src.processed_date = tgt.processed_date and
                     src.entity_code = tgt.entity_code and
                     src.debtor_id = tgt.debtor_id and
                     src.case_id = tgt.case_id and
                     src.portfolio_id = tgt.portfolio_id and
                     src.snapshot_datetime > tgt.snapshot_datetime"""
row_number_partition_by_cols = ["country", "portfolio_id"]
row_number_ordered_column = col("processed_datetime").desc()  # pyspark.sql.column.Column
df = DataLoader().process_silver_data()
ion-elgreco commented 7 months ago

@edgBR that's correct!

edgBR commented 7 months ago

Got it,

Do you think then we should keep this open? I mean from my side it would be AWESOME to do this directly from polars, but I leave it up to you guys.

ion-elgreco commented 7 months ago

> Got it,
>
> Do you think then we should keep this open? I mean from my side it would be AWESOME to do this directly from polars, but I leave it up to you guys.

We could maybe add this method on the Polars DataFrame: DataFrame.merge_delta('table_path', df_alias='source', target_alias='target', predicate="sql query format")

This would then return the deltalake TableMerger class, where you can add all the when clauses.
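
For illustration, a rough sketch of how that proposed method could be used; merge_delta is hypothetical and does not exist in Polars today, while the chained methods come from the deltalake TableMerger:

import polars as pl

df = pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# Hypothetical API as proposed above.
(
    df.merge_delta(
        "path/to/delta_table",
        df_alias="source",
        target_alias="target",
        predicate="source.id = target.id",
    )
    .when_matched_update_all()       # chained on the returned deltalake TableMerger
    .when_not_matched_insert_all()
    .execute()
)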

@stinodego what do you think?

edgBR commented 7 months ago

Hi @ion-elgreco

This might be a good suggestion, provided that the mergeSchema option also gets added to the deltalake TableMerger class in the future. Do you know if someone is working on this? Maybe we could ask what the plans are.

ion-elgreco commented 7 months ago

> Hi @ion-elgreco
>
> This might be a good suggestion, provided that the mergeSchema option also gets added to the deltalake TableMerger class in the future. Do you know if someone is working on this? Maybe we could ask what the plans are.

I don't know of anyone working on it. I have a PR open to allow for additional when clauses in the Python merge.

I would suggest creating an issue for schema evolution in merge in the delta-rs repo.

stinodego commented 7 months ago

Hm, so it seems I misunderstood the original request. I thought you were calling for an update to DataFrame.write_delta.

It's still not clear to me exactly what you want. Could you explain what exactly you want to achieve and add a code example of what the Polars syntax would look like?

edgBR commented 7 months ago

Hi @stinodego

Pseudo code below, with a new method called merge_deltalake():

from datetime import datetime

import polars as pl

table_target = pl.DataFrame({
    'date': [datetime(2022, 1, 1),
             datetime(2022, 1, 2),
             datetime(2022, 1, 3)],
    'max_temperature': [32.6, 29.3, 30.4]
})

table_target.write_delta(target='./my_delta_table', mode='append')

table_source = pl.DataFrame({
    'date': [datetime(2022, 1, 3),
             datetime(2022, 1, 4),
             datetime(2022, 1, 5)],
    'max_temperature': [33.2, 34.0, 35.0]
})

table_target = pl.read_delta(source='./my_delta_table')

(table_target.merge_deltalake(source=table_source,
                              merge_condition="src.date == tgt.date",
                              source_alias="src",
                              target_alias="tgt",
                              schema_evolution="autoMerge")
             .when_matched_update_all()
             .when_not_matched_insert_all())

Or updating write_delta():

from datetime import datetime

import polars as pl

table_target = pl.DataFrame({
    'date': [datetime(2022, 1, 1),
             datetime(2022, 1, 2),
             datetime(2022, 1, 3)],
    'max_temperature': [32.6, 29.3, 30.4]
})

table_target.write_delta(target='./my_delta_table', mode='append')

table_source = pl.DataFrame({
    'date': [datetime(2022, 1, 3),
             datetime(2022, 1, 4),
             datetime(2022, 1, 5)],
    'max_temperature': [33.2, 34.0, 35.0]
})

table_target = pl.read_delta(source='./my_delta_table')

(table_target.write_delta(source=table_source,
                          mode="merge",
                          delta_write_options={"merge_condition": "src.date == tgt.date",
                                               "source_alias": "src",
                                               "target_alias": "tgt",
                                               "schema_evolution": "autoMerge"})
             .when_matched_update_all()
             .when_not_matched_insert_all())

And of course similar functionality for when_not_matched_delete, etc.

@stinodego let me know if I managed to explain myself properly, and thanks for the support.

stinodego commented 7 months ago

It would make sense to me to update write_delta with merge functionality. A separate merge_delta doesn't fit in the Polars API.

But since it looks like you have both DataFrames in memory, why not do the required computation using the Polars API, and then do a write_delta(mode="overwrite")?
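
For illustration, a rough sketch of that overwrite-based workaround, assuming the date column from the earlier example is the merge key:

from datetime import datetime

import polars as pl

# Assuming the target table was written earlier, e.g. with the example data above.
target = pl.read_delta("./my_delta_table")
source = pl.DataFrame({
    "date": [datetime(2022, 1, 3), datetime(2022, 1, 4)],
    "max_temperature": [33.2, 34.0],
})

# Emulate "when matched update all / when not matched insert all" in Polars:
# take every row from `source`, plus the target rows whose key does not appear in `source`.
merged = pl.concat([source, target.join(source, on="date", how="anti").select(source.columns)])

merged.write_delta("./my_delta_table", mode="overwrite")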

ion-elgreco commented 7 months ago

@stinodego but it's not just a write operation; if it's a merge you also need to add more inputs, and then we have a function that serves two purposes.

With write it doesn't return anything, but with merge it returns the deltalake TableMerger class.

stinodego commented 7 months ago

What's the TableMerger used for? It's not clear from the code example above.

ion-elgreco commented 7 months ago

> What's the TableMerger used for? It's not clear from the code example above.

TableMerger contains a set of class methods that allow you to define how records should be inserted, updated, or deleted: https://delta-io.github.io/delta-rs/python/api_reference.html#deltalake.table.TableMerger

I have some better examples here: https://delta.io/blog/2023-10-22-delta-rs-python-v0.12.0/
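
For reference, a minimal sketch of the current workflow being described here, going from Polars through Arrow into the deltalake TableMerger (the path, predicate, and column names are placeholders):

import polars as pl
from deltalake import DeltaTable

df = pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})

dt = DeltaTable("path/to/delta_table")
(
    dt.merge(
        source=df.to_arrow(),            # Polars -> Arrow, as discussed above
        predicate="s.id = t.id",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)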

stinodego commented 6 months ago

@ion-elgreco Thanks, that blog post really clarifies it for me.

For UPDATE and DELETE, I think the deltalake options are actually fine. I see no reason to support this in Polars - these do not have anything to do with DataFrames. Users can just use the deltalake package for this and use the excellent DeltaTable interface.
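
A hedged sketch of that deltalake-only route for UPDATE and DELETE, assuming a recent deltalake version and a placeholder table path:

from deltalake import DeltaTable

dt = DeltaTable("path/to/delta_table")

# DELETE rows matching a SQL predicate.
dt.delete("id = 2")

# UPDATE: set column values (given as SQL expression strings) on rows matching a predicate.
dt.update(updates={"value": "'updated'"}, predicate="id = 1")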

For the merge operation, the current workflow clearly isn't ideal, having to go through Arrow.

What would make sense to me is to add additional functionality to write_delta under mode="merge".

This would not return a TableMerger object. Rather, it would internally configure the TableMerger object through parameters passed to write_delta, and call execute on it. The configuration can be done through the existing parameter delta_write_options or possibly a separate one, delta_merge_options.

Does this make sense?
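
Purely for illustration, a rough sketch of how write_delta could drive the TableMerger internally under this proposal; the helper name and the shape of the options dict are assumptions, not an actual Polars API:

from typing import Any

import polars as pl
from deltalake import DeltaTable


def _write_delta_merge(df: pl.DataFrame, target: str, delta_merge_options: dict[str, Any]) -> None:
    # Hypothetical internal helper: build the TableMerger from plain options and
    # call execute(), so write_delta itself would return nothing.
    options = dict(delta_merge_options)
    when_clauses = options.pop("when", [])                 # assumed shape: list of (method_name, kwargs)
    merger = DeltaTable(target).merge(source=df.to_arrow(), **options)
    for method_name, kwargs in when_clauses:
        merger = getattr(merger, method_name)(**kwargs)    # e.g. "when_matched_update_all"
    merger.execute()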

ion-elgreco commented 6 months ago

@stinodego hmm, I think that could be possible; however, we would have to allow users to provide inputs for multiple when calls.

I recently added the ability to call .when_matched_update (for example) multiple times, which means you can combine multiple if-then clauses if you pass predicates.

Let me look into this on Sunday :)

Feel free to assign it to me! 😄

stinodego commented 6 months ago

> Let me look into this on Sunday :)

Great! This was just a suggestion - if you figure out something else that works better, I'm open for it. Curious to see what you come up with.

ion-elgreco commented 6 months ago

After sleeping on it, I see two ways of doing everything within write_delta; however, I still prefer option 3, which pushes the user to do the when_X calls themselves. I personally find this easier to read than nested dictionaries as function inputs.

Option 1: key-value args for the functions and their inputs

The user needs to provide the function call as the key and then the function parameters as key-value pairs in list format, since it is possible to do multiple when calls (introduced in this PR: https://github.com/delta-io/delta-rs/pull/1750).

df = pl.DataFrame({"id":['1', '2'], "foo":['hello :)', 'Good bye']})

delta_merge_options = {
    "predicate": "s.id = t.id",
    "source_alias": "s",
    "target_alias": "t",
    "when_matched_update": {
        "predicate": ["s.id = t.id", "s.id_2 = t.id2"],
        "updates": [{"s.id": "t.id", "s.foo": "t.foo"}, {"s.id2": "t.id2", "s.bar": "t.bar"}]
    },
    "when_not_matched_insert_all": {
        "predicate": "s.id > 5"
    }
}
df.write_delta('TEST_TABLE', mode='merge', delta_merge_options=delta_merge_options)

Option 2: positional args

Requires the user to know the order of the inputs.

delta_merge_options = {
    "predicate": "s.id = t.id",
    "source_alias": "s",
    "target_alias": "t",
    "when_matched_update": [({"s.id": "t.id", "s.foo": "t.foo"}, "s.id = t.id"),
                            ({"s.id2": "t.id2", "s.bar": "t.bar"}, "s.id_2 = t.id2")],
    "when_not_matched_insert_all": ("s.id > 5")
}
df.write_delta('TEST_TABLE', mode='merge', delta_merge_options=delta_merge_options)

Option 3: chain method calls

delta_merge_options = {'predicate':'s.id = t.id', "source_alias": 's', 'target_alias': 't'}
(
    df
    .write_delta(
        'TEST_TABLE', 
        mode='merge', 
        delta_merge_options=delta_merge_options
        )
    .when_matched_update(updates={"s.id":"t.id", "s.foo":"t.foo"}, predicate="s.id = 1")
    .when_matched_update(updates={"s.id":"t.id", "s.bar":"t.bar"}, predicate="s.id > 1")
    .when_not_matched_insert_all()
    .execute()
)

@stinodego Let me know what you think! 😄

edgBR commented 6 months ago

Hi @ion-elgreco

Of course I am biased towards option 3, as I proposed something similar:


table_target.write_delta(source=table_source,
                         mode="merge",
                         delta_write_options={"merge_condition": "src.date == tgt.date",
                                              "source_alias": "src",
                                              "target_alias": "tgt",
                                              "schema_evolution": "autoMerge"}) \
            .when_matched_update_all() \
            .when_not_matched_insert_all()

But I think if you are coming from pyspark, this would increase adoption. Also, options 1 and 2 seem a bit funky to me and easy to mess up with closing quotes and dicts.

Also, since you mentioned returning the TableMerger, isn't option 3 the easiest one to implement? Please bear in mind that I'm nowhere near your level of knowledge of how deltalake is built internally.

BR E

stinodego commented 6 months ago

Let's go with option 3. And we should make sure that mode is a Literal and that the appropriate typing overloads are in place.
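
A rough sketch of what those typing overloads could look like; the signatures are simplified and assumed, not the final API:

from __future__ import annotations

from typing import Any, Literal, overload

from deltalake.table import TableMerger


class DataFrame:
    # Sketch of the relevant overloads only, not the real polars.DataFrame.

    @overload
    def write_delta(
        self,
        target: str,
        *,
        mode: Literal["error", "append", "overwrite", "ignore"] = ...,
        delta_write_options: dict[str, Any] | None = ...,
    ) -> None: ...

    @overload
    def write_delta(
        self,
        target: str,
        *,
        mode: Literal["merge"],
        delta_merge_options: dict[str, Any],
    ) -> TableMerger: ...

    def write_delta(self, target, *, mode="error", delta_write_options=None, delta_merge_options=None):
        ...  # implementation elided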

I initially hesitated on returning a non-Polars object in our API, but then again we do the same in to_pandas.

Do we need a separate delta_merge_options parameter or would it be sensible for delta_write_options to do double duty here? The separate parameter is probably better.

ion-elgreco commented 6 months ago

> Let's go with option 3. And we should make sure that mode is a Literal and that the appropriate typing overloads are in place.
>
> I initially hesitated on returning a non-Polars object in our API, but then again we do the same in to_pandas.
>
> Do we need a separate delta_merge_options parameter or would it be sensible for delta_write_options to do double duty here? The separate parameter is probably better.

Great :) I will then also add good examples of the behaviors!

I also like the separate one better, otherwise it implies those write parameters could be similar to the merge parameters.

edgBR commented 5 months ago

Hi @ion-elgreco and @stinodego, I know that this is closed and I still have not adapted my code to the new version, but I have been able to rewrite mostly everything for batch files:

landing to bronze:

    def landing_to_bronze(self) -> None:
        """
        This method converts parquet files into append-only delta tables.
        """
        try:
            self.logger.info(f"Reading raw landing file {self.args.landing_file_name}")

            df = (
                pl.read_parquet(
                    source=f"abfss://{self.landing_container}/{self.args.landing_file_name}",
                    storage_options=self.storage_credentials)
                .with_columns(insertion_time=datetime.now())
            )
            ## schema enforcement and data quality checks here

            self.logger.info(f"Appending landing file {self.args.landing_file_name} to bronze")

            df.write_delta(
                target=f'abfss://{self.bronze_container}/{self.bronze_table}',
                mode='append',
                storage_options=self.storage_credentials
            )
        except Exception as e:
            self.logger.error(f"Failed to append file {self.args.landing_file_name} to the bronze layer")
            raise e

    def bronze_to_silver(self) -> None:
        """
        This method incrementally merges the data into the silver table; it also
        performs deduplication.

        Raises
        ------
        e
            A generic error explaining why the merge failed.
        """
        try:
            bronze_df = pl.read_delta(
                source=f'abfss://{self.bronze_container}/{self.bronze_table}',
                storage_options=self.storage_credentials)
            bronze_df_no_duplicates = (
                bronze_df
                .with_columns(pl.col("insertion_time")
                              .rank("ordinal", descending=True)
                              .over('payment_id')
                              .alias('rownumber'))
                .sort(pl.col('rownumber'))
                .filter(pl.col('rownumber') == 1)
            )
            silver_check = self._table_checker(container=self.silver_container,
                                               table_name=self.silver_table)
            if silver_check:
                self.logger.info("Merging new data into silver")
                silver_df = DeltaTable(
                    table_uri=f'abfss://{self.silver_container}/{self.silver_table}',
                    storage_options=self.storage_credentials
                )
                (
                    silver_df.merge(
                        source=bronze_df_no_duplicates.to_arrow(),
                        predicate="s.primary_key = t.primary_key and s.insertion_time > t.insertion_time",
                        source_alias="s",
                        target_alias="t",
                    )
                    .when_matched_update_all()
                    .when_not_matched_insert_all()
                    .execute()
                )
                self.logger.info("Optimizing the silver table by Z-order")
                silver_df.optimize.z_order(['primary_key'])
                self.logger.info(f'History of operations: {silver_df.get_add_actions().to_pandas()}')
            else:
                self.logger.info("Because the silver table is empty, we save the first bronze file as silver")
                bronze_df.write_delta(
                    target=f'abfss://{self.silver_container}/{self.silver_table}',
                    mode='append',
                    storage_options=self.storage_credentials
                )
        except Exception as e:
            self.logger.error(f"Failed to merge {self.args.landing_file_name} into the silver table")
            raise e