Open lifepuzzlefun opened 1 year ago
@lifepuzzlefun Have you considered integrating existing tooling such as Reactor BlockHound to detect blocking code that is run on threads which should be non-blocking?
These BlockHound docs explain how to write an integration for BlockHound: https://github.com/reactor/BlockHound/blob/master/docs/custom_integrations.md and https://github.com/reactor/BlockHound/blob/master/docs/customization.md
Essentially, it's about configuring which threads should be non-blocking and about white-listing some known violations.
For example, Netty already comes with a BlockHound integration: https://github.com/netty/netty/blob/f027fa2df77af7719aa9686659633ee0fc73ebf9/common/src/main/java/io/netty/util/internal/Hidden.java#L31-L186
I replied on the mailing list thread: https://lists.apache.org/thread/p76y2p4cst06nmo8yzcw8pyg19x6rtnt
Motivation
As we know, Pulsar is written in an asynchronous style. In previous versions, there were cases where a thread was blocked for a long time. This PIP proposes a lightweight APM mechanism to check whether a thread has been blocked for a long time.
Goal
API Changes
No API changes.
Implementation
Basic idea
For a single-thread executor, the thread has its own task queue. If a task is inserted into the queue and gets executed, we know the thread is at least not blocked. Otherwise, the task will stay in the queue for a very long time.
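The queue reasoning above can be illustrated with a small sketch. It is not the proposed implementation: `SingleThreadProbe` and `probeRuns` are hypothetical names, and the timeouts are arbitrary. A probe task is submitted to a single-thread executor; it runs promptly when the thread is free, and stays in the queue when an earlier task is blocking the thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadProbe {

    // Hypothetical helper: submits a probe task and reports whether it
    // was executed within timeoutMs. When it runs, it records the time.
    static boolean probeRuns(ExecutorService executor, AtomicLong lastActive, long timeoutMs)
            throws InterruptedException {
        CountDownLatch ran = new CountDownLatch(1);
        executor.execute(() -> {
            lastActive.set(System.currentTimeMillis());
            ran.countDown();
        });
        return ran.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicLong lastActive = new AtomicLong();

        // The thread is free, so the probe is taken from the queue and executed.
        System.out.println("idle executor, probe ran: " + probeRuns(executor, lastActive, 1000));

        // Occupy the only thread with a task that blocks until released.
        CountDownLatch release = new CountDownLatch(1);
        executor.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // The probe now waits in the queue behind the blocking task.
        System.out.println("blocked executor, probe ran: " + probeRuns(executor, lastActive, 200));

        release.countDown();
        executor.shutdown();
    }
}
```

The second probe is still executed once the blocking task is released, which is why a monitor would compare the recorded timestamp against a threshold rather than wait on each probe.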
So we can register the executor pool with the monitor class and submit a `ping` task periodically. When the task is executed, we update the thread's `lastActiveTimestamp`. We then check whether any thread's active timestamp has not been refreshed in time; if so, we consider that the thread may be blocked.

For a multi-thread executor such as `Executors.newScheduledThreadPool(100)`, the task queue is shared among all the threads in the pool, so the `ping` task is not guaranteed to execute on a different thread every time, and the `lastActiveTimestamp` of some threads may not be refreshed in time.

To monitor the threads in a multi-thread executor, each thread will have a `runningTask` state marker. If `lastActiveTimestamp` is not refreshed in time and `runningTask` is `false`, we consider the thread healthy and simply idle, ready to run tasks.

In the JDK, `ThreadPoolExecutor.beforeExecute(Thread t, Runnable r)` is called before each task is executed, and `ThreadPoolExecutor.afterExecute(Runnable r, Throwable t)` is called after the task finishes. So we can override those methods to update the `runningTask` state of the threads in the pool.

Current Pulsar Thread Category
1. Single-thread executor, with one queue bound to one thread: Netty `EventLoop`, `OrderedExecutor`, `OrderedScheduler`, and `Executors.newSingleThreadScheduledExecutor`.
2. Multi-thread executor, with one queue bound to multiple threads: `WebExecutorThreadPool` (used by `pulsar-web` and `pulsar-websocket`) and those created by `Executors.newXXXX`.
3. `ForkJoinPool.commonPool`, used by `CompletableFuture` callbacks.

Changes
`ThreadMonitor` and `ThreadPoolMonitor` will be responsible for the main logic.

1. `ThreadMonitor` will have a `Map<threadId, threadMonitorState{lastActiveTimestamp,runningTask}>` to record the monitor state.
2. `ThreadPoolMonitor` will submit `ping` tasks to all registered executor pools and evaluate each thread's state (`lastActiveTimestamp` not refreshed in time and `runningTask == false`). The stacks of the threads identified this way will be dumped using the `ThreadMXBean` provided by the JDK, based on the state held in `ThreadMonitor`.
3. `ExecutorProvider` and `ScheduledExecutorProvider`: all executors and scheduled executors will now be created by these classes, e.g. `Executors.newXXXXX` -> `ExecutorProvider.newXXXX` and `Executors.newScheduledXXXX` -> `ScheduledExecutorProvider.newXXXXX`. Executors created by a provider will be registered in the `ThreadPoolMonitor` automatically. Also, an `EventLoopGroup` created by `EventLoopUtil` will be registered in the `ThreadPoolMonitor` automatically.

Metrics
1. `pulsar_thread_last_active_timestamp_ms{threadName="${threadName}",tid="${tid}",runningTask="${runningTask}"}` will be output in the Prometheus metrics endpoint.
2. `thread_blocked_count`, `thread_blocked_time_ms`, `thread_waited_count`, and `thread_waited_time_ms` will be output in the Prometheus metrics endpoint if `ThreadMXBean.isThreadContentionMonitoringSupported` returns true.

Configuration
In the current implementation, all configuration is set through system properties, and there are tools that can change system properties at runtime.
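As a rough sketch of what reading the configuration from system properties could look like, the example below uses two of the property names listed in this section. The class name `MonitorConfigSketch`, the method names, and the default interval are assumptions made for illustration. The values are read on every call instead of being cached, so a property changed at runtime is picked up by the next check.

```java
public class MonitorConfigSketch {

    // Property names as listed in this proposal.
    static final String ENABLED = "pulsar.thread-pool.monitor.enabled";
    static final String CHECK_MS = "pulsar.thread-pool.monitor.check.ms";

    // Boolean.getBoolean returns false when the property is absent.
    static boolean enabled() {
        return Boolean.getBoolean(ENABLED);
    }

    // The 10_000 ms fallback is an assumption, not a value from the proposal.
    static long checkIntervalMs() {
        return Long.getLong(CHECK_MS, 10_000L);
    }

    public static void main(String[] args) {
        System.out.println("enabled=" + enabled() + " checkMs=" + checkIntervalMs());

        // Simulate a tool changing the properties while the process is running.
        System.setProperty(ENABLED, "true");
        System.setProperty(CHECK_MS, "500");

        System.out.println("enabled=" + enabled() + " checkMs=" + checkIntervalMs());
    }
}
```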
1. `pulsar.thread-pool.monitor.enabled`: whether to schedule `ping` tasks to registered executors.
2. `pulsar.thread-pool.monitor.check.ms`: interval at which `ping` tasks are scheduled to registered executors.
3. `pulsar.thread-pool.monitor.blocking.dump.enabled`: whether to dump blocking threads.
4. `pulsar.thread-pool.monitor.blocking.dump.check.ms`: interval at which to check for threads whose `lastActiveTimestamp` has not been refreshed in time.
5. `pulsar-thread-pool.monitor.blocking.dump.threshold.ms`: a thread is considered blocking when the time since its `lastActiveTimestamp` is >= this value.
6. `pulsar-thread-pool.monitor.blocking.dump.stack-max-depth`: maximum stack depth of a thread dump.

Left for discussion
1. `ForkJoinPool` cannot easily be monitored with the method described above.
2. For `ForkJoinPool.commonPool`, it is not easy to replace the pool or override its callback methods.
3. `CompletableFuture`: tracing each callback pending in the queue may introduce a lot of overhead, which would harm performance.

Alternatives
No response
Anything else?
No response