delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[Feature Request] Support auto compaction #815

Open kkr78 opened 2 years ago

kkr78 commented 2 years ago

We have a situation where Delta Lake is producing a lot of small files, so I am trying to see if there is any way to instruct Delta Lake to merge the small files into larger files. I came across this article, https://docs.databricks.com/delta/optimizations/auto-optimize.html; it appears that auto compaction merges small files and writes files of around 128 MB.

Does Delta Lake support auto compaction, i.e. spark.databricks.delta.autoCompact.enabled=true?

I tried it on EMR 6 with io.delta:delta-core_2.12:1.0.0, but it is not working as expected:

/usr/bin/spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=1 --queue=elead --conf spark.databricks.delta.autoCompact.minNumFiles=20 --conf spark.databricks.delta.autoCompact.enabled=true s3://.../incremental.py

zsxwing commented 2 years ago

OSS Delta Lake doesn't support auto compact. If you are using OSS Delta Lake, you should check https://docs.delta.io/ instead.

kkr78 commented 2 years ago

Yes, we use OSS Delta Lake. Is there any way to reduce the number of files Delta Lake produces with merge? Is it possible to add the auto compaction feature to OSS Delta Lake?

zsxwing commented 2 years ago

We have a section to show how to compact files in OSS Delta manually: https://docs.delta.io/latest/best-practices.html#compact-files.
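For reference, the approach on that page boils down to rewriting the table (or a single partition) into a fixed number of files while marking the rewrite with dataChange = false, so the data is only rearranged rather than changed. A minimal sketch, with a placeholder path and file count:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compaction").getOrCreate()

path = "s3://bucket/delta-table"  # placeholder: path to the Delta table
num_files = 16                    # placeholder: target number of files after compaction

(spark.read.format("delta").load(path)
    .repartition(num_files)
    .write
    .option("dataChange", "false")  # rewrite only rearranges data, no logical change
    .format("delta")
    .mode("overwrite")
    .save(path))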

Open sourcing auto compaction will need a lot of work and take time. You can also leave a comment on https://github.com/delta-io/delta/issues/748 to help us prioritize the most requested features.

tdas commented 2 years ago

@kkr78 In addition to what @zsxwing said, have you tried using the repartition option with merge? https://docs.delta.io/latest/delta-update.html#performance-tuning
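For context, the option referred to there is a session-level setting that repartitions merge output by the table's partition columns before writing, which can reduce the number of small files merge produces on partitioned tables. A minimal sketch, assuming Delta Lake 1.x on Spark 3.x:

# Repartition merged output by the table's partition columns before writing
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")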

kkr78 commented 2 years ago

We are having issues with both partitioned and non-partitioned tables. Merge takes longer and longer as the data grows. There are many tables (datasets) managed with Delta Lake in production, but the issue is only observed on the large tables; the small and medium tables are doing fine.

On one of the large Delta tables, with more than 215 million records and a current data size close to 70 GB (7-day history close to 520 GB), it takes more than 5 hours to merge the records. The incremental (CDC) data is a snapshot of changes captured from an RDBMS and contains inserts, updates, and deletes. The typical incremental batch is very small, less than 1% of the dataset size. The incremental changes are applied to the Delta table using Delta Lake merge. Similar performance degradation is observed with other large tables as well.
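For illustration only (the table path, key column, and change-flag column below are hypothetical), the merge pattern described above generally looks like this; putting the partition column into the match condition lets Delta prune files in untouched partitions:

from delta.tables import DeltaTable

# assumes `spark` and a CDC DataFrame `cdc_df` (with columns id, year, op, ...) already exist
target = DeltaTable.forPath(spark, "s3://bucket/delta/large_table")  # hypothetical path

(target.alias("t")
    .merge(
        cdc_df.alias("s"),                  # incremental (CDC) snapshot as a DataFrame
        "t.id = s.id AND t.year = s.year")  # partition column in the condition enables file pruning
    .whenMatchedDelete(condition="s.op = 'D'")     # hypothetical change-flag column
    .whenMatchedUpdateAll(condition="s.op = 'U'")
    .whenNotMatchedInsertAll()
    .execute())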

Our environment is EMR 6 with Spark 3.0+ and Delta Lake 1.0.

We tried running Delta compaction on the table and increasing memory and executors, but none of that helped. On the table mentioned above, the data is partitioned by year. We thought partitioning by year and month might not help, as the incremental changes span several months of the current year.

Any input on improving the performance would greatly help.

chrimaho commented 2 years ago

Hi Team!

I like this idea, and I would like to take it one step further.

Specifically, I'd like a much easier way to optimize my Delta tables from Spark. I am referring to the Spark SQL command: OPTIMIZE <table>.

Let me show you the differences:

Current:

First, run:

import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app').getOrCreate()
df = spark.createDataFrame(
    pd.DataFrame(
        {'a': [1,2,3,4],
         'b': ['a','b','c','d']}
    )
)
df.write.mode('overwrite').format('delta').save('./folder')

Then, once it's saved, run:

CREATE TABLE df USING DELTA LOCATION './folder'

Then, once the table is loaded, run:

OPTIMIZE df
--or
OPTIMIZE df ZORDER BY (b) 

As you can see, there are many steps needed.

Future:

I'd like to be able to do something like this:

import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app').getOrCreate()
df = spark.createDataFrame(
    pd.DataFrame(
        {'a':[1,2,3,4],
         'b':['a','b','c','d']}
    )
)
df.write.mode('overwrite').format('delta').options(optimize=True).save('./folder')
#or
df.write.mode('overwrite').format('delta').options(optimize=True, zorder_by=('b')).save('./folder') 

As you can see, it's much more streamlined and keeps the code at a higher level.

Can we please add a way to integrate this optimize or compaction step into the write options, or something similar?

Thank you.


luistaka commented 1 year ago

@chrimaho I liked your idea about optimizing Delta tables. It's much easier.

Does anyone know if auto compaction is now supported in OSS Delta Lake?

spark.databricks.delta.autoCompact.enabled=true
spark.databricks.delta.autoCompact.maxFileSize=134217728
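For clarity, the settings being asked about would be applied to the session like this (illustrative only; whether OSS Delta Lake honors them is exactly the question):

from pyspark.sql import SparkSession

spark = (SparkSession.builder.appName("app")
    .config("spark.databricks.delta.autoCompact.enabled", "true")           # auto compaction on/off
    .config("spark.databricks.delta.autoCompact.maxFileSize", "134217728")  # 128 MB target file size
    .getOrCreate())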

I am using AWS emr-6.10.0. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/Delta-release-history.html