delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[QUESTION] Looking for Guidance on MERGE Performance Improvements #1790

Open Admolly opened 1 year ago

Admolly commented 1 year ago

We've just converted our entire database to Delta Lake tables and are now configuring our incremental update pipelines. All is well, for the most part, but we do have one table in particular which is taking a long time to MERGE. The table has around 40 million rows (10GB on disk), and around 100,000 of those are updated daily. The merge operation is taking over half an hour, which is longer than the amount of time it takes me to write the table to a parquet file. I am looking for some strategy to speed up this operation.

Some context: for most rows, only the updated_at column is updated. I was thinking about partitioning the table by some timestamp column, but the docs recommend not partitioning tables under 1TB. I also thought about doing an OPTIMIZE/ZORDER operation on the updated_at column, but Spark threw the following error:

AnalysisException: Z-Ordering on [updated_at] will be ineffective, because we currently do not collect stats for these columns.
You can disable this check by setting
'%sql set spark.databricks.delta.optimize.zorder.checkStatsCollection.enabled = false'

Am I correct in assuming that only some partitioning or optimization operation will improve MERGE performance, or is there some other strategy I might be overlooking? I've also read about ingestion time clustering, but since we loaded the table in a short series of batches I don't see that helping. Can someone please make some suggestions?
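
For reference, the attempt looked roughly like this (the table path is a placeholder; the conf line is just the override suggested by the error, which silences the check rather than making the Z-Order effective):

# rough sketch of the OPTIMIZE/ZORDER attempt -- table path is a placeholder
table_path = "s3a://<bucket_name>/delta_tables/my_table"

# override suggested by the error (silences the check, does not add stats)
spark.conf.set("spark.databricks.delta.optimize.zorder.checkStatsCollection.enabled", "false")

spark.sql(f"OPTIMIZE delta.`{table_path}` ZORDER BY (updated_at)")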

allisonport-db commented 1 year ago

@johanl-db Can you provide any insight?

PawaritL commented 1 year ago

hi @Admolly

feel free to share any more details on this table

mvalleavila commented 1 year ago

Hello @Admolly

Regarding the Z-Order AnalysisException, I faced the same issue and resolved it by moving the column to the first position of the Delta table.

I use this piece of code before the first write to the Delta table (where id_column_name would be your ZORDER BY column):

to_delta_df = to_delta_df.select(id_column_name, *[c for c in to_delta_df.columns if c != id_column_name])

Anyway, I'm having some trouble too using MERGE (with and without Z-Ordering): with Spark 3.3.0 and Delta 2.3.0 the driver memory is being exhausted. Using Spark 3.4.0 and Delta 2.4.0 resolved my issue.

@allisonport-db, @PawaritL, do you know if I could use some configuration with Spark 3.3 and Delta 2.3.0 to get the same or similar MERGE behaviour as Delta 2.4.0?

Thanks in advance

Admolly commented 1 year ago

Hi @mvalleavila, thank you for the heads-up! I took a look at the release notes for Delta Lake 2.4 and I see that they have added support for deletion vectors, which may have sped up your write performance. If that's the case, it might be smart to rewrite my MERGE operation as a delete and an insert rather than an update and an insert. Once I have tested the code on my machine I will share the results.

PawaritL commented 1 year ago

hi @Admolly, expanding further on @mvalleavila's answer: most likely your merge keys were not in the first 32 columns (32 is the default number of columns statistics are collected on, but you can specify a different number). Without statistics collected on a column, Z-Ordering on that column is ineffective.

I would try re-ordering the columns first (making sure that the important columns for joins and filters have statistics collected on them) and see if performance is satisfactory before rewriting the logic as two separate operations

@mvalleavila - for your question on the difference, it's currently hard to tell without seeing the query plans but feel free to connect with me if you need further help

Admolly commented 1 year ago

Hi @PawaritL, the issue with the write/merge performance, I believe, is that the rows being updated are scattered throughout the parquet files. This forces a full rewrite of each parquet file, regardless of how many rows in each file are being updated. If I understand correctly, deletion vectors circumvent this by logically marking individual rows as deleted rather than physically writing out new parquet files. Thus my strategy would be, instead of upserting the data with a MERGE, to delete the existing rows using the vectors and then insert both the new and updated rows in a separate operation. This strikes me as a more efficient approach given my predicament.
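
Roughly, the rewrite I have in mind would look something like this (untested sketch: updates stands for the incoming batch DataFrame, the path and merge key are placeholders, and I realize whether the delete step actually produces deletion vectors depends on the Delta version):

from delta.tables import DeltaTable

table_path = "s3a://<bucket_name>/delta_tables/my_table"  # placeholder

# enable deletion vectors on the table (Delta 2.4+)
spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")

target = DeltaTable.forPath(spark, table_path)

# step 1: delete the rows that are about to be replaced (key column is a placeholder)
target.alias('t') \
  .merge(updates.alias('s'), 't.id = s.id') \
  .whenMatchedDelete() \
  .execute()

# step 2: append the full incoming batch (new rows + updated rows)
updates.write.format("delta").mode("append").save(table_path)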

PawaritL commented 1 year ago

hi @Admolly, I also believe the rows are scattered across all files, causing inefficient merges. However, I recommended fixing the data layout rather than changing the code, for the following reasons:

  1. you might have complex matching/merging conditions that a single DELETE FROM ... WHERE ... statement can't easily fulfill
  2. merge performance on the sizes you mentioned (100k merge on 40m rows) should generally be quite fast even in older versions of Delta Lake and Spark. it's worth making sure the data layout is optimal not only for writes but also reads. in particular, it's worth making sure that all of your important columns (e.g. filtering/join keys) have statistics collected on them.

hope that makes sense, but don't hesitate to ask if you have any questions

Admolly commented 1 year ago

Hi @PawaritL, I have made the changes you recommended and I am still not seeing any improvement in performance. The table has 190 columns, and we are now collecting stats on only 6 columns, which I moved to the front of the table. The primary key is a composite key of 3 string columns, and the table has been partitioned by a 4th string column. Even using a much smaller subset of the table (roughly 1%), the write performance has not improved. I will try using only a subset of the columns to see if that improves anything.

PawaritL commented 1 year ago

hi @Admolly:

[just FYI in general] you can usually collect stats on at least 32 columns; you probably don't need to bring it down to only 6 columns.
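
For example, the number of indexed columns can be adjusted via the delta.dataSkippingNumIndexedCols table property (rough sketch, path is a placeholder):

# delta.dataSkippingNumIndexedCols controls how many leading columns get
# min/max/null-count stats at write time -- path is a placeholder
table_path = "s3a://<bucket_name>/delta_tables/my_table"

spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '32')")

# note: stats are collected when files are written, so only files written after this
# change pick up the new setting; existing files keep whatever stats they already have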

Admolly commented 1 year ago

Sure thing! Here is the merge logic:

# merge source data into target table
from delta.tables import DeltaTable as SparkDeltaTable

deltaTable = SparkDeltaTable.forPath(spark, f"s3a://<bucket_name>/delta_tables/{table}")
deltaTable.alias('target') \
  .merge(
    source.alias('source'),
    'target.id = source.id AND target.database = source.database AND target.cluster = source.cluster'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

As we have it now, we could include the partitioning column (the 'created_at_month' string column) in the join logic. The cluster and database columns are short strings with low cardinality (<= 10); the id column is a bit tricky. For around 80% of the data the id column takes an integer value, while the remaining 20% uses the UUID data type, so unfortunately we need to coerce both to the string data type. I am going to run ZORDER on the id column and see if that makes any difference at all.
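
For reference, including the partition column would look roughly like this (untested sketch building on the snippet above: it derives the months touched by the batch and adds them as a literal predicate so the target scan can prune partitions, assuming created_at_month values are plain strings):

# derive the partitions touched by this batch and add them as a literal predicate
months = [r['created_at_month'] for r in source.select('created_at_month').distinct().collect()]
months_filter = "target.created_at_month IN ({})".format(", ".join(f"'{m}'" for m in months))

deltaTable.alias('target') \
  .merge(
    source.alias('source'),
    months_filter + ' AND target.id = source.id AND target.database = source.database AND target.cluster = source.cluster'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()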

PawaritL commented 1 year ago

many thanks! i'll reply soon. a few points in the meantime:

Admolly commented 1 year ago

I don't know if you saw my edit, but the id values have all been coerced to strings. There are two clusters of databases: the legacy one uses INT IDs and the newer one uses UUIDs instead. Ideally we want these users in the same table if at all possible.

sezruby commented 12 months ago

Let me leave a general tip:

If the matched rows are essentially random, it will be hard to optimize, since MERGE then needs to rewrite all the files anyway. I would suggest checking the statistics in the delta log first.
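
For example, something like this surfaces the per-file stats recorded in the JSON commits (rough sketch; path is a placeholder, and it ignores checkpoint files):

# read the JSON commits from the transaction log and show the stats recorded
# for each added data file
table_path = "s3a://<bucket_name>/delta_tables/my_table"

log = spark.read.json(f"{table_path}/_delta_log/*.json")
log.where("add is not null") \
   .select("add.path", "add.partitionValues", "add.stats") \
   .show(truncate=False)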

Admolly commented 11 months ago

Hi @sezruby thank you for that very candid response. I was under the impression that might be the case. We did upgrade to 2.4 so we could try out the deletion vectors, but it did not improve performance as much as I was expecting it to. We did look into some partitioning strategies, which make a small improvement, but still not as much as we hoped for. Maybe we will revisit this at a later date.

sezruby commented 11 months ago

You can try adjusting a few things:

MERGE is done with two joins. If your executor has enough memory for the source data, try a broadcast hash join. If your source DataFrame involves a complex query that causes source materialization, that can also affect performance.
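
For example (rough sketch, reusing the names from the merge snippet above; the threshold value is illustrative, and whether the hint is honoured inside MERGE depends on the Spark/Delta version):

from pyspark.sql.functions import broadcast

# option 1: raise the auto-broadcast threshold so a small source side qualifies
# for a broadcast hash join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(256 * 1024 * 1024))

# option 2: attach an explicit broadcast hint to the source DataFrame
deltaTable.alias('target') \
  .merge(
    broadcast(source).alias('source'),
    'target.id = source.id AND target.database = source.database AND target.cluster = source.cluster'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()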

felipepessoto commented 11 months ago

> Hi @sezruby thank you for that very candid response. I was under the impression that might be the case. We did upgrade to 2.4 so we could try out the deletion vectors, but it did not improve performance as much as I was expecting it to. We did look into some partitioning strategies, which make a small improvement, but still not as much as we hoped for. Maybe we will revisit this at a later date.

MERGE is still not optimized for DVs. You can try branch 2.5; you need to build the JAR yourself. It includes some of the improvements described here: https://github.com/delta-io/delta/issues/1827