didi / KnowStreaming

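A one-stop, cloud-native real-time streaming data platform. It builds enterprise-grade Kafka services in a zero-intrusion, plugin-based way, greatly lowering the barrier to operating, storing, and managing real-time streaming data.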
https://knowstreaming.com
GNU Affero General Public License v3.0

Timeout error when viewing consumer group details #1096

Open LoneKingCode opened 1 year ago

LoneKingCode commented 1 year ago

  • [x] I have searched the issues for related problems and found no duplicates.

Would you like to claim this bug?

「 Y / N 」

Environment

  • KnowStreaming version : 3.3.0
  • Operating System version : centos8.4
  • Java version : 11.0.2

Steps to reproduce

  1. View the consumer group details. Two Kafka clusters have been added to KnowStreaming, one running 2.2.0 and one running 2.4.1. On the 2.2.0 cluster, the Consumer Group details show the partition details under the topic, as in the screenshot: image. On the 2.4.1 cluster, viewing the Consumer Group details always reports a timeout error: image

Expected result

The list of partitions under the topic should be displayed.

Actual result

A timeout error is displayed. However, this cluster's topic list, statistics, and so on are almost all displayed normally; only the consumer group details fail. Network problems have been ruled out: kafkactl retrieves the information without issue, and quickly. In addition, the request timeout and socket timeout in KnowStreaming are both set to 30 s, yet the timeout error on the page pops up instantly every time.

The exception output is below; there are many similar timeouts.

If there is an exception, please attach the exception trace:

com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1689749980884, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:167)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196)
        at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83)
        at com.xiaojukeji.know.streaming.km.collector.metric.kafka.GroupMetricCollector.collectMetrics(GroupMetricCollector.java:96)
        at com.xiaojukeji.know.streaming.km.collector.metric.kafka.GroupMetricCollector.lambda$collectKafkaMetrics$0(GroupMetricCollector.java:66)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1689749980884, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        ... 12 common frames omitted

2023-07-19 14:58:40.883 [MetricCollect-Shard-1-9-thread-79] ERROR class=c.x.k.s.k.core.service.group.impl.GroupServiceImpl||method=getGroupOffset||clusterPhyId=1|groupName=wangyou_transport||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1689749980882, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196)
        at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83)
        at com.xiaojukeji.know.streaming.km.collector.metric.kafka.GroupMetricCollector.collectMetrics(GroupMetricCollector.java:96)
        at com.xiaojukeji.know.streaming.km.collector.metric.kafka.GroupMetricCollector.lambda$collectKafkaMetrics$0(GroupMetricCollector.java:66)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1689749980882, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

ZQKC commented 1 year ago

  1. On the machine where KS runs, use Kafka's own scripts to look up the consumed offsets of this group. According to the log, the lookup of the coordinator timed out.
  2. The request timeout and socket timeout settings may be ignored, because the frontend times out after 10 seconds and backend requests generally time out after 5-10 seconds.

LoneKingCode commented 1 year ago
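  1. On that machine, kafka-consumer-groups.sh shows the information in about 2 s. kafkactl, run from the KnowStreaming machine against the cluster, also returns the information in about 2 s. Everything is in the same cloud region.

One thing puzzles me: from the user's point of view the error is instantaneous. Clicking the arrow button in the frontend pops up the error message immediately, with no sense of waiting for a timeout.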
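The impression that the error is instantaneous can be checked against the logged values. The sketch below compares the timestamp of a log entry with the `deadlineMs` printed in the same entry. It assumes the server writes log timestamps in UTC+8, which the thread does not state; the class and method names are illustrative only.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DeadlineGap {
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Milliseconds left before the call's deadline at the moment the error was logged. */
    public static long remainingMs(String logTimestamp, long deadlineMs, ZoneId logZone) {
        long loggedAtMs = LocalDateTime.parse(logTimestamp, LOG_TS)
                .atZone(logZone)
                .toInstant()
                .toEpochMilli();
        return deadlineMs - loggedAtMs;
    }

    public static void main(String[] args) {
        // Assumption: log timestamps are local time in UTC+8.
        ZoneId zone = ZoneOffset.ofHours(8);
        // Collector-thread entry from the issue body.
        System.out.println(remainingMs("2023-07-19 14:58:40.883", 1689749980882L, zone)); // 59999
        // API-thread entry posted later in this thread.
        System.out.println(remainingMs("2023-07-26 18:07:26.829", 1690366106829L, zone)); // 60000
    }
}
```

Under that time zone assumption, each call still had about 60 s left when it was reported as timed out, which suggests the failure was not caused by a deadline actually elapsing.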

CheungCHN commented 1 year ago

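I ran into the same error, and the error message also pops up immediately. The log contains a large number of timeout errors, yet the Kafka command-line tools fetch configuration and other information normally, and the cluster has no authentication enabled.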

ZQKC commented 1 year ago

Which interface exactly? And what is the corresponding error log?

LoneKingCode commented 1 year ago

Which interface exactly? And what is the corresponding error log?

The interface is /ks-km/api/v3/clusters/1/topics/xxxxxxxxxx/groups/xxxxxxxxxxxxxxx/metric

image

2023-07-19 14:58:40.883 [MetricCollect-Shard-1-9-thread-79] ERROR class=c.x.k.s.k.core.service.group.impl.GroupServiceImpl||method=getGroupOffset||clusterPhyId=1|groupName=wangyou_transport||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1689749980882, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196)
        at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83)
        at com.xiaojukeji.know.streaming.km.collector.metric.kafka.GroupMetricCollector.collectMetrics(GroupMetricCollector.java:96)
        at com.xiaojukeji.know.streaming.km.collector.metric.kafka.GroupMetricCollector.lambda$collectKafkaMetrics$0(GroupMetricCollector.java:66)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1689749980882, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

ZQKC commented 1 year ago

MetricCollect-Shard-1-9-thread-79

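  1. This is the wrong log. It comes from a metric collection thread, whereas the frontend page issues an HTTP request, whose thread stack would not look like this.
  2. The message "The AdminClient thread has exited" appears here. Check whether any log shows that the AdminClient was closed.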
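Two numbers in the exception message fit the "thread has exited" reading. The sketch below checks that `timed out at 9223372036854775807` is exactly `Long.MAX_VALUE`, not a plausible clock value, and that the logged `nextAllowedTryMs` equals `Long.MAX_VALUE + 100` after overflow. Reading the 100 as Kafka's default `retry.backoff.ms` (100 ms) is an inference, not something confirmed in this thread; the class and method names are illustrative.

```java
public class SentinelValues {
    /** True when the "timed out at" value is Long.MAX_VALUE rather than a real epoch time. */
    public static boolean isSentinelTimestamp(long timedOutAtMs) {
        return timedOutAtMs == Long.MAX_VALUE;
    }

    /** Offset that, added to Long.MAX_VALUE with overflow, yields the logged nextAllowedTryMs. */
    public static long impliedBackoffMs(long nextAllowedTryMs) {
        // Subtraction wraps around in the same way the original addition would have.
        return nextAllowedTryMs - Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(isSentinelTimestamp(9223372036854775807L)); // true
        System.out.println(impliedBackoffMs(-9223372036854775709L));   // 100
    }
}
```

If that reading is right, the calls were failed with a placeholder "now" instead of after waiting, so raising timeout settings would not be expected to change the behaviour.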

LoneKingCode commented 1 year ago

MetricCollect-Shard-1-9-thread-79

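  1. This is the wrong log. It comes from a metric collection thread, whereas the frontend page issues an HTTP request, whose thread stack would not look like this.
  2. The message "The AdminClient thread has exited" appears here. Check whether any log shows that the AdminClient was closed.

1. The error output after calling the API is as follows.

I added test logging to the API method. When the call reaches groupService.getGroupOffsetFromKafka inside GroupManagerImpl.pagingGroupTopicConsumedMetrics, the exception is thrown immediately, with no noticeable delay or timeout.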
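One way an exception labelled as a timeout can surface with no waiting at all is when the component that should serve the request is already gone. The following is a toy model of that situation, not Kafka's actual code: `ExitedWorkerModel`, `workerExited`, and `submit` are hypothetical names, and the only thing borrowed from the logs is the idea that a call is failed on the spot once the worker thread has exited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class ExitedWorkerModel {
    // Calls waiting for the worker; null once the (hypothetical) worker thread has exited.
    private List<CompletableFuture<String>> pending = new ArrayList<>();

    /** Simulates the worker thread terminating, for example after close() or a fatal error. */
    public synchronized void workerExited() {
        pending = null;
    }

    /** Queues a call, or fails it immediately when no worker is left to serve it. */
    public CompletableFuture<String> submit(String callName) {
        CompletableFuture<String> result = new CompletableFuture<>();
        boolean accepted;
        synchronized (this) {
            accepted = pending != null;
            if (accepted) {
                pending.add(result);
            }
        }
        if (!accepted) {
            result.completeExceptionally(
                    new TimeoutException(callName + ": worker thread has exited"));
        }
        return result;
    }

    public static void main(String[] args) {
        ExitedWorkerModel client = new ExitedWorkerModel();
        client.workerExited();
        CompletableFuture<String> call = client.submit("findCoordinator");
        // The future is already failed when submit() returns; nothing waited for a deadline.
        System.out.println(call.isCompletedExceptionally()); // true
    }
}
```

In this model the caller sees a `TimeoutException` as soon as `submit` returns, which matches the described symptom of an error that appears without any delay.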

2023-07-26 18:07:26.829 [ApiCallTP-4-thread-2] ERROR class=c.x.k.s.k.core.service.group.impl.GroupServiceImpl||method=getGroupOffset||clusterPhyId=1|groupName=integration_payment_logs||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196)
        at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83)
        at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.lambda$null$7(GroupManagerImpl.java:362)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2023-07-26 18:07:26.829 [ApiCallTP-4-thread-2] ERROR class=c.x.k.s.k.c.s.group.impl.GroupMetricServiceImpl||method=getLagFromAdminClient||clusterPhyId=1||groupName=integration_payment_logs||metrics=Lag||msg=exception
com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:167)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196)
        at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
        at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83)
        at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.lambda$null$7(GroupManagerImpl.java:362)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        ... 11 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2023-07-26 18:07:28.250 [http-nio-8080-exec-8] ERROR class=c.x.k.s.k.core.service.group.impl.GroupServiceImpl||method=getGroupOffset||clusterPhyId=1|groupName=integration_payment_logs||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.pagingGroupTopicConsumedMetrics(GroupManagerImpl.java:190)
        at com.xiaojukeji.know.streaming.km.rest.api.v3.group.GroupController.getTopicGroupMetric(GroupController.java:57)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:665)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:888)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1597)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2023-07-26 18:07:28.250 [http-nio-8080-exec-8] ERROR class=c.x.k.s.k.r.handler.CustomGlobalExceptionHandler||method=handleException||errMsg=exception
com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:167)
        at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.pagingGroupTopicConsumedMetrics(GroupManagerImpl.java:190)
        at com.xiaojukeji.know.streaming.km.rest.api.v3.group.GroupController.getTopicGroupMetric(GroupController.java:57)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:665)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:888)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1597)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158)
        ... 56 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2. With debug logging enabled for the Kafka admin client, the output is likewise all timeout-type errors

2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-66] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-118] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-126] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.180 [MetricCollect-Shard-0-8-thread-108] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905180, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-105] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905180, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-100] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. 
Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.182 [MetricCollect-Shard-0-8-thread-27] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905182, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.182 [MetricCollect-Shard-0-8-thread-66] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905182, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.182 [MetricCollect-Shard-0-8-thread-105] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905182, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited.

ZQKC commented 1 year ago

MetricCollect-Shard-1-9-thread-79

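1. This is not the right log. It comes from a metric collection thread, whereas the front-end page issues an HTTP request, whose thread stack would look different. 2. The log contains "The AdminClient thread has exited". Check whether any log entry shows the AdminClient being closed.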

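1. The error output after the API call is as follows

I added test logging inside the API method. When execution reaches groupService.getGroupOffsetFromKafka in GroupManagerImpl.pagingGroupTopicConsumedMetrics, the exception is thrown immediately, with no noticeable delay or timeout.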

2023-07-26 18:07:26.829 [ApiCallTP-4-thread-2] ERROR class=c.x.k.s.k.core.service.group.impl.GroupServiceImpl||method=getGroupOffset||clusterPhyId=1|groupName=integration_payment_logs||errMsg=exception! java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196) at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93) at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62) at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83) at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.lambda$null$7(GroupManagerImpl.java:362) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. 2023-07-26 18:07:26.829 [ApiCallTP-4-thread-2] ERROR class=c.x.k.s.k.c.s.group.impl.GroupMetricServiceImpl||method=getLagFromAdminClient||clusterPhyId=1||groupName=integration_payment_logs||metrics=Lag||msg=exception com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:167) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.getLagRelevantFromAdminClient(GroupMetricServiceImpl.java:196) at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93) at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62) at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.collectGroupMetricsFromKafka(GroupMetricServiceImpl.java:83) at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.lambda$null$7(GroupManagerImpl.java:362) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158) ... 11 common frames omitted Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366106829, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. 2023-07-26 18:07:28.250 [http-nio-8080-exec-8] ERROR class=c.x.k.s.k.core.service.group.impl.GroupServiceImpl||method=getGroupOffset||clusterPhyId=1|groupName=integration_payment_logs||errMsg=exception! 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158) at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.pagingGroupTopicConsumedMetrics(GroupManagerImpl.java:190) at com.xiaojukeji.know.streaming.km.rest.api.v3.group.GroupController.getTopicGroupMetric(GroupController.java:57) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792) at 
org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) at javax.servlet.http.HttpServlet.service(HttpServlet.java:665) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) at javax.servlet.http.HttpServlet.service(HttpServlet.java:750) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at 
org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:888) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1597) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. 2023-07-26 18:07:28.250 [http-nio-8080-exec-8] ERROR class=c.x.k.s.k.r.handler.CustomGlobalExceptionHandler||method=handleException||errMsg=exception com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:167) at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.pagingGroupTopicConsumedMetrics(GroupManagerImpl.java:190) at com.xiaojukeji.know.streaming.km.rest.api.v3.group.GroupController.getTopicGroupMetric(GroupController.java:57) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) at 
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) at javax.servlet.http.HttpServlet.service(HttpServlet.java:665) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) at javax.servlet.http.HttpServlet.service(HttpServlet.java:750) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at 
org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) at 
org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:888) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1597) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:158) ... 56 common frames omitted Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690366108249, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2. With debug logging enabled for the Kafka admin client, the output is likewise all timeout-type errors

2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-66] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-118] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-126] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.180 [MetricCollect-Shard-0-8-thread-108] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905180, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-105] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905180, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.181 [MetricCollect-Shard-0-8-thread-100] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. 
Timing out Call(callName=findCoordinator, deadlineMs=1690366905181, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.182 [MetricCollect-Shard-0-8-thread-27] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905182, tries=0, nextAllowedTryMs=0). 2023-07-26 18:20:45.182 [MetricCollect-Shard-0-8-thread-66] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905182, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited. java.lang.Exception: TimeoutException: The AdminClient thread has exited. 2023-07-26 18:20:45.182 [MetricCollect-Shard-0-8-thread-105] DEBUG o.a.k.clients.admin.KafkaAdminClient - [AdminClient clientId=adminclient-2] The AdminClient thread has exited. Timing out Call(callName=findCoordinator, deadlineMs=1690366905182, tries=0, nextAllowedTryMs=0). java.lang.Exception: TimeoutException: The AdminClient thread has exited.

Does the info log contain the entry "close kafka AdminClient starting, clusterPhyId:"?

ZQKC commented 1 year ago

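Some explanation: 1. Why the call times out immediately: the log contains "The AdminClient thread has exited", which means the AdminClient has been closed, so every call on it fails with a timeout right away.

2. Based on that, check whether anything is actively closing the AdminClient.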
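The unusual numbers in the exception message fit this explanation. The sketch below only reproduces the arithmetic. It assumes (this is not confirmed anywhere in the thread) that the client fails such calls with a timestamp of Long.MAX_VALUE and then adds the default retry.backoff.ms of 100 ms to it. The class and method names are hypothetical.

```java
// Hypothetical illustration of the arithmetic only; not KnowStreaming or Kafka source code.
final class SentinelTimestampDemo {
    // Assumption: the default value of retry.backoff.ms, in milliseconds.
    static final long ASSUMED_RETRY_BACKOFF_MS = 100L;

    // Plain long addition in Java wraps around silently on overflow.
    static long nextAllowedTryMs(long nowMs, long backoffMs) {
        return nowMs + backoffMs;
    }

    public static void main(String[] args) {
        // 9223372036854775807 is the "timed out at" value shown in the trace.
        long failedAt = Long.MAX_VALUE;
        long next = nextAllowedTryMs(failedAt, ASSUMED_RETRY_BACKOFF_MS);
        System.out.println("timed out at     = " + failedAt);
        // Wraps to Long.MIN_VALUE + 99, the nextAllowedTryMs value shown in the trace.
        System.out.println("nextAllowedTryMs = " + next);
    }
}
```

If that assumption holds, "timed out at 9223372036854775807" is a sentinel rather than a wall-clock time, which would explain why raising the request and socket timeouts to 30 s changes nothing.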

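Troubleshooting: 1. Does the info log contain the entry "close kafka AdminClient starting, clusterPhyId:"? 2. Take a jstack dump to see what the threads are doing, and please share it. 3. Without changing the configuration of the clusters connected to KS, restart KS and check whether "The AdminClient thread has exited" still shows up. Take a jstack dump at that point as well. After KS has run for a while, if the "close kafka AdminClient starting, clusterPhyId:" entry appears, take another jstack dump so the two can be compared.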
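When comparing logs before and after the restart, it may help to separate fast failures caused by a closed client from genuine broker timeouts. The sketch below is one way to do that, assuming the marker text is exactly the message seen in the traces in this thread. The class and helper names are hypothetical, and plain RuntimeException stands in for Kafka's TimeoutException so the example needs no dependencies.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helper; not part of KnowStreaming or the Kafka client.
final class ClosedClientDetector {
    // Message text copied from the stack traces in this issue.
    static final String MARKER = "The AdminClient thread has exited";

    // Walks the cause chain (bounded, in case of a cyclic chain) looking for the marker.
    static boolean causedByClosedClient(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            String message = t.getMessage();
            if (message != null && message.contains(MARKER)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mimics the nesting in the trace: ExecutionException -> timeout -> "thread has exited".
        Throwable closed = new ExecutionException(
                new RuntimeException("Call(callName=findCoordinator) timed out",
                        new RuntimeException("The AdminClient thread has exited.")));
        Throwable slowBroker = new ExecutionException(
                new RuntimeException("Call(callName=findCoordinator) timed out"));

        System.out.println("closed client: " + causedByClosedClient(closed));
        System.out.println("slow broker:   " + causedByClosedClient(slowBroker));
    }
}
```

A check like this could be placed in the catch block that currently logs "errMsg=exception!", so that closed-client failures are counted separately from real timeouts.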

LoneKingCode commented 1 year ago

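1. I did not see it in the info log, so I added extra logging of my own

I added logging to the remove method of KafkaAdminClient and to createKafkaAdminClient. The create log entries all appear, but the remove method was never called, not even after the "The AdminClient thread has exited" error was reported.

2. jstack output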

jstack1.txt

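I also went through the detailed logs to pin down exactly when the error occurs. It is at

this point in the getGroupOffsetFromKafka method of GroupServiceImpl. The error is thrown the instant partitionsToOffsetAndMetadata().get() is called: Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();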
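An instant exception from get() is the expected behaviour when a future has already been completed exceptionally: there is nothing left to wait for, so the configured timeouts never come into play. The sketch below shows this with the JDK's CompletableFuture as a stand-in for Kafka's KafkaFuture, an assumption made to keep the example free of dependencies. All names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo; CompletableFuture stands in for KafkaFuture.
final class FailedFutureDemo {
    // Returns how long get() blocked, in milliseconds, on an already-failed future.
    static long millisBlockedOnFailedFuture() {
        // failedFuture requires Java 9 or later.
        CompletableFuture<String> offsets = CompletableFuture.failedFuture(
                new IllegalStateException("The AdminClient thread has exited."));
        long start = System.nanoTime();
        try {
            // A generous 30 s budget, like the timeouts configured in this issue.
            offsets.get(30, TimeUnit.SECONDS);
            throw new AssertionError("expected the future to fail");
        } catch (ExecutionException expected) {
            // Reached without waiting: the failure was already recorded in the future.
        } catch (TimeoutException e) {
            throw new AssertionError("get() should not wait on a failed future", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("blocked for ~" + millisBlockedOnFailedFuture() + " ms");
    }
}
```

This is why the error pops up instantly in the page even though the message says "timed out": the wait never starts.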

The log output after calling the endpoint, including the exception, is as follows

2023-07-27 18:25:12.216 [http-nio-8080-exec-1] INFO Test - start pagingGroupTopicConsumedMetrics 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - start groupService.getGroupOffsetFromKafka com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl@7c1447b5 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - getClient start 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - getClient end 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - adminClient.listConsumerGroupOffsets start 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - adminClient.listConsumerGroupOffsets end org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult@6b00e67d 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - partitionsToOffsetAndMetadata get , start 2023-07-27 18:25:12.217 [http-nio-8080-exec-1] INFO Test - method=getGroupOffset||clusterPhyId=1|groupName=mus_transport||errMsg=exception! java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690453572217, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupServiceImpl.getGroupOffsetFromKafka(GroupServiceImpl.java:165) at com.xiaojukeji.know.streaming.km.biz.group.impl.GroupManagerImpl.pagingGroupTopicConsumedMetrics(GroupManagerImpl.java:190) at com.xiaojukeji.know.streaming.km.rest.api.v3.group.GroupController.getTopicGroupMetric(GroupController.java:57) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:665)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:750)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:888)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1597)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1690453572217, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

ZQKC commented 1 year ago

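Could you please provide the Kafka client debug logs, covering the period from just before the KafkaAdminClient is created until after the "The AdminClient thread has exited" message appears?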

LoneKingCode commented 1 year ago

In test.log, the API request starts at 2023-07-28 10:13:29.787, and "The AdminClient thread has exited." appears at 2023-07-28 10:13:29.789.

kafka_client.log test.2023-07-28.log

ZQKC commented 1 year ago

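Please run it again with the latest code from the master branch. Afterwards, please provide the kafka-client trace logs together with all of KS's own logs (error, warn, info, debug). What you provided last time contained only debug-level logs.

About the previous logs: they show that the kafka-admin client's internal thread did exit on its own, but nothing in the KS logs shows where close was called. So I added clientIds to distinguish the kafka-admin clients that KS creates itself, and increased the close timeout.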

LoneKingCode commented 1 year ago

In log_error.log I found: 2023-08-03 11:14:20.488 [kafka-admin-client-thread | ApacheAdminClient||clusterPhyId=1||Cnt=0] ERROR class=org.apache.kafka.common.utils.KafkaThread||Uncaught exception in thread 'kafka-admin-client-thread | ApacheAdminClient||clusterPhyId=1||Cnt=0': java.lang.RuntimeException: non-nullable field name was serialized as null

kafka_client_trace.log log_info.log log_error.log log_debug.log log_warn.log

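The cluster has 300+ topics and 900+ partitions. I then noticed that clientCnt in the getClient method of KafkaAdminClient is 1, so I increased client-pool.kafka-admin.client-cnt in the config file. At 20 there were still a lot of errors; only at 50 did it drop to a small number of timeouts, with consumer group information and everything else displaying normally.

Does tuning this parameter this way have any side effects?

The production cluster is version 2.4.1. I also deployed Kafka locally for testing and everything worked there, but that may be because there are too few topics for the problem to show up.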

ZQKC commented 1 year ago

Uncaught exception in thread

1. This is most likely what caused the thread to exit, which in turn led to the client being treated as closed. I'll look into how to fix it.

2023-08-03 11:14:20.488 [kafka-admin-client-thread | ApacheAdminClient||clusterPhyId=1||Cnt=0] ERROR class=org.apache.kafka.common.utils.KafkaThread||Uncaught exception in thread 'kafka-admin-client-thread | ApacheAdminClient||clusterPhyId=1||Cnt=0':
java.lang.RuntimeException: non-nullable field name was serialized as null

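2. client-pool.kafka-admin.client-cnt controls how many KafkaAdminClient instances KS creates for each Kafka cluster. With a larger value there are simply more clients, so the error is probably just harder to notice. Open the Broker detail page in KS and view the LogDirs information, then check whether java.lang.RuntimeException: non-nullable field name was serialized as null shows up. If it does, having more clients only mitigates the problem and cannot fully avoid it.

3. I'll also deploy a Kafka 2.4.1 cluster with 900+ partitions and see whether I can reproduce the problem.

4. If you're interested, you could set a breakpoint at org.apache.kafka.common.requests.DescribeLogDirsResponse.parse(DescribeLogDirsResponse.java:62) and see what data the broker actually returned that made the client's protocol parsing fail and ultimately raised that exception.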

2023-08-03 11:14:20.488 [kafka-admin-client-thread | ApacheAdminClient||clusterPhyId=1||Cnt=0] ERROR class=org.apache.kafka.common.utils.KafkaThread||Uncaught exception in thread 'kafka-admin-client-thread | ApacheAdminClient||clusterPhyId=1||Cnt=0':
java.lang.RuntimeException: non-nullable field name was serialized as null
    at org.apache.kafka.common.message.DescribeLogDirsResponseData$DescribeLogDirsTopic.read(DescribeLogDirsResponseData.java:616)
    at org.apache.kafka.common.message.DescribeLogDirsResponseData$DescribeLogDirsTopic.<init>(DescribeLogDirsResponseData.java:584)
    at org.apache.kafka.common.message.DescribeLogDirsResponseData$DescribeLogDirsResult.read(DescribeLogDirsResponseData.java:365)
    at org.apache.kafka.common.message.DescribeLogDirsResponseData$DescribeLogDirsResult.<init>(DescribeLogDirsResponseData.java:303)
    at org.apache.kafka.common.message.DescribeLogDirsResponseData.read(DescribeLogDirsResponseData.java:123)
    at org.apache.kafka.common.message.DescribeLogDirsResponseData.<init>(DescribeLogDirsResponseData.java:76)
    at org.apache.kafka.common.requests.DescribeLogDirsResponse.parse(DescribeLogDirsResponse.java:62)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:185)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:730)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:875)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:570)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
    at java.base/java.lang.Thread.run(Thread.java:829)
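
To illustrate point 2, here is a minimal sketch in plain Java with no Kafka classes. `FakeClient`, `Pool`, and the round-robin selection are all hypothetical stand-ins; how KS actually picks a client from its pool is not shown in this thread. The sketch only models the idea that one unparseable response kills the client that received it, and that a larger pool spreads later requests across more surviving clients.

```java
import java.util.ArrayList;
import java.util.List;

final class ClientPoolSketch {
    /** Hypothetical stand-in for one admin client; not a Kafka class. */
    static final class FakeClient {
        private boolean alive = true;

        /** Models a request whose response cannot be parsed: the client is dead afterwards. */
        void poisonRequest() {
            alive = false;
        }

        /** Models any other request: it succeeds only while the client is alive. */
        boolean normalRequest() {
            return alive;
        }
    }

    /** Fixed-size pool with round-robin selection (the selection strategy is an assumption). */
    static final class Pool {
        private final List<FakeClient> clients = new ArrayList<>();
        private int next = 0;

        Pool(int clientCnt) {
            for (int i = 0; i < clientCnt; i++) {
                clients.add(new FakeClient());
            }
        }

        FakeClient pick() {
            FakeClient client = clients.get(next);
            next = (next + 1) % clients.size();
            return client;
        }
    }

    /** Sends `poison` poison requests, then counts how many of `normal` normal requests fail. */
    static int failuresAfterPoison(int clientCnt, int poison, int normal) {
        Pool pool = new Pool(clientCnt);
        for (int i = 0; i < poison; i++) {
            pool.pick().poisonRequest();
        }
        int failures = 0;
        for (int i = 0; i < normal; i++) {
            if (!pool.pick().normalRequest()) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println(failuresAfterPoison(1, 1, 100));   // 100: the only client is dead
        System.out.println(failuresAfterPoison(50, 1, 100));  // 2: one dead client out of 50
        System.out.println(failuresAfterPoison(50, 50, 100)); // 100: every client eventually dies
    }
}
```

In this toy model, raising the pool size lowers the failure rate after a single bad response, but repeated bad responses still take out every client, which matches the "mitigates but does not avoid" reading above.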

LoneKingCode commented 1 year ago

1. Increasing the client count only mitigated the problem.
2. The version here is 1. Is that right? (see image)

ZQKC commented 1 year ago

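1. version=1 is correct.
2. Does this cluster have any topics with unusual names, for example an extremely long one? I looked at the code, and that seems to be the only thing that could cause this problem. You could check in ZK whether any topic with a particularly long name exists.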
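
As a sketch of how an overly long name could produce this exact message: in the Kafka protocol's non-compact STRING encoding, a string is written as a signed 16-bit length followed by that many UTF-8 bytes, and a negative length is what a reader interprets as null. The code below is an illustration only. `writeNaively` is a hypothetical writer that narrows the length without a range check; nothing in this thread confirms that the broker does this, or that a long topic name is the actual cause here. It assumes Java 11+ for `String.repeat`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class StringLengthSketch {
    /** Reads a non-nullable string: signed 16-bit length, then that many UTF-8 bytes. */
    static String readNonNullableString(ByteBuffer buf, String fieldName) {
        short length = buf.getShort();
        if (length < 0) {
            // A negative length is the null marker, which a non-nullable field must reject.
            throw new RuntimeException(
                "non-nullable field " + fieldName + " was serialized as null");
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Hypothetical writer that narrows the byte length to 16 bits without a range check. */
    static ByteBuffer writeNaively(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + bytes.length);
        buf.putShort((short) bytes.length); // lengths above 32767 wrap to a negative value
        buf.put(bytes);
        buf.flip();
        return buf;
    }

    static String roundTrip(String value) {
        return readNonNullableString(writeNaively(value), "name");
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("orders")); // a short name survives the round trip
        try {
            roundTrip("x".repeat(40_000)); // 40000 wraps to -25536 as a short
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Setting a breakpoint as suggested earlier in the thread and inspecting the raw length bytes of the topic name would show whether a negative length is really what the client received.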