apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

RejectedExecutionException caused by ThreadPoolExecutor when running spark-submit #725

Open radhikabajaj123 opened 1 month ago

radhikabajaj123 commented 1 month ago

Hello, I am getting the following exception when running spark-submit. Any idea what might be causing this error?

java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@1e6328c6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 23831]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:21)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
    at scala.concurrent.impl.Promise$Transformation.handleFailure(Promise.scala:444)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:435)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
    at scala.concurrent.impl.Promise$Transformation.handleFailure(Promise.scala:444)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:435)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:504)
    at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
    at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:21)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:504)
    at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:335)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278)
    at scala.concurrent.Promise.tryFailure(Promise.scala:117)
    at scala.concurrent.Promise.tryFailure$(Promise.scala:117)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:104)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:214)
    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$6(NettyRpcEnv.scala:245)
    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$6$adapted(NettyRpcEnv.scala:245)
    at org.apache.spark.rpc.netty.RpcOutboxMessage.onFailure(Outbox.scala:86)
    at org.apache.spark.network.client.TransportResponseHandler.failOutstandingRequests(TransportResponseHandler.java:118)
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasksFrom(SingleThreadEventExecutor.java:426)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:375)
    at io.netty.util.concurrent.SingleThreadEventExecutor.confirmShutdown(SingleThreadEventExecutor.java:763)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:596)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

End of LogType:stderr
***********************************************************************

viirya commented 1 month ago

Hmm, I can't think of how Comet would cause that. Do you see this error only when Comet is enabled? Could you disable Comet and try again?

radhikabajaj123 commented 1 month ago

I only saw the error when Comet was enabled.

viirya commented 1 month ago

There is too little information here. Could you provide more? E.g., your spark-submit command line (with all the configs you added), your environment, etc.

viirya commented 1 month ago

java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@1e6328c6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 23831]

It looks like the task was submitted to the pool after the pool had already terminated. There is probably another root cause that terminated the runtime earlier; you will need to look for it in the logs.
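
For illustration, the same kind of rejection can be reproduced outside Spark with a plain JDK thread pool. This is only a minimal sketch, assuming the default AbortPolicy; the class name `TerminatedPoolDemo` and the method `rejectedAfterShutdown` are hypothetical names made up for this example and are not part of Spark or Comet:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not taken from Spark or Comet.
class TerminatedPoolDemo {

    // Shuts a pool down, then tries to submit one more task.
    // Returns true if the pool rejected the late task.
    static boolean rejectedAfterShutdown() {
        // newFixedThreadPool creates a ThreadPoolExecutor with the default AbortPolicy.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { /* stands in for earlier, successful work */ });

        // Something else shuts the pool down first (the "other root cause").
        pool.shutdown();
        try {
            // Wait so the pool is likely to have reached the Terminated state.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        try {
            // A late submission, like the failure callback in the trace above.
            pool.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            // AbortPolicy throws instead of running or queueing the task.
            System.out.println("Rejected: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + rejectedAfterShutdown());
    }
}
```

The rejection here is only a symptom of the earlier `shutdown()` call. In the same way, the exception in the report most likely follows from whatever stopped the runtime first, so the earlier log entries are the place to look.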