delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG][Spark] Table created with Spark DataFrameWriter API has performance issues on merge #2484

Open aurokk opened 10 months ago

aurokk commented 10 months ago

Bug

Which Delta project/connector is this regarding?

Spark

Describe the problem

Hey!

I'm facing a super weird behaviour of the merge command: a Delta table created with the Spark DataFrameWriter API has performance issues on merge, and the difference is really huge. A merge into such a table takes 1000+ seconds, which is 50-100 times slower than a merge into a table created with the DeltaTableBuilder API (just 10-15 seconds).

I have some observations, metrics, logs, and a reproducible demo; I attached everything below.

Btw, just to add a little more detail about the Spark DataFrameWriter API and the DeltaTableBuilder API, here are two snippets. I write the same data (`.load(fake_target_path)`) to both tables.

1. This one will work very fast:

    ```python
    (
        DeltaTable.createIfNotExists(session)
        .addColumns(schema)
        .location(run_delta_api_target_path)
        .partitionedBy(partitioning_columns)
        .execute()
    )

    (
        session
        .read.format("delta")
        .load(fake_target_path)
        .write.format("delta")
        .mode("append")
        .save(run_delta_api_target_path)
    )
    ```
2. This one will work very slow :)

    ```python
    (
        session.read.format("delta")
        .load(fake_target_path)
        .write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", True)
        .partitionBy("_tenant")
        .save(run_spark_api_target_path)
    )
    ```
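For reference, the merge itself uses the standard DeltaTable merge API. Here is a minimal sketch of what such a merge looks like; the source path, the join condition, and the `id` column are placeholders, not the exact code from the demo repository:

```python
from delta.tables import DeltaTable

# Minimal sketch of the merge step. source_path, the join condition, and the
# "id" column are placeholders, not the exact code from the demo repository.
source_df = session.read.format("delta").load(source_path)
target = DeltaTable.forPath(session, run_spark_api_target_path)

(
    target.alias("t")
    .merge(source_df.alias("s"), "t._tenant = s._tenant AND t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
```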

We found the problem accidentally in production when we recreated a table and got a ~20x performance boost. Now I'm looking for a way to check whether other tables have the same problem or not.
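One rough idea, just a sketch and not something I have verified as a reliable detector, is to check a table's history for a CREATE TABLE entry, since that is the difference I describe under "Further details" below; the table path argument is a placeholder:

```python
from delta.tables import DeltaTable

def has_create_table_entry(session, table_path):
    # Look for a CREATE TABLE commit in the table history; the slow table shown
    # under "Further details" below has no such entry after being written with
    # the DataFrameWriter API.
    history = DeltaTable.forPath(session, table_path).history()
    return history.filter("operation = 'CREATE TABLE'").count() > 0
```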

Steps to reproduce

It is a bit hard to reproduce, so I created a separate repository with the code and the data, and it is completely reproducible. Please check it out; its size is about 120 MB.

The app merges one table (the source) into two other tables (the targets) created in the two ways I mentioned above, and it repeats this 3 times to add some weight to the results :).
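Roughly, the benchmark loop looks like the sketch below. It is simplified: `merge_into()` stands in for the actual merge helper in the repo, and the source/target paths are placeholders.

```python
import logging
import time

logger = logging.getLogger("DataMesh")

# Simplified sketch of the benchmark loop; merge_into() stands in for the
# actual merge helper in the demo repo, and the paths are placeholders.
targets = {
    "delta api": run_delta_api_target_path,
    "spark api": run_spark_api_target_path,
}

for run in range(3):
    for api, target_path in targets.items():
        started = time.monotonic()
        merge_into(session, source_path, target_path)
        elapsed = time.monotonic() - started
        logger.info("Run: %s, api: %s, time: %f", run, api, elapsed)
```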

If you use macOS you can clone the repo and run the app this way:

  1. Install deps: `brew install java11 hadoop python@3.8`
  2. Make a venv: `python -m venv venv`
  3. Install packages: `pip install -r requirements.txt`
  4. Run the tests: `pytest -s`

I know that this is not very convenient and that I could use Docker and so on; I can do that later if it is really needed.

Observed results

A merge into the table created with the Spark DataFrameWriter API takes 1000+ seconds. A merge into the table created with the DeltaTableBuilder API takes 10-15 seconds.

Expected results

I expect merge on tables created either way to take the same amount of time.

Further details

Screenshot from Grafana & logs

You can see here that during a merge into the table created with the DataFrameWriter API, the Spark application just hangs and doesn't consume resources.

[Screenshot 2024-01-11 at 22 57 32]

Here are the logs:

```
DataMesh: INFO - Run: 0, api: delta api, time: 11.977291
DataMesh: INFO - Run: 0, api: spark api, time: 25.107108 <- idk why it was so fast the first time, usually it works slow even at the first time :-)
DataMesh: INFO - Run: 1, api: delta api, time: 28.685594
DataMesh: INFO - Run: 1, api: spark api, time: 1027.720308
DataMesh: INFO - Run: 2, api: delta api, time: 14.089258
DataMesh: INFO - Run: 2, api: spark api, time: 1071.343948
```
Screenshot from Spark UI

Here is a screenshot from the Spark UI; as you can see, there are 2 tasks left to go in the stage.

[Screenshot 2024-01-11 at 23 00 29]

At the same time the Spark application uses only 2-3 full CPU cores, which correlates with that.

[Screenshot 2024-01-11 at 23 00 43]
One more thing

I also noticed that the tables have different history after creation. The table created using the DataFrameWriter API has no CREATE TABLE entry in the history.

DeltaTableBuilder API:

```
+-------+-------------------+------+--------+------------+---------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp          |userId|userName|operation   |operationParameters                                                                    |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                        |userMetadata|engineInfo                         |
+-------+-------------------+------+--------+------------+---------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------+------------+-----------------------------------+
|0      |2024-01-11 19:31:06|NULL  |NULL    |CREATE TABLE|{isManaged -> false, description -> NULL, partitionBy -> ["_tenant"], properties -> {}}|NULL|NULL    |NULL     |NULL       |Serializable  |true         |{}                                                                      |NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|1      |2024-01-11 19:31:16|NULL  |NULL    |WRITE       |{mode -> Append, partitionBy -> []}                                                    |NULL|NULL    |NULL     |0          |Serializable  |true         |{numFiles -> 652, numOutputRows -> 9275456, numOutputBytes -> 109237372}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-------------------+------+--------+------------+---------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------+------------+-----------------------------------+
```

DataFrameWriter API:

```
+-------+-------------------+------+--------+---------+-----------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp          |userId|userName|operation|operationParameters                            |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                        |userMetadata|engineInfo                         |
+-------+-------------------+------+--------+---------+-----------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------+------------+-----------------------------------+
|0      |2024-01-11 19:31:26|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> ["_tenant"]}|NULL|NULL    |NULL     |NULL       |Serializable  |false        |{numFiles -> 652, numOutputRows -> 9275456, numOutputBytes -> 109237372}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-------------------+------+--------+---------+-----------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------+------------+-----------------------------------+
```

Looks weird too.

Environment information

I tested on Spark 3.2.2 + Delta 2.0.0 and on Spark 3.5.0 + Delta 3.0.0; there is no difference. There is also no difference between using S3 and local disk :)

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

Pshak-20000 commented 1 month ago

Hello,

I would like to express my willingness to contribute a fix for this bug to the Delta Lake code base. I can work on the fix independently.

Thank you for the opportunity!

Best regards, Pragy Shukla