apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

[Bug] The task status cannot be obtained, and the log shows a RejectedExecutionException exception. #2773

Open Trgree opened 1 year ago

Trgree commented 1 year ago

Java Version

JDK 1.8

Scala Version

2.12.x

StreamPark Version

2.1.0

Flink Version

flink 1.14.6

deploy mode

yarn-application

What happened

After a task is submitted, its status in StreamPark stays at "starting" even though the job is actually running. After I restart StreamPark, the status is shown correctly.

Number of running apps: 120
Number of server cores: 32

Two versions of StreamPark are deployed on the server: streampark_2.1.0_scala2.11 and streampark_2.1.0_scala2.12. Only streampark_2.1.0_scala2.12 has this exception.

Error Exception

2023-05-29 10:20:10 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@2967b295 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
    at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
    at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2023-05-29 10:20:11 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@160b86e9 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
    at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
    at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2023-05-29 10:20:12 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@17e64df3 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
    at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
    at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

zhoulii commented 1 year ago

In this situation the thread pool is saturated (the log shows all 320 threads active and 1024 tasks queued), so new tasks are rejected. Increasing the number of threads or the size of the blocking queue may mitigate the problem, but it cannot fix it thoroughly: it is highly likely that the backlog will grow over time.
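The rejection can be reproduced in isolation. The following is a minimal sketch, not StreamPark code: the class name `SaturationDemo` and the method `countRejected` are made up for illustration, and the pool sizes are far smaller than the 320 threads and 1024 queue slots in the log. It only uses the standard `java.util.concurrent` API and the same `AbortPolicy` that appears in the stack trace.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of StreamPark.
final class SaturationDemo {

    /** Submits blocking tasks to a bounded pool and returns how many were rejected. */
    static int countRejected(int threads, int queueCapacity, int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // policy seen in the stack trace

        // Every task waits on this latch, standing in for a slow status probe.
        CountDownLatch release = new CountDownLatch(1);
        Runnable stuckProbe = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int rejected = 0;
        for (int i = 0; i < submissions; i++) {
            try {
                pool.execute(stuckProbe);
            } catch (RejectedExecutionException e) {
                rejected++; // all threads busy and the queue is full
            }
        }

        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(1, 1, 5));
    }
}
```

With one thread and one queue slot, the first task occupies the worker, the second fills the queue, and the remaining three of five submissions are rejected. A larger pool only moves the point at which this happens.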

Whenever the doWatch method is invoked, FlinkRESTAPIWatcher adds a monitoring task for every tracked app. Since WATCHING_APPS contains all the apps that need to be tracked, the tasks still queued in the thread pool are stale and can be discarded. So I have two solutions for this issue:

1. Clear all the queued tasks in the thread pool when the doWatch method is invoked.
2. Set the rejection policy of the thread pool to DiscardOldestPolicy.

What's your opinion, @wolfboys?
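Both options proposed above can be compared on a toy pool. This is a sketch under stated assumptions, not the actual FlinkRESTAPIWatcher code: the class `StaleProbeSketch`, the method `run`, and the integer probe ids are hypothetical, and the pool has a single worker and two queue slots so the behavior is easy to follow. `ThreadPoolExecutor.getQueue()` and `ThreadPoolExecutor.DiscardOldestPolicy` are the standard JDK API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and sizes are illustrative only.
final class StaleProbeSketch {

    /**
     * Blocks the only worker, submits two rounds of two probes each,
     * then releases the worker. Returns the ids of the probes that ran.
     */
    static List<Integer> run(boolean clearQueueBeforeRound) {
        RejectedExecutionHandler handler = clearQueueBeforeRound
                ? new ThreadPoolExecutor.AbortPolicy()          // option 1 keeps the default policy
                : new ThreadPoolExecutor.DiscardOldestPolicy(); // option 2
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), handler);

        List<Integer> ran = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {            // occupies the worker like a hung probe
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        int id = 0;
        for (int round = 0; round < 2; round++) {
            if (clearQueueBeforeRound) {
                pool.getQueue().clear(); // option 1: drop probes left from the last round
            }
            for (int i = 0; i < 2; i++) {
                final int probeId = ++id;
                pool.execute(() -> ran.add(probeId));
            }
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ran);
    }

    public static void main(String[] args) {
        System.out.println("clear queue:    " + run(true));
        System.out.println("discard oldest: " + run(false));
    }
}
```

In this toy setup both variants end up running only the probes from the second round (ids 3 and 4), and neither throws RejectedExecutionException. The difference is where the dropping happens: option 1 empties the queue up front on every round, while option 2 evicts one old task at a time and only when the queue is actually full.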

zhoulii commented 1 year ago

Since most of the tasks in the thread pool are duplicates, maybe we can define a special blocking queue that naturally deduplicates tasks for the same app, and use it when building the thread pool.
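One way to get the deduplicating effect without writing a custom BlockingQueue is a guard in front of the pool that remembers which apps already have a probe pending. Note that this is a different technique from the special queue suggested above, shown only as a rough sketch: `DedupSubmitter`, `submit`, and the use of a `long` app id as the key are assumptions for illustration and do not come from StreamPark.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical helper; not part of StreamPark.
final class DedupSubmitter {
    private final ExecutorService pool;
    // Ids of apps that have a probe queued or running.
    private final Set<Long> pending = ConcurrentHashMap.newKeySet();

    DedupSubmitter(ExecutorService pool) {
        this.pool = pool;
    }

    /** Returns true if the probe was submitted, false if one for this app is already pending. */
    boolean submit(long appId, Runnable probe) {
        if (!pending.add(appId)) {
            return false; // duplicate: skip instead of growing the backlog
        }
        try {
            pool.execute(() -> {
                try {
                    probe.run();
                } finally {
                    pending.remove(appId); // allow the next round to probe this app again
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            pending.remove(appId); // do not leave the app marked as pending
            throw e;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        DedupSubmitter submitter = new DedupSubmitter(pool);
        System.out.println(submitter.submit(1L, () -> { }));
        pool.shutdown();
    }
}
```

With this guard the number of queued tasks cannot exceed the number of tracked apps, so a queue sized for the app count would not overflow. The trade-off is that a probe that hangs forever blocks further probes for its app, so it would need to be paired with a timeout on the REST call.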