LinShunKang / MyPerf4J

High performance Java APM. Powered by ASM. Try it. Test it. If you feel its better, use it.
BSD 3-Clause "New" or "Revised" License
3.41k stars 540 forks source link

请问是否可以支持监控 Flink 任务? #63

Closed ImSingee closed 3 years ago

ImSingee commented 3 years ago

问题描述

希望可以监控 Flink Task,监控全部 Task 或一个指定的均可

运行环境

ImSingee commented 3 years ago

折腾无法起来的原因找到了,问题在于

对于同一个类,例如叫做 com.example.demo.A

include 如果设置为 com.example 就能找到这个类的统计信息,设置为 com.example.demo 就找不到

这是什么情况,有什么日志可以定位这个问题吗

ImSingee commented 3 years ago

另外,可以解释一下 recorder.size.timing_arr 和 recorder.size.timing_map 的作用吗

LinShunKang commented 3 years ago

折腾无法起来的原因找到了,问题在于

对于同一个类,例如叫做 com.example.demo.A

include 如果设置为 com.example 就能找到这个类的统计信息,设置为 com.example.demo 就找不到

这是什么情况,有什么日志可以定位这个问题吗

exclude 的优先级更高,你是不是配置了 exclude ? MyPerf4J 会在 标准输出 里打印被织入字节码的类,你可以 grep ProfilingTransformer.transform | grep ${yourClassName} 看看你的类是否被织入字节码了

LinShunKang commented 3 years ago

另外,可以解释一下 recorder.size.timing_arr 和 recorder.size.timing_map 的作用吗

可以看 这里,其实你可以不用配置,MyPerf4J 会根据上一次运行的数据自动的调整这两个值。

ImSingee commented 3 years ago

我找到了完全不能记录的原因:Flink 会启动多个进程,因此固定 port 会报错 AdressAlreadyInUse,我将配置文件中的 port 改成 0 解决了(不过不知道有没有副作用)

但现在又出现了新的问题…… Stdout 输出

2020-12-04 10:17:32.975 [MyPerf4J] ERROR [Source: Custom Source -> Flat Map -> Sink: Unnamed (2/2)] ProfilingTransformer.transform(org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader, com/tophant/pvs/process/Process, null, protectionDomain, 2403) Type com/tophant/pvs/models/PVSDetectResult not present
java.lang.TypeNotPresentException: Type com/tophant/pvs/models/PVSDetectResult not present
    at org.shaded.objectweb.asm.ClassWriter.getCommonSuperClass(ClassWriter.java:1026)
    at org.shaded.objectweb.asm.SymbolTable.addMergedType(SymbolTable.java:1202)
    at org.shaded.objectweb.asm.Frame.merge(Frame.java:1299)
    at org.shaded.objectweb.asm.Frame.merge(Frame.java:1207)
    at org.shaded.objectweb.asm.MethodWriter.computeAllFrames(MethodWriter.java:1610)
    at org.shaded.objectweb.asm.MethodWriter.visitMaxs(MethodWriter.java:1546)
    at org.shaded.objectweb.asm.MethodVisitor.visitMaxs(MethodVisitor.java:772)
    at org.shaded.objectweb.asm.commons.LocalVariablesSorter.visitMaxs(LocalVariablesSorter.java:148)
    at org.shaded.objectweb.asm.ClassReader.readCode(ClassReader.java:2636)
    at org.shaded.objectweb.asm.ClassReader.readMethod(ClassReader.java:1488)
    at org.shaded.objectweb.asm.ClassReader.accept(ClassReader.java:718)
    at org.shaded.objectweb.asm.ClassReader.accept(ClassReader.java:401)
    at cn.myperf4j.asm.aop.ProfilingTransformer.getBytes(ProfilingTransformer.java:59)
    at cn.myperf4j.asm.aop.ProfilingTransformer.transform(ProfilingTransformer.java:38)
    at sun.instrument.TransformerManager.transform(TransformerManager.java:188)
    at sun.instrument.InstrumentationImpl.transform(InstrumentationImpl.java:428)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.tophant.pvs.UseCase.<init>(UseCase.java:21)
    at com.tophant.pvs.UseCase.<init>(UseCase.java:10)
    at com.tophant.pvs.UseCase$Builder.getUseCase(UseCase.java:62)
    at com.tophant.nta.flinktask.pvs.process.PvsFlatProcess.open(PvsFlatProcess.java:41)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.tophant.pvs.models.PVSDetectResult
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.shaded.objectweb.asm.ClassWriter.getCommonSuperClass(ClassWriter.java:1024)
    ... 43 more

但这个类是存在的…

LinShunKang commented 3 years ago

@ImSingee 你好,这个原因是因为 MyPerf4J 没有适配 Flink,Flink 有自己的 ClassLoader,你可以试一下这个版本 MyPerf4J-ASM.jar

ImSingee commented 3 years ago

@LinShunKang 感谢!我周一试试,目前我改完端口后除了那个类已经可以完美工作了

LinShunKang commented 3 years ago

@ImSingee 客气,如果还有问题可以继续交流;端口那个问题,我打算用新增一个 http.server.port.range 来解决,周末我抽时间开发一下。

LinShunKang commented 3 years ago

@ImSingee 我增强了 http.server.port 的配置功能,使用这个 MyPerf4J-ASM.jar 即可:

# 配置 HTTP Server 端口号
# 格式为:首选端口,备选最小端口,备选最大端口
http.server.port = 2048,2000,2040
ImSingee commented 3 years ago

maxPort 被设置成了与 minPort 相同的值,我提了个 PR #64

ImSingee commented 3 years ago

另外给个建议,建议在文档中说明下需要提前创建好 influxdb 的 database,或者在启动阶段检测没有则创建

ImSingee commented 3 years ago

@ImSingee 你好,这个原因是因为 MyPerf4J 没有适配 Flink,Flink 有自己的 ClassLoader,你可以试一下这个版本 MyPerf4J-ASM.jar

这个问题解决了,感谢!

LinShunKang commented 3 years ago

另外给个建议,建议在文档中说明下需要提前创建好 influxdb 的 database,或者在启动阶段检测没有则创建

好建议

LinShunKang commented 3 years ago

另外给个建议,建议在文档中说明下需要提前创建好 influxdb 的 database,或者在启动阶段检测没有则创建

好建议

已支持,下载 最新包 即可