jaegertracing / spark-dependencies

Spark job for dependency links
http://jaegertracing.io/
Apache License 2.0

spark-dependencies OutOfMemoryError #106

Open · RJ0222 opened this issue 3 years ago

RJ0222 commented 3 years ago

Problem

How much memory does a spark-dependencies job need to process an index of about 12 GB?

I am totally new to Spark and have tried several times to run a spark-dependencies job to create the DAG.

It always fails with the error below, even though I have raised the memory limit to about 28Gi.

21/02/03 08:18:39 ERROR TaskSetManager: Task 3 in stage 1.0 failed 1 times; aborting job
21/02/03 08:18:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 5, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 278220 ms
21/02/03 08:18:39 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 7, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 278220 ms
21/02/03 08:18:39 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 9, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 278220 ms
21/02/03 08:18:39 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 6, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 278220 ms
21/02/03 08:18:39 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_1_piece0 !
21/02/03 08:18:39 WARN NettyRpcEnv: Ignored message: HeartbeatResponse(true)
21/02/03 08:18:39 WARN SparkContext: Killing executors is not supported by current scheduler.
21/02/03 08:18:39 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Could not find HeartbeatReceiver.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
    at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more
21/02/03 08:18:49 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 7,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at sun.reflect.GeneratedSerializationConstructorAccessor28.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.io.ObjectStreamClass.newInstance(ObjectStreamClass.java:1102)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2110)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:158)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:188)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:185)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:90)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
21/02/03 08:18:49 ERROR TaskSchedulerImpl: Ignoring update with state FAILED for TID 7 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.

Sometimes the OutOfMemoryError surfaces in Arrays.copyOfRange instead:

21/02/03 07:55:00 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 148649 ms
21/02/03 07:55:00 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 5)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3496)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:3283)
    at java.io.ObjectInputStream.readString(ObjectInputStream.java:1962)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1607)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:158)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:188)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:185)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
21/02/03 07:55:00 WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 8, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 148649 ms
21/02/03 07:55:00 ERROR TaskSetManager: Task 3 in stage 1.0 failed 1 times; aborting job
21/02/03 07:55:00 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 5, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 148649 ms
21/02/03 07:55:00 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 7, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 148649 ms
21/02/03 07:55:00 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 9, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 148649 ms
21/02/03 07:55:00 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 6, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 148649 ms
21/02/03 07:55:00 WARN NettyRpcEnv: Ignored message: true
21/02/03 07:55:00 WARN SparkContext: Killing executors is not supported by current scheduler.
21/02/03 07:55:00 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_1_piece0 !

Environment

spark job configuration

javaOpts: -Xms12g -Xmx20g
resources:
  limits:
    cpu: "7"
    memory: 28Gi
  requests:
    cpu: "4"
    memory: 20Gi

ES data size

health status index                                  uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   jaeger--jaeger-span-2021-02-03         hhLqvs-5RT2xxxxxxxxx   5   1  193532073            0     11.4gb            6gb

Is there a way to solve this other than increasing the memory limit, or is this just a usage problem on my side?

Any suggestions or tips would be greatly appreciated.

lanhu-app commented 3 years ago

I have the same question. Is there any progress so far?

pavolloffay commented 3 years ago

Based on previous experience, it is just a matter of giving Spark enough memory.

@kidrocknroll were you able to solve the problem? How much memory did you allocate?

mindw commented 3 years ago

A possible workaround is to run the job more frequently on smaller chunks. With roughly 15Gi of span data per day, running the job every 4h works with the spec below.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  labels:
    app: jaeger
    component: spark-dependencies
  name: jaeger-spark-5a46
  namespace: jaeger
spec:
  concurrencyPolicy: Allow
  failedJobsHistoryLimit: 5
  jobTemplate:
    metadata:
    spec:
      template:
        metadata:
          labels:
            app: jaeger
            component: spark-dependencies
        spec:
          containers:
          - env:
            - name: STORAGE
              value: elasticsearch
            - name: ES_NODES
              value: *****
            - name: JAVA_OPTS
              value: -XX:MaxRAMPercentage=75.0
            - name: ES_TIME_RANGE
              value: 4h
            - name: ES_NODES_WAN_ONLY
              value: "true"
            image: *****/jaeger-spark-deps:0.0.1-2
            name: jaeger-spark-5a46
            resources:
              limits:
                cpu: "1"
                memory: 8Gi
              requests:
                cpu: 200m
                memory: 8Gi
          enableServiceLinks: false
  schedule: 15 */4 * * *
  startingDeadlineSeconds: 300
  successfulJobsHistoryLimit: 1
  suspend: false

rmntrvn commented 3 years ago

In my Kubernetes cluster, jaeger-spark runs every 8 hours: one run at night, one during the day, and one in the evening. Only the night run completes successfully. Taints/tolerations are configured for the jaeger-spark job, so the pod is scheduled only on a dedicated node.

The following resources have been allocated for the pod:

resources:
  limits:
    cpu: 8192m
    memory: 100Gi
  requests:
    cpu: 4096m
    memory: 100Gi

And the heap-size is as follows:

- name: JAVA_OPTS
  value: "-Xms100g -Xmx100g"

The span data is around 200G, but OOM kills the pod. Span sizes and memory metrics are shown in the screenshots. Can you tell me what the problem is with this memory consumption, and how much memory the pod needs in this case?
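
A minimal sketch of the heap-versus-limit headroom issue this configuration runs into, assuming the pod is killed by the kernel OOM killer rather than by the JVM itself: with -Xmx100g inside a 100Gi container limit, the JVM has no room left for off-heap buffers, metaspace, and thread stacks. Values below are illustrative only:

resources:
  limits:
    memory: 100Gi
  requests:
    memory: 100Gi

- name: JAVA_OPTS
  # Heap kept well below the 100Gi container limit; 80g is an assumed value
  value: "-Xms80g -Xmx80g"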

Phil1602 commented 1 year ago

We encountered the same issue.

Using the latest container image (not available on Docker Hub, only on ghcr.io) fixed the issue for us, possibly because it runs on JRE 11 instead of JRE 8, and JRE 11 enables -XX:+UseContainerSupport by default.
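
For images still running JRE 8 (8u191 or newer), the same container-aware heap sizing can presumably be requested explicitly through JAVA_OPTS. The flags below are standard HotSpot options; the 75% figure mirrors the CronJob above and is otherwise an assumption:

- name: JAVA_OPTS
  # Size the heap from the cgroup memory limit instead of the host's RAM
  value: "-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0"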

elixim commented 1 year ago

I've encountered the same problem. My environment is an Ubuntu virtual machine with 32g of RAM and 250g of storage. I moved Spark's temporary files onto a separate partition: my disk is split into two parts, and the second one, mounted at /data/, holds more than 80% of the 250g, so I created a /data/tmp/ directory there and assigned it to spark.local.dir ("spark.local.dir=/data/tmp"). Here is how I solved it:

I run Spark with this configuration:

pyspark --packages io.delta:delta-core_2.12:2.3.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.executor.instances=10" \
  --conf "spark.driver.memory=32g" \
  --conf "spark.executor.memory=32g" \
  --conf "spark.memory.fraction=0.9" \
  --conf "spark.executor.heartbeatInterval=30s" \
  --conf "spark.network.timeout=600s" \
  --conf "spark.task.maxFailures=10" \
  --conf "spark.sql.files.maxPartitionBytes=512m" \
  --conf "spark.sql.debug.maxToStringFields=1000" \
  --conf "spark.sql.parquet.int96RebaseModeInWrite=LEGACY" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=32M" \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=32M" \
  --conf "spark.network.timeout=300s" \
  --conf "spark.driver.cores=8" \
  --conf "spark.local.dir=/data/tmp"

And in my Jupyter notebook, I use this configuration:

Spark configuration

spark_conf = pyspark.SparkConf() \
    .setAppName("myApp") \
    .set("spark.driver.maxResultSize", "32g") \
    .set("spark.sql.debug.maxToStringFields", "1000") \
    .set("spark.jars", "postgresql-42.6.0.jar") \
    .set("spark.driver.extraClassPath", "./postgresql-42.6.0.jar") \
    .set("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .set("spark.ui.showConsoleProgress", "false") \
    .set("spark.executor.memoryOverhead", "600") \
    .set("spark.executor.heartbeatInterval", "120s") \
    .set("spark.sql.adaptive.enabled", "true") \
    .set("spark.sql.adaptive.skewJoin.enabled", "true") \
    .set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3") \
    .set("spark.memory.fraction", "0.9") \
    .set("spark.driver.memory", "32g") \
    .set("spark.executor.memory", "32g") \
    .set("spark.task.maxFailures", "10") \
    .set("spark.sql.files.maxPartitionBytes", "512m") \
    .set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY") \
    .set("spark.rpc.numRetries", "5") \
    .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=32M") \
    .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=32M")

sergeykad commented 4 days ago

Are there any plans to optimize resource usage? I am unable to process 20GB of spans even with 128GB of memory.