delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG] Memory leak in a long running Spark Thrift Server with Delta Lake workloads #1359

Open dnskr opened 2 years ago

dnskr commented 2 years ago

Bug

Describe the problem

I'm facing a memory leak in a long-running Spark Thrift Server (STS) on Kubernetes when Delta Lake tables are used.

RSS (process) memory grows continuously while STS runs workloads against Delta Lake tables. The growth correlates with how frequently SQL queries are executed. After a few hours, RSS reaches the pod memory limit, and the kubelet kills the pod with exit code 137 and sets OOMKilled in the pod status.

This default GCP graph demonstrates the issue:

[Image: Screen Shot 2022-08-23 at 23 43 15]

In VisualVM I see a growing trend in "used memory" for the heap, but it doesn't reach the heap maximum. Here is a graph snapshot taken a few minutes before the kubelet killed the executor pod:

[Image: Screen Shot 2022-08-23 at 23 44 19]

Also there is a growing trend for metaspace:

[Image: Screen Shot 2022-08-23 at 23 44 32]

STS is launched on Kubernetes by executing sbin/start-thriftserver.sh on the pod with spark.master=k8s://https://kubernetes.default.svc. It runs the specified number of executors with the following default memory configuration:

I have tried increasing memoryOverheadFactor from the default 10% to 20%, 30% and 50%, but it didn't solve the issue: RSS just has more room to grow, so the kubelet kills the pod a bit later.
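For context on why a larger factor only delays the kill: Spark on Kubernetes sizes the executor pod memory limit as the executor heap plus an overhead, where the overhead is the factor times the heap with a 384 MiB floor. The sketch below works through that arithmetic. It assumes a hypothetical 4 GiB executor heap (the actual memory configuration is not shown in this report) and a JVM-only executor with no PySpark or off-heap memory added on top.

```python
MIN_OVERHEAD_MIB = 384  # Spark's default floor for memory overhead


def pod_memory_limit_mib(executor_memory_mib: int, overhead_factor: float) -> int:
    """Approximate executor pod memory limit for a JVM-only executor."""
    overhead = max(int(executor_memory_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return executor_memory_mib + overhead


if __name__ == "__main__":
    heap_mib = 4096  # hypothetical value, not taken from the report
    for factor in (0.1, 0.2, 0.3, 0.5):
        limit = pod_memory_limit_mib(heap_mib, factor)
        # Headroom is the space outside the heap that RSS can grow into.
        print(f"factor={factor:.1f} limit={limit} MiB headroom={limit - heap_mib} MiB")
```

A steady leak outside the heap fills whatever headroom the factor provides, so the time until OOMKilled grows with the factor but the kill still happens.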

If I switch from Delta Lake to Parquet, STS works fine for many hours without a visible memory leak:

[Image: Screen Shot 2022-08-21 at 11 41 43]

The issue has existed for more than a year, so it affects at least the following Spark+Delta bundles:

Steps to reproduce

  1. Start Spark Thrift Server in a Kubernetes cluster by executing sbin/start-thriftserver.sh
  2. Continuously execute read/write/merge queries against Delta Lake tables
  3. Observe growing RSS and pod memory for executors:
    • ps o pid,rss -p $(pidof java)
    • kubectl top pod
    • other tools you have
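To make the growth in step 3 measurable, the RSS samples can be turned into a growth rate. This is a minimal sketch that assumes a Linux container where ps reports RSS in KiB. The helper names and the sampling interval are illustrative and are not part of Spark or Delta Lake.

```python
import subprocess
import time


def read_rss_kib(pid: int) -> int:
    """Return the resident set size of `pid` in KiB, as reported by `ps`."""
    out = subprocess.run(
        ["ps", "-o", "rss=", "-p", str(pid)],
        capture_output=True, text=True, check=True,
    ).stdout
    return int(out.strip())


def growth_kib_per_hour(samples):
    """Least-squares slope of (seconds, rss_kib) samples, scaled to KiB/hour."""
    n = len(samples)
    if n < 2:
        raise ValueError("need at least two samples")
    mean_t = sum(t for t, _ in samples) / n
    mean_r = sum(r for _, r in samples) / n
    num = sum((t - mean_t) * (r - mean_r) for t, r in samples)
    den = sum((t - mean_t) ** 2 for t, _ in samples)
    if den == 0:
        raise ValueError("samples must span a non-zero time range")
    return num / den * 3600


def sample_rss(pid: int, count: int, interval_s: float):
    """Collect `count` (elapsed_seconds, rss_kib) samples for `pid`."""
    start = time.monotonic()
    samples = []
    for _ in range(count):
        samples.append((time.monotonic() - start, read_rss_kib(pid)))
        time.sleep(interval_s)
    return samples
```

For example, growth_kib_per_hour(sample_rss(pid, 60, 60.0)) would summarize one hour of per-minute samples for the executor JVM. A slope that stays positive across several hours of steady workload is consistent with the leak described above.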

Expected results

RSS memory should not continuously grow.

Further details

I have two STS instances running different queries, but both have the issue:

STS_1

CREATE TEMPORARY VIEW ${source}
    USING JDBC
    ...
--
CREATE OR REPLACE TABLE ${table}
    USING DELTA
    ...
--
CREATE TABLE IF NOT EXISTS ${table}
    USING DELTA
    ...
--
MERGE INTO ${table}
    ...

STS_2

SELECT ... FROM ${table}
--
CREATE OR REPLACE TABLE ${table}
    USING DELTA
    ...

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

tdas commented 2 years ago

To continue our discussion from the Slack thread: do you have the heap live-object histogram (or, even better, a heap dump) from the executor where this issue is happening?

dnskr commented 2 years ago

Yes, here are 7 histograms of executor memory, taken from the start until a few minutes before OOMKilled: memory-histograms.zip

zsxwing commented 2 years ago

@dnskr is the issue only happening in executors? I just checked the memory histograms you provided, and it looks like they were all from an executor.

dnskr commented 2 years ago

@zsxwing I observe this problem (visible memory leak) on executor pods only. I took the memory histograms above from one specific executor, from start until a few minutes before it was OOMKilled. I'm not sure it helps, but here is one more executor pod memory screenshot showing the leak:

[Image: Screen Shot 2022-08-30 at 11 34 00]

zsxwing commented 2 years ago

I saw the number of java.util.concurrent.ConcurrentHashMap$Node is pretty high (9.7m) but Delta doesn't use ConcurrentHashMap in executors. I'm wondering if you can do a jmap heap dump and check GC roots of these objects so that we can know where these ConcurrentHashMaps are used.
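One way to see which classes keep growing across the histograms is to diff two snapshots. This is a minimal sketch that assumes the histograms are plain-text output of jmap -histo. The function names and the sample rows are made up for illustration and are not taken from the attached files.

```python
import re

# Matches data rows such as "   1:   9700000   310400000  java.lang.Foo (module)".
_ROW = re.compile(r"^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)")


def parse_histogram(text):
    """Map class name -> (instances, shallow_bytes) from `jmap -histo` text."""
    result = {}
    for line in text.splitlines():
        match = _ROW.match(line)
        if match:
            result[match.group(3)] = (int(match.group(1)), int(match.group(2)))
    return result


def top_growth(before, after, limit=10):
    """Rank classes by growth in shallow bytes between two histograms."""
    rows = []
    for name, (instances, size) in after.items():
        instances0, size0 = before.get(name, (0, 0))
        rows.append((size - size0, instances - instances0, name))
    rows.sort(reverse=True)
    return rows[:limit]


if __name__ == "__main__":
    # Made-up sample rows in the jmap -histo layout.
    early = """
     num     #instances         #bytes  class name (module)
       1:          1000          32000  java.util.concurrent.ConcurrentHashMap$Node (java.base)
       2:           500          12000  java.lang.String (java.base)
    """
    late = """
     num     #instances         #bytes  class name (module)
       1:          9000         288000  java.util.concurrent.ConcurrentHashMap$Node (java.base)
       2:           520          12480  java.lang.String (java.base)
    """
    for delta_bytes, delta_instances, name in top_growth(
        parse_histogram(early), parse_histogram(late)
    ):
        print(f"{delta_bytes:>12} bytes {delta_instances:>10} instances  {name}")
```

The histogram reports shallow sizes only, so it shows which classes grow but not what holds them. Finding the owner still needs the GC root analysis on a heap dump.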

dnskr commented 2 years ago

I made two heap dumps of the memory of an executor that was quite close to being OOMKilled.

  1. All objects: jmap -dump:format=b,file=executor_10.hprof 14

    [Image: executor_10]
  2. Live objects: jmap -dump:live,format=b,file=executor_10_live.hprof 14

    [Image: executor_10_live]

@zsxwing Would you suggest any next steps?

zsxwing commented 2 years ago

Not sure whether you have used Eclipse Memory Analyzer before. It provides a feature called Merge shortest path to GC Root ( https://stackoverflow.com/a/25942055/1038826 ) which can be used to find where these ConcurrentHashMaps are stored.

dnskr commented 2 years ago

Unfortunately I have never used it before, but here is what "merge shortest path to GC Root" (exclude weak references) shows:

[Image: exclude weak references]

zsxwing commented 2 years ago

@dnskr could you expand DeserializedMemoryEntry to see what is stored inside it?

dnskr commented 2 years ago

@zsxwing Yes, of course

[Image: DeserializedMemoryEntry]

zsxwing commented 2 years ago

hm, this is this broadcast variable: https://github.com/delta-io/delta/blob/2041c3b7138b9614529540613195d4afd39fde71/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala#L103

However, these objects are supposed to be removed when Driver triggers a GC event to discard them. Do you set any spark.cleaner configs (config name that starts with spark.cleaner)? If not, could you try to set spark.cleaner.referenceTracking.blocking to false and see if this helps? The default value is true and the clean up speed may be too slow for queries triggered frequently.

dnskr commented 2 years ago

No, I didn't set any spark.cleaner.* properties before, so the default values are used.

I have set spark.cleaner.referenceTracking.blocking to false, but unfortunately it didn't help. The memory leak graph looks the same as before. Screenshot of "merge shortest path to GC Root" (exclude weak references) a few minutes before OOMKilled:

[Image: spark cleaner referenceTracking blocking to false]

dnskr commented 2 years ago

@zsxwing The issue is reproducible on Spark 3.3.0 + Delta Lake 2.1.0. I also read about a memory leak in zstd, which is used by default for event logs, so I set spark.eventLog.enabled to false to check whether zstd affects the leak, but it didn't help. Do you have any other suggestions for how I can try to find the cause of the issue?

Kimahriman commented 2 years ago

Yeah, the zstd thing is specific to how Parquet uses it, and only really applies to reading Parquet files.

zsxwing commented 2 years ago

@dnskr how long does it take before executors get OOMKilled? If it's less than 30 minutes, you can try to reduce spark.cleaner.periodicGC.interval (default: 30m) to a smaller value, such as 10min.

dnskr commented 2 years ago

@zsxwing It takes ~2h or more; the longest runs are with the highest memoryOverheadFactor.

zsxwing commented 2 years ago

hm, this looks unexpected. I would expect the driver to send RPCs to executors to clean up these objects. By the way, how many org.apache.hadoop.conf.Configuration objects are in your heap dump? Just to confirm that the leak actually comes from these objects.

dnskr commented 2 years ago

Heap dump ~20 minutes after start

Class Name                              |   Objects | Shallow Heap | Retained Heap
-----------------------------------------------------------------------------------
org.apache.hadoop.conf.Configuration    |       130 |        6,240 | >= 46,086,424
-----------------------------------------------------------------------------------

Heap dump ~10 minutes before OOMKilled

Class Name                              |   Objects | Shallow Heap | Retained Heap
-----------------------------------------------------------------------------------
org.apache.hadoop.conf.Configuration    |       235 |       11,280 | >= 85,261,720
-----------------------------------------------------------------------------------
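Putting the two tables side by side gives a rough size for this growth. The sketch below only does arithmetic on the numbers reported above. Note that the retained heap values are lower bounds (shown as ">=" by the tool), so the result is an estimate.

```python
MIB = 1024 * 1024

# Values copied from the two tables above (Retained Heap is a lower bound).
early = {"objects": 130, "retained_bytes": 46_086_424}
late = {"objects": 235, "retained_bytes": 85_261_720}


def growth_mib(before, after):
    """Growth in retained heap between two dumps, in MiB."""
    return (after["retained_bytes"] - before["retained_bytes"]) / MIB


if __name__ == "__main__":
    print(f"objects: +{late['objects'] - early['objects']}")
    print(f"retained heap: +{growth_mib(early, late):.2f} MiB")
```

The Configuration objects account for roughly 37 MiB of additional retained heap between the two dumps.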
zsxwing commented 2 years ago

hm, these numbers are not large. Can you go through other DeserializedMemoryEntrys in the heap dump and see if there are other large objects?

dnskr commented 2 years ago

I don't see anything else for DeserializedMemoryEntry objects

zsxwing commented 2 years ago

I meant the objects inside DeserializedMemoryEntry. Currently there are only 100 - 200 Configurations inside DeserializedMemoryEntry. What else is in there?

dnskr commented 2 years ago

Inside DeserializedMemoryEntry I found:

  1. Four java.lang.Object[1]
    Type|Name|Value
    ---------------------------------------------------------------------------------
    ref |[0] |org.apache.spark.sql.execution.joins.UnsafeHashedRelation @ 0x6ef5c50c8
    ref |[0] |org.apache.spark.sql.execution.joins.LongHashedRelation @ 0x6ef672460
    ref |[0] |org.apache.spark.sql.execution.joins.UnsafeHashedRelation @ 0x6f015ebc0
    ref |[0] |org.apache.spark.sql.execution.joins.LongHashedRelation @ 0x6f630fe38
  2. Many (~60) byte[1][]:
    Type|Name|Value
    ---------------------------------------------------------------------------------
    ref |[0] |....sr..scala.Tuple2..f}[......L.._1t..Ljava/lang/Object;L.._2q.~..xpsr.%org.apache.spark.rdd.MapPartitionsRDD0..........Z..isFromBarrierZ..isOrderSensitiveL..evidence$2t..Lscala/reflect/ClassTag;L..ft..Lscala/Function3;L..partitionert..Lscala/Option;L..prevt..Lorg/apache/spark/rdd/RDD;xr..org.apache.spark.rdd.RDD2E.`.Q.g...Z..bitmap$0Z..checkpointAllMarkedAncestorsI..idL.._outputDeterministicLevelt..Lscala/Enumeration$Value;L..checkpointDataq.~..L..dependencies_t..Lscala/collection/Seq;L..evidence$1q.~..L..stateLockq.~..L..storageLevelt.'Lorg/apache/spark/storage/StorageLevel;xp......sr..scala.Enumeration$Val.ig....O...I..iL..namet..Ljava/lang/String;xr..scala.Enumeration$Valuebi|/.!.Q...L..$outert..Lscala/Enumeration;L..scala$Enumeration$$outerEnumq.~..xpsr.(org.apache.spark.rdd.DeterministicLevel$J#jMP......L..DETERMINATEq.~..L..INDETERMINATEq.~..L..UNORDEREDq.~..xr..scala.Enumerationu.....Y....I..nextIdI..scala$Enumeration$$bottomIdI..scala$Enumeration$$topIdL..ValueOrdering$modulet."Lscala/Enumeration$Va
    ref |[0] |....sr..scala.Tuple2..f}[......L.._1t..Ljava/lang/Object;L.._2q.~..xpsr.%org.apache.spark.rdd.MapPartitionsRDD0..........Z..isFromBarrierZ..isOrderSensitiveL..evidence$2t..Lscala/reflect/ClassTag;L..ft..Lscala/Function3;L..partitionert..Lscala/Option;L..prevt..Lorg/apache/spark/rdd/RDD;xr..org.apache.spark.rdd.RDD2E.`.Q.g...Z..bitmap$0Z..checkpointAllMarkedAncestorsI..idL.._outputDeterministicLevelt..Lscala/Enumeration$Value;L..checkpointDataq.~..L..dependencies_t..Lscala/collection/Seq;L..evidence$1q.~..L..stateLockq.~..L..storageLevelt.'Lorg/apache/spark/storage/StorageLevel;xp......sr..scala.Enumeration$Val.ig....O...I..iL..namet..Ljava/lang/String;xr..scala.Enumeration$Valuebi|/.!.Q...L..$outert..Lscala/Enumeration;L..scala$Enumeration$$outerEnumq.~..xpsr.(org.apache.spark.rdd.DeterministicLevel$J#jMP......L..DETERMINATEq.~..L..INDETERMINATEq.~..L..UNORDEREDq.~..xr..scala.Enumerationu.....Y....I..nextIdI..scala$Enumeration$$bottomIdI..scala$Enumeration$$topIdL..ValueOrdering$modulet."Lscala/Enumeration$Va
    ref |[0] |....sr..scala.Tuple2..f}[......L.._1t..Ljava/lang/Object;L.._2q.~..xpsr.%org.apache.spark.rdd.MapPartitionsRDD0..........Z..isFromBarrierZ..isOrderSensitiveL..evidence$2t..Lscala/reflect/ClassTag;L..ft..Lscala/Function3;L..partitionert..Lscala/Option;L..prevt..Lorg/apache/spark/rdd/RDD;xr..org.apache.spark.rdd.RDD2E.`.Q.g...Z..bitmap$0Z..checkpointAllMarkedAncestorsI..idL.._outputDeterministicLevelt..Lscala/Enumeration$Value;L..checkpointDataq.~..L..dependencies_t..Lscala/collection/Seq;L..evidence$1q.~..L..stateLockq.~..L..storageLevelt.'Lorg/apache/spark/storage/StorageLevel;xp.....3sr..scala.Enumeration$Val.ig....O...I..iL..namet..Ljava/lang/String;xr..scala.Enumeration$Valuebi|/.!.Q...L..$outert..Lscala/Enumeration;L..scala$Enumeration$$outerEnumq.~..xpsr.(org.apache.spark.rdd.DeterministicLevel$J#jMP......L..DETERMINATEq.~..L..INDETERMINATEq.~..L..UNORDEREDq.~..xr..scala.Enumerationu.....Y....I..nextIdI..scala$Enumeration$$bottomIdI..scala$Enumeration$$topIdL..ValueOrdering$modulet."Lscala/Enumeration$Va
zsxwing commented 2 years ago

Does your heap dump contain any sensitive data? I'm wondering if you can share the heap dump with me. This would speed up the investigation. It's hard for me to guess the issues right now.

dnskr commented 2 years ago

Unfortunately yes, there is sensitive data there. Actually I had the same idea to share heap dump with you. I'm going to reproduce the issue with public data (NY taxi or similar) next week, so I'll be able to share heap dump then.

zsxwing commented 2 years ago

Cool. You can slack me (Ryan Zhu) in Delta Users slack once you reproduce it and create the heap dump.

dnskr commented 2 years ago

@zsxwing Did you have a chance to look at the heap dump I provided? Is there anything else I can do to help find the cause of the issue?

zsxwing commented 2 years ago

Sorry for the delay. I didn't find any clue. The heap memory seems normal. It's pretty small. Did you turn on spark.memory.offHeap.enabled?

dnskr commented 2 years ago

> Sorry for the delay. I didn't find any clue. The heap memory seems normal. It's pretty small. Did you turn on spark.memory.offHeap.enabled?

No, I didn't set spark.memory.offHeap.enabled explicitly, so it should have its default value, false.

zsxwing commented 2 years ago

I'm almost out of ideas. One more option may be worth trying: could you set spark.databricks.delta.snapshotCache.storageLevel to DISK_ONLY and see if the issue still exists? This can help us check whether it's caused by the RDD cache Delta is using today.

dnskr commented 2 years ago

I've just tested it on Spark 3.3.1 + Delta Lake 2.1.0 and also set spark.memory.offHeap.enabled to false and spark.databricks.delta.snapshotCache.storageLevel to DISK_ONLY, but the issue still exists.

Proof (mostly for myself):

SET spark.memory.offHeap.enabled;
+----------------------------+-----+
|key                         |value|
+----------------------------+-----+
|spark.memory.offHeap.enabled|false|
+----------------------------+-----+

SET spark.databricks.delta.snapshotCache.storageLevel;
+-------------------------------------------------+---------+
|key                                              |value    |
+-------------------------------------------------+---------+
|spark.databricks.delta.snapshotCache.storageLevel|DISK_ONLY|
+-------------------------------------------------+---------+
[Image: Screenshot 2022-10-27 at 00 28 10]

dnskr commented 1 year ago

The issue still exists on Spark 3.3.1 + Delta Lake 2.2.0

SachinJanani commented 1 year ago

@dnskr Did you find a solution for this issue? We are seeing the same issue.

dnskr commented 1 year ago

@SachinJanani No, unfortunately I haven't found a solution yet.

dnskr commented 1 year ago

The issue still exists on Spark 3.3.2 + Delta Lake 2.2.0

SachinJanani commented 1 year ago

@dnskr I resolved this issue by setting --driver-java-options=-Ddelta.log.cacheSize=10 when starting the Thrift server. I tested with a lot of queries and it's working fine. Also make sure to set spark.databricks.delta.snapshotCache.storageLevel=DISK_ONLY. Can you please run your test and let me know whether this config solves the issue?
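As a sketch, the two settings above could be combined into a single launch command like this. The helper function is illustrative, and the script path assumes the command is run from the Spark home directory. This is the workaround reported in this comment, not a confirmed fix.

```python
import shlex


def thriftserver_command(cache_size=10, storage_level="DISK_ONLY"):
    """Build the start-thriftserver.sh argument list with both settings."""
    return [
        "sbin/start-thriftserver.sh",
        # JVM system property on the driver, as given in the comment above.
        f"--driver-java-options=-Ddelta.log.cacheSize={cache_size}",
        "--conf",
        f"spark.databricks.delta.snapshotCache.storageLevel={storage_level}",
    ]


if __name__ == "__main__":
    # Print a copy-pasteable shell command line.
    print(shlex.join(thriftserver_command()))
```

Any other options already used to start the server (master URL, executor settings) would be appended to the same argument list.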

dnskr commented 1 year ago

@SachinJanani It doesn't help. Memory still leaks on executors.

dnskr commented 1 year ago

The issue still exists on Spark 3.3.2 + Delta Lake 2.3.0

dnskr commented 1 year ago

The issue still exists on Spark 3.4.1 + Delta Lake 2.4.0

wangyum commented 3 months ago

@dnskr Does this issue still exist?

dnskr commented 3 months ago

> @dnskr Does this issue still exist?

Unfortunately, I cannot check it on the latest versions of Spark and Delta Lake, but the issue still exists on Spark 3.4.3 + Delta Lake 2.4.0.