apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0
654 stars 121 forks source link

TPC-DS causes OOM #594

Open vaibhawvipul opened 1 week ago

vaibhawvipul commented 1 week ago

Describe the bug

initially started with a 3TB dataset, which i then scalled to 200GB. This is the driver and executor config on my end.

  --conf spark.driver.memory=10g \
  --conf spark.executor.cores=4 \
  --conf spark.executor.memory=12g \
  --conf spark.driver.memoryOverhead=3000 \
  --conf spark.executor.memoryOverhead=4G \

java options

--conf spark.driver.defaultJavaOptions="-XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70" \

--conf spark.executor.defaultJavaOptions="-verbose:gc -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70" \

Comet configurations are as described in the benchmark section website. Running this with 40 executors, and observe some OOM, which is intriguing because the dataset is small.

Steps to reproduce

No response

Expected behavior

No OOM

Additional context

image

vaibhawvipul commented 1 week ago

And then the driver hangs with the following:

2024-06-25 14:36:22,219 WARN serde.QueryPlanSerde: Comet does not guarantee correct results for cast from DecimalType(22,6) to DecimalType(32,6) with timezone Some(Etc/UTC) and evalMode LEGACY                                                                                                                                                                         
   2024-06-25 14:43:32,069 ERROR util.Utils: Uncaught exception in thread kubernetes-executor-pod-polling-sync                                                                                                                                                                                                                                                              
   io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.                                                                                                                                                                                                                                                                                           
       at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:129)                                                                                                                                                                                                                                                       
       at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:122)                                                                                                                                                                                                                                                       
       at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:543)                                                                                                                                                                                                                                                               
       at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.list(BaseOperation.java:427)                                                                                                                                                                                                                                                                              
       at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.list(BaseOperation.java:392)                                                                                                                                                                                                                                                                              
       at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.list(BaseOperation.java:93)                                                                                                                                                                                                                                                                               
       at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsPollingSnapshotSource$PollRunnable.$anonfun$run$1(ExecutorPodsPollingSnapshotSource.scala:91)                                                                                                                                                                                                                  
       at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1509)                                                                                                                                                                                                                                                                                                
       at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsPollingSnapshotSource$PollRunnable.run(ExecutorPodsPollingSnapshotSource.scala:74)                                                                                                                                                                                                                             
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)                                                                                                                                                                                                                                                                                           
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)                                                                                                                                                                                                                                                                                                  
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)                                                                                                                                                                                                                                             
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)                                                                                                                                                                                                                                                    
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)                                                                                                                                                                                                                                                                                   
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)                                                                                                                                                                                                                                                                                   
       at java.lang.Thread.run(Thread.java:750)                                                                                                                                                                                                                                                                                                                             
   Caused by: java.util.concurrent.TimeoutException                                                                                                                                                                                                                                                                                                                         
       at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)                                                                                                                                                                                                                                                                                      
       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)                                                                                                                                                                                                                                                                                           
       at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:520) 
lmouhib commented 1 week ago

logs from one of the executor

2024-06-25 14:36:03,380 WARN executor.Executor: task 181.0 in stage 573.0 (TID 35314) encountered a org.apache.spark.shuffle.FetchFailedException and failed, but the org.apache.spark.shuffle.FetchFailedException was hidden by another exception.  Spark is handling this like a fetch failure and ignoring the other exception: org.apache.comet.CometNativeExcept │
│ ion: General execution error with reason org.apache.comet.CometNativeException: called `Result::unwrap()` on an `Err` value: JNI { source: NullPtr("get_object_class") }                                                                                                                                                                                               │
│         at std::backtrace::Backtrace::create(__internal__:0)                                                                                                                                                                                                                                                                                                           │
│         at comet::errors::init::{{closure}}(__internal__:0)                                                                                                                                                                                                                                                                                                            │
│         at std::panicking::rust_panic_with_hook(__internal__:0)                                                                                                                                                                                                                                                                                                        │
│         at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)                                                                                                                                                                                                                                                                                            │
│         at std::sys_common::backtrace::__rust_end_short_backtrace(__internal__:0)                                                                                                                                                                                                                                                                                      │
│         at rust_begin_unwind(__internal__:0)                                                                                                                                                                                                                                                                                                                           │
│         at core::panicking::panic_fmt(__internal__:0)                                                                                                                                                                                                                                                                                                                  │
│         at core::result::unwrap_failed(__internal__:0)                                                                                                                                                                                                                                                                                                                 │
│         at comet::execution::operators::scan::ScanExec::get_next(__internal__:0)                                                                                                                                                                                                                                                                                       │
│         at comet::execution::operators::scan::ScanExec::get_next_batch(__internal__:0)                                                                                                                                                                                                                                                                                 │
│         at comet::execution::jni_api::Java_org_apache_comet_Native_executePlan::{{closure}}(__internal__:0)                                                                                                                                                                                                                                                            │
│         at Java_org_apache_comet_Native_executePlan(__internal__:0)                                                                                                                                                                                                                                                                                                    │
│         at <unknown>(__internal__:0).                                                                                                                                                                                                                                                                                                                                  
lmouhib commented 1 week ago

Spark version is 3.4.3.

andygrove commented 1 week ago

@vaibhawvipul I am curious if you see the same issue if you disable Comet shuffle?

andygrove commented 1 week ago

@lmouhib would you be able to test again with the changes in https://github.com/apache/datafusion-comet/pull/598 to see if it resolves the panic and shows us more information about the root cause?

andygrove commented 1 week ago

@lmouhib also https://github.com/apache/datafusion-comet/pull/600 may help

lmouhib commented 1 week ago

I can try #600 once its merged with main. The exception raised which led to JNI error is marked as WARNING, not sure it is the root cause, seems like it happens when an executor wants to get some intermediate result during a shuffle and the remote executor was already killed due to OOM.

lmouhib commented 1 week ago

@vaibhawvipul I am curious if you see the same issue if you disable Comet shuffle?

Shouldn't it be when enabling Comet shuffle? Because I am able to run the test I disable the Comet shuffle. I am using kubernetes, maybe if we have access to a YARN based cluster we can try to run it with and without comet shuffle?