delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG] Parquet fragmentation on merge operations #1580

Open wesley-too opened 1 year ago

wesley-too commented 1 year ago

Bug

Describe the problem

The docs at https://docs.delta.io/latest/delta-update.html#performance-tuning describe spark.databricks.delta.merge.repartitionBeforeWrite.enabled:

This is enabled by default in Delta Lake 1.1 and above. To enable this in Delta Lake 1.0 and below set the Spark session configuration spark.databricks.delta.merge.repartitionBeforeWrite.enabled to true.

The problem is that the repartition does not seem to take effect, and file fragmentation becomes an issue.
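
For reference, here is a minimal sketch of how we set and verify the flag on the session used below (on Delta Lake 1.1+ it should already default to true):

# Minimal sketch: explicitly set and verify the flag on the active session.
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")
print(spark.conf.get("spark.databricks.delta.merge.repartitionBeforeWrite.enabled"))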

Steps to reproduce

I'm using EMR 6.8.0, and this example can be run on a notebook.

First Notebook Cell

%%configure -f

{
    "conf": {
        "spark.jars.packages": "io.delta:delta-core_2.12:2.1.1,org.mongodb.spark:mongo-spark-connector:10.0.4",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension"
    },
    "executorMemory": "16g",
    "executorCores": 8,
    "numExecutors": 1,
    "driverMemory": "12g"
}

The PySpark code:

from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName('issue') \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

spark.conf.set("spark.sql.legacy.timeParserPolicy", 'LEGACY')

data = [ (i, "John Wick") for i in range(1, 1000001) ]

df = spark.createDataFrame(data=data, schema=["id", "name"])

df.write.format("delta").mode("overwrite").save("s3://my_bucket/database=db/schema=workers/table=sample_data_1/")

"""
S3 Result
                           PRE _delta_log/
2023-01-31 19:26:10          0 _delta_log_$folder$
2023-01-31 19:26:20     502485 part-00000-23ae8370-a3b2-4758-b042-d8cacca6202a-c000.snappy.parquet
2023-01-31 19:26:20     501715 part-00001-eb68afb0-0d0c-40d9-ac78-207744dda87c-c000.snappy.parquet
2023-01-31 19:26:20     501718 part-00002-51f90481-88f4-4c57-a506-a2db2bc4c6c5-c000.snappy.parquet
2023-01-31 19:26:20     501710 part-00003-9b8208d4-8eff-4dac-9367-28cd1c0b0d10-c000.snappy.parquet
2023-01-31 19:26:20     501726 part-00004-0f85164e-3427-4523-8b15-2c156444ecac-c000.snappy.parquet
2023-01-31 19:26:20     501727 part-00005-45cb0fed-cb5e-4da8-8053-711dbd6c4621-c000.snappy.parquet
2023-01-31 19:26:20     501717 part-00006-00483a10-a682-4df4-9467-83eb349853af-c000.snappy.parquet
2023-01-31 19:26:20     504023 part-00007-a915a5d6-5351-4687-bf54-66a68e55eb14-c000.snappy.parquet

Total Objects: 9
   Total Size: 4016821
"""

delta_table = DeltaTable.forPath(spark, "s3://my_bucket/database=db/schema=workers/table=sample_data_1/")

df_2 = spark.createDataFrame(data=data, schema=["id", "name"])

delta_table.alias("delta_table").merge(
    source=df_2.alias("df2"),
    condition=expr("delta_table.id = df2.id")
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

"""
S3 Result
                           PRE _delta_log/
2023-01-31 19:26:10          0 _delta_log_$folder$
2023-01-31 19:26:20     502485 part-00000-23ae8370-a3b2-4758-b042-d8cacca6202a-c000.snappy.parquet
2023-01-31 19:29:14     161088 part-00000-aaf3cfc7-e3ef-42e3-8cca-3fbe34eb563d-c000.snappy.parquet
2023-01-31 19:29:14     164832 part-00001-24033c7b-0009-4983-9173-77098f03e1f0-c000.snappy.parquet
2023-01-31 19:26:20     501715 part-00001-eb68afb0-0d0c-40d9-ac78-207744dda87c-c000.snappy.parquet
2023-01-31 19:29:14     160850 part-00002-4af742c8-68f6-4936-8c38-b0375bc69b98-c000.snappy.parquet
2023-01-31 19:26:20     501718 part-00002-51f90481-88f4-4c57-a506-a2db2bc4c6c5-c000.snappy.parquet
2023-01-31 19:26:20     501710 part-00003-9b8208d4-8eff-4dac-9367-28cd1c0b0d10-c000.snappy.parquet
2023-01-31 19:29:14     161663 part-00003-bfbf17f3-8007-4908-ada3-2bcb16881012-c000.snappy.parquet
2023-01-31 19:26:20     501726 part-00004-0f85164e-3427-4523-8b15-2c156444ecac-c000.snappy.parquet
2023-01-31 19:29:14     164693 part-00004-49529d57-50bd-493b-8889-370f1b3d1ea7-c000.snappy.parquet
2023-01-31 19:29:14     163614 part-00005-19e323b4-2e71-44ec-8add-b0ac5c6caf40-c000.snappy.parquet
2023-01-31 19:26:20     501727 part-00005-45cb0fed-cb5e-4da8-8053-711dbd6c4621-c000.snappy.parquet
2023-01-31 19:26:20     501717 part-00006-00483a10-a682-4df4-9467-83eb349853af-c000.snappy.parquet
2023-01-31 19:29:14     164256 part-00006-076f1d53-c2b6-4ba4-8037-06750dc453c6-c000.snappy.parquet
2023-01-31 19:29:14     161443 part-00007-7a2ef758-40e5-4e6f-889f-2078f55e6fcb-c000.snappy.parquet
2023-01-31 19:26:20     504023 part-00007-a915a5d6-5351-4687-bf54-66a68e55eb14-c000.snappy.parquet
2023-01-31 19:29:14     164557 part-00008-d848ff95-b7d5-48f0-a27c-b8e70b6ddce8-c000.snappy.parquet
2023-01-31 19:29:14     164509 part-00009-56487200-c8f5-4ed6-91df-bca212d34af9-c000.snappy.parquet
2023-01-31 19:29:14     161384 part-00010-6ca9bffe-b1e0-40f9-830c-c2daeb8a69c4-c000.snappy.parquet
2023-01-31 19:29:14     161343 part-00011-ba9ac7ce-7ba0-4f13-8ad4-133dd565b92b-c000.snappy.parquet
2023-01-31 19:29:14     164548 part-00012-a0fc001a-0f30-4b44-afe6-fc3d4615cac5-c000.snappy.parquet
2023-01-31 19:29:14     161399 part-00013-a5551558-2f0a-48b3-8fef-43cb2205f32d-c000.snappy.parquet
2023-01-31 19:29:14     161107 part-00014-53f52ca1-66b6-4fbd-bb7c-3e125f669693-c000.snappy.parquet
2023-01-31 19:29:14     161409 part-00015-b8d7e134-8186-46ae-b6bb-164b9f9b76f1-c000.snappy.parquet
2023-01-31 19:29:14     164665 part-00016-cfb6a9d9-e01a-4063-abdb-d5598aed46b4-c000.snappy.parquet
2023-01-31 19:29:15     161402 part-00017-55c8303d-0bd0-4c2e-8858-fc154e977694-c000.snappy.parquet
2023-01-31 19:29:15     161302 part-00018-7418d8d9-c6e3-450e-8519-785cbfa29f9a-c000.snappy.parquet
2023-01-31 19:29:15     164875 part-00019-85c8a56c-5bb0-4512-91f0-34bb9c95c58b-c000.snappy.parquet
2023-01-31 19:29:15     164798 part-00020-2e5b4a8e-6b01-439c-9e29-d18a17a56a80-c000.snappy.parquet
2023-01-31 19:29:15     164776 part-00021-6461798b-171f-4a0f-b009-5f73a39da109-c000.snappy.parquet
2023-01-31 19:29:15     164654 part-00022-ae2ec4f7-aa69-45d8-858f-0526858f4e63-c000.snappy.parquet
2023-01-31 19:29:15     160882 part-00023-fd9d39bf-fa4e-44fc-8603-aef7fbe3ea74-c000.snappy.parquet
2023-01-31 19:29:15     164571 part-00024-64b5bd6c-bb74-4c6d-a705-cc90670e1fc9-c000.snappy.parquet
2023-01-31 19:29:15      52670 part-00025-b3b8df99-9550-4634-b63a-1b6a96632d5f-c000.snappy.parquet

Total Objects: 35
   Total Size: 8144111
"""

If we run multiple merges, small files keep being created and are never merged or reused.

In production, we have more than 40k small files that we are processing via Change Data Capture (CDC).
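
For reference, this is roughly how we check how fragmented the table is after each merge (a sketch assuming DeltaTable.detail() and the MERGE operation metrics are available in this Delta version):

from pyspark.sql.functions import col
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(
    spark, "s3://my_bucket/database=db/schema=workers/table=sample_data_1/")

# Number of active data files and total size in the current snapshot.
delta_table.detail().select("numFiles", "sizeInBytes").show(truncate=False)

# Files added/removed by the most recent commit (a MERGE here).
delta_table.history(1).select(
    "operation",
    col("operationMetrics")["numTargetFilesAdded"].alias("filesAdded"),
    col("operationMetrics")["numTargetFilesRemoved"].alias("filesRemoved"),
).show(truncate=False)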

Observed results

This results in high S3 API costs for the pipeline, since every small object requires its own API calls.

We also tried compaction via OPTIMIZE, but another problem showed up.

from delta.tables import *

gold_path_tmp = "s3://my_bucket/database=db/schema=workers/table=sample_data_1/"

deltaTable = DeltaTable.forPath(spark, gold_path_tmp)

deltaTable.optimize().executeCompaction()

We got an exception.

org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

It looks like Spark tried to resolve the S3 path as a table name in the Hive catalog instead of treating it as a Delta table path?
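
One detail that may or may not be related (just a guess on our side): the %%configure above only sets spark.sql.extensions, not spark.sql.catalog.spark_catalog, which the Delta quick start lists alongside it. A sketch of building the session with both settings:

from pyspark.sql import SparkSession

# Assumption: the Hive metastore error on OPTIMIZE could come from the missing
# Delta catalog setting. Both configs have to be set at session creation time.
spark = (SparkSession.builder.appName("issue")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())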

Expected results

I expected the merge to rewrite and reuse files rather than keep accumulating small fragmented files. (Or am I wrong?)

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

Kimahriman commented 1 year ago

Looking at the code, that setting only applies if your table is partitioned, which in your example it isn't. Maybe it would make sense to add something that handles the non-partitioned case? But I could see something like repartition(1) also causing issues if that were the default.
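
For illustration only, a partitioned target (where the setting would apply) could look something like this; the bucket column and path are made up:

from pyspark.sql.functions import expr

# Hypothetical illustration: repartitionBeforeWrite only kicks in for
# partitioned targets, so the table would have to be written like this.
(df.withColumn("bucket", expr("id % 100"))
   .write
   .format("delta")
   .partitionBy("bucket")
   .mode("overwrite")
   .save("s3://my_bucket/database=db/schema=workers/table=sample_data_partitioned/"))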

scottsand-db commented 1 year ago

As mentioned above, repartitionBeforeWrite won't do anything here since the table isn't partitioned. Consider using .repartition(x) as mentioned above as well as here: https://github.com/delta-io/delta/issues/239.
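
For reference, a sketch of the manual compaction pattern for an unpartitioned table along those lines (num_files is just an illustrative value, and dataChange=false marks the commit as a pure rewrite rather than new data):

# Sketch of manual compaction: read the table, repartition to a smaller
# number of files, and overwrite in place without changing the data.
path = "s3://my_bucket/database=db/schema=workers/table=sample_data_1/"
num_files = 4

(spark.read
    .format("delta")
    .load(path)
    .repartition(num_files)
    .write
    .option("dataChange", "false")
    .format("delta")
    .mode("overwrite")
    .save(path))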

Please let me know if this helps to address your issue!