delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as APIs
https://delta.io
Apache License 2.0

MergeInto is failing when trying to upsert 70GB data onto a 25TB Delta store #874

Open ganesh-ravi-srib opened 2 years ago

ganesh-ravi-srib commented 2 years ago

MergeInto is failing when trying to upsert 70GB data (size on S3) onto a 25TB Delta store (size on S3)

The failure happens in the findTouchedFiles() Spark stage: our driver runs out of memory before this stage can finish. We tried an r5.24xlarge node (768GB RAM) for the driver (Spark deploy mode = client). I think the reason is that the current Delta store has 25TB spread across 43,500 files, and this upsert probably touches most of those files. So in this Spark stage of 50k tasks, each task returns about 7MB of data for touchedFilesAccum, and the driver cannot hold that much data and goes OOM. A heap dump analysis of the driver shows the following class histogram:

[Screenshot: driver heap dump class histogram]

We are using the following setup for our data pipeline:

- EMR 5.33.0
- Spark 2.4.7
- Delta core 0.6.1
- 8 r5.24xlarge machines as core nodes and 1 r5.24xlarge machine as the master node

- Current Delta store size: 25 TB (after bootstrapping with initial data via a single commit)
- Upsert data size: 70 GB
- No partitioning used, since our data has no logical partitioning and is a simple 5-column table with billions of rows

I even tried the same upsert with the following setup, but saw the same behavior:

- EMR 6.4.0
- Spark 3.1.2
- Delta core 1.0.0
- 8 r5.24xlarge machines as core nodes and 1 r5.24xlarge machine as the master node

Please suggest how we can proceed. One thing I am trying is to reduce the number of partitions (spark.sql.shuffle.partitions) so that the overall data returned to the driver is smaller. I haven't gotten this to succeed yet, and I don't think it is a scalable solution, since reducing partitions means I need to use bigger executors. Please let me know if any further info is required.

tdas commented 2 years ago

This is definitely a hard problem to solve. Here are some initial thoughts.

My guess is that there is a very large overlap between the 7MB worth of file names returned by the tasks. After merging them across tasks, it's not a lot of data. But Spark's internal data structures keep track of the touchedFilesAccum data of every task, so 50k tasks with 7MB of accum data each = at least 350 GB in memory. And the 7MB is probably the serialized size, so after deserialization it blows up a lot more. Hence 768GB RAM isn't sufficient.

You are right that the most likely way to avoid this blowup is to reduce the number of tasks, say by 10x. Each task will still probably return around 7-10 MB (because of the large overlap in the unique files seen by each task). You have a cluster of 8 r5.24xlarge machines = 768 cores. So to properly use the cluster's parallelism, try running 5k tasks. Expected memory usage = 5k * 10MB = 50GB (vs. the 350GB calculated earlier). If that does not work because of too much memory spilling in the tasks, then raise it to 10k.
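For reference, a minimal sketch of what that might look like (the table path, `updatesDF`, and the join condition are placeholders, not your actual pipeline):

```scala
// Lower shuffle parallelism to roughly the 5k tasks suggested above, so fewer
// per-task accumulator payloads are sent back to the driver.
spark.conf.set("spark.sql.shuffle.partitions", "5000")

// Placeholder table path, source DataFrame, and join condition.
io.delta.tables.DeltaTable.forPath(spark, "s3://bucket/path/to/delta-table")
  .as("t")
  .merge(updatesDF.as("s"), "t.id = s.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```

Whether 5k is exactly right depends on how much the tasks spill; the point is only that the same merge runs with fewer, larger tasks.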

If you are still having trouble, here are the next steps. Can you give us more information on the stage's metrics? Total sizes and the distribution across tasks of task times, accumulator size, shuffle bytes written and read, etc. Screenshots of the SQL plan showing all the metrics would help a lot.

ganesh-ravi-srib commented 2 years ago

I have gotten this to succeed for now with a smaller number of shuffle partitions (so I will close this ticket). I was wondering if there is a reason why accumulators were used to get this set of touched files rather than a pure Spark action.

tdas commented 2 years ago

I am glad you got it to work!

Accumulators were used originally because our initial testing showed that building the set with an accumulator was faster at finding the files than running a separate Spark job. But in hindsight, we clearly did not test it at the scale and settings that you are running at. Setting the shuffle partitions in Apache Spark has always been a dark art, and many things can tip over like this when not configured correctly. Now that we are hitting this, we definitely want to improve it. Would you be interested in brainstorming how to convert the accumulator into a Spark action, and maybe contributing it?

ganesh-ravi-srib commented 2 years ago

Sure thing! I can do that. I am off for my holidays now though, shall we connect after Jan 1st when you're available? (I am based out of India btw)

tdas commented 2 years ago

Sure! Happy to chat about this in the new year.


ganesh-ravi-srib commented 2 years ago

Hi @tdas, I am back from my vacation. Please let me know when you want to chat about this.

tdas commented 2 years ago

Alright! I am back from vacation as well. The challenge in this problem is that we want to run a single Spark job that computes two things:

  1. The list of touched files
  2. The count of how many times each unique target row matches, so that we can check for duplicate matches and throw an error.

The way the current code works is by using DataFrame operations (scan target files + join + groupBy + aggregate) to do 2, and relying on accumulators to do 1. I think there may be a way to compute both 1 and 2 in the groupBy + aggregate.

Here is the code - https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L335-L366
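One possible shape for this, sketched below (this is not the actual implementation; `joinedDF` and the `_row_id_` / `_file_name_` columns stand in for whatever the join in the linked code produces):

```scala
import org.apache.spark.sql.functions._

val row = joinedDF
  // Per matched target row: how many source rows matched it, and which file it came from.
  .groupBy(col("_row_id_"))
  .agg(count(lit(1)).as("match_count"), first(col("_file_name_")).as("file"))
  // A second, global aggregation yields both results in a single Spark action,
  // with no accumulator and no cache/persist of the join output.
  .agg(max(col("match_count")).as("max_matches"),
       collect_set(col("file")).as("touched_files"))
  .head()

val hasMultipleMatches = row.getAs[Long]("max_matches") > 1      // for the duplicate-match error
val touchedFileNames   = row.getAs[Seq[String]]("touched_files") // files to rewrite
```

The trade-off is that `collect_set` funnels every distinct file name through the final aggregation, but at ~43.5k files that should only be a few MB.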

Let's brainstorm about this together.

ganesh-ravi-srib commented 2 years ago

Hi @tdas, I have gone through that piece of code and now understand how the touchedFilesAccumulator and the >1 target row count check work. Both are currently calculated from the same inner join, without any caching or persisting: the first using accumulators and the second using groupBy and aggregate.

I wonder how we could do both in the same inner join without using a persist/cache. Please let me know your idea for this.

ganesh-ravi-srib commented 2 years ago

Actually, maybe we could swap the two: use accumulators to find the duplicate target row matches, and use a Spark distinct to find the touched files. This would help because the data returned to the driver would then be much smaller.

The accumulator for finding duplicate target row matches could be a hashmap accumulator of type `<Long, Boolean>`, where the Long is the monotonically_increasing_id and the Boolean represents has_duplicates.

I am not entirely sure how accumulators are implemented internally in Spark, though. Considering the current touchedFilesAccumulator: does each task send its complete list of touched files to the driver, which adds them to the hashset? Or is there a hashset associated with each task that is sent to the driver when the task completes, with the driver merging all the hashsets after the stage finishes?
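For concreteness, a rough sketch of what such a hashmap accumulator might look like with Spark's AccumulatorV2 API (class and field names are hypothetical, not actual Delta code); the copy()/add()/merge() contract hints at per-task copies that get merged on the driver:

```scala
import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

class DuplicateMatchAccumulator
    extends AccumulatorV2[Long, Map[Long, Boolean]] {

  // target row id -> has this row id been matched more than once?
  private val seen = mutable.HashMap.empty[Long, Boolean]

  override def isZero: Boolean = seen.isEmpty

  override def copy(): DuplicateMatchAccumulator = {
    val acc = new DuplicateMatchAccumulator
    acc.seen ++= seen
    acc
  }

  override def reset(): Unit = seen.clear()

  // Seeing the same row id twice within one task marks it as a duplicate match.
  override def add(rowId: Long): Unit =
    seen.update(rowId, seen.contains(rowId))

  // Seeing a row id in two different tasks also marks it as a duplicate match.
  override def merge(other: AccumulatorV2[Long, Map[Long, Boolean]]): Unit =
    other.value.foreach { case (rowId, isDup) =>
      seen.update(rowId, isDup || seen.contains(rowId))
    }

  override def value: Map[Long, Boolean] = seen.toMap
}
```

One thing to watch: the merged map holds an entry per matched target row id, so a merge that matches a very large number of target rows could itself get big on the driver.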

ganesh-ravi-srib commented 2 years ago

Hi @tdas, please let me know your view.