didi / KnowStreaming

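A one-stop cloud-native real-time streaming data platform that builds enterprise-grade Kafka services through a zero-intrusion, plugin-based design, greatly lowering the barrier to operating, storing, and managing real-time streaming data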
https://knowstreaming.com
GNU Affero General Public License v3.0

Cluster metric collection is intermittent; collecting client data keeps timing out #1133

Closed leon535 closed 1 year ago

leon535 commented 1 year ago

Environment information

Cluster metric collection is intermittent

(image)

Client pool size configuration

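```yaml
client-pool:
  kafka-consumer:
    min-idle-client-num: 2        # minimum number of idle clients
    max-idle-client-num: 20       # maximum number of idle clients
    max-total-client-num: 100     # maximum total number of clients
    borrow-timeout-unit-ms: 5000  # borrow timeout; the key name indicates milliseconds
  kafka-admin:
    client-cnt: 10                # number of KafkaAdminClient instances created per Kafka cluster
```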

If there is an exception, please attach the exception trace:

2023-08-24 13:54:02.819 [MetricCollect-Shard-1-9-thread-67] ERROR class=c.x.k.s.k.c.s.partition.impl.PartitionServiceImpl||method=batchGetPartitionOffsetFromKafkaAdminClient||clusterPhyId=3||topicName=daga_search_fs_goods_job_quality_grade_increase_topic||offsetSpec=com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec$KSEarliestSpec@32159dbe||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=metadata, deadlineMs=1692856452819, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.batchGetPartitionOffsetFromKafkaAdminClient(PartitionServiceImpl.java:398)
    at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.getPartitionBeginAndEndOffsetFromKafka(PartitionServiceImpl.java:239)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionMetricServiceImpl.getOffsetRelevantMetrics(PartitionMetricServiceImpl.java:180)
    at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionMetricServiceImpl.collectPartitionsMetricsFromKafka(PartitionMetricServiceImpl.java:146)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionMetricServiceImpl.collectPartitionsMetricsFromKafkaWithCache(PartitionMetricServiceImpl.java:85)
    at com.xiaojukeji.know.streaming.km.core.service.topic.impl.TopicMetricServiceImpl.getMessages(TopicMetricServiceImpl.java:326)
    at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
    at com.xiaojukeji.know.streaming.km.core.service.topic.impl.TopicMetricServiceImpl.collectTopicMetricsFromKafka(TopicMetricServiceImpl.java:131)
    at com.xiaojukeji.know.streaming.km.core.service.topic.impl.TopicMetricServiceImpl.collectTopicMetricsFromKafkaWithCacheFirst(TopicMetricServiceImpl.java:113)
    at com.xiaojukeji.know.streaming.km.collector.metric.kafka.TopicMetricCollector.collectMetrics(TopicMetricCollector.java:98)
    at com.xiaojukeji.know.streaming.km.collector.metric.kafka.TopicMetricCollector.lambda$collectKafkaMetrics$0(TopicMetricCollector.java:66)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=metadata, deadlineMs=1692856452819, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2023-08-24 13:54:02.820 [MetricCollect-Shard-1-9-thread-67] ERROR class=c.x.k.s.k.c.s.partition.impl.PartitionServiceImpl||method=batchGetPartitionOffsetFromKafkaAdminClient||clusterPhyId=3||topicName=daga_search_fs_goods_job_quality_grade_increase_topic||offsetSpec=com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec$KSLatestSpec@34964913||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=metadata, deadlineMs=1692856452819, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.batchGetPartitionOffsetFromKafkaAdminClient(PartitionServiceImpl.java:398)
    at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
    at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.getPartitionBeginAndEndOffsetFromKafka(PartitionServiceImpl.java:239)
    at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionMetricServiceImpl.getOffsetRelevantMetrics(PartitionMetricServiceImpl.java:180)
    at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
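
For anyone reading the trace above, a few of its fields can be decoded with plain JDK calls. The sketch below is only an illustration: the class and method names (`TraceFields`, `parseFields`, `toInstant`) are hypothetical and not part of KnowStreaming or Kafka. It splits the `||`-delimited log payload into key/value pairs and converts `deadlineMs` into a UTC instant. Note that `9223372036854775807` in "timed out at ..." equals `Long.MAX_VALUE`, which suggests a sentinel rather than a real clock reading.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reading the log lines above; not KnowStreaming code.
final class TraceFields {

    /** Splits a "k1=v1||k2=v2" payload into an ordered key/value map. */
    static Map<String, String> parseFields(String payload) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String part : payload.split("\\|\\|")) {
            int eq = part.indexOf('=');
            if (eq > 0) {
                // Split on the first '=' only, so values may contain '='.
                fields.put(part.substring(0, eq), part.substring(eq + 1));
            }
        }
        return fields;
    }

    /** Converts an epoch-millisecond value such as deadlineMs to a UTC instant. */
    static Instant toInstant(long epochMs) {
        return Instant.ofEpochMilli(epochMs);
    }

    public static void main(String[] args) {
        // Shortened copy of the payload from the trace above.
        String payload = "method=batchGetPartitionOffsetFromKafkaAdminClient"
                + "||clusterPhyId=3||errMsg=exception!";
        Map<String, String> fields = parseFields(payload);
        System.out.println("clusterPhyId = " + fields.get("clusterPhyId"));

        // deadlineMs taken from the trace.
        System.out.println("deadline (UTC) = " + toInstant(1692856452819L));

        // The "timed out at" value is the largest possible long.
        System.out.println("sentinel = " + (9223372036854775807L == Long.MAX_VALUE));
    }
}
```

The deadline decodes to 2023-08-24T05:54:12.819Z. If the log timestamps are in UTC+8 (an assumption; the log does not state its time zone), that is 10 seconds after the error was logged, so the call appears to have failed before its deadline was reached.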

ZQKC commented 1 year ago

@leon535

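The log message `The AdminClient thread has exited` means the kafka-admin client's thread has already exited. Once that thread is gone, requests for offset information fail immediately with a timeout. Please check why the thread exited.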
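
One way to tell this case apart from an ordinary slow-broker timeout is to walk the exception's cause chain and look for that message. The sketch below is a minimal illustration using only the JDK: `CauseChain` and `adminThreadExited` are hypothetical names, and plain `RuntimeException` instances stand in for Kafka's `TimeoutException` so the example runs without the Kafka client on the classpath.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helper; not part of KnowStreaming or the Kafka client.
final class CauseChain {

    static final String EXITED = "The AdminClient thread has exited";

    /** Returns true if any exception in the cause chain carries the "thread has exited" message. */
    static boolean adminThreadExited(Throwable t) {
        // Bound the walk so a cyclic cause chain cannot loop forever.
        for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
            String msg = t.getMessage();
            if (msg != null && msg.contains(EXITED)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-ins that mimic the three-level chain in the reported trace.
        Throwable root = new RuntimeException("The AdminClient thread has exited.");
        Throwable timeout = new RuntimeException("Call(callName=metadata) timed out", root);
        Throwable top = new ExecutionException(timeout);

        System.out.println(adminThreadExited(top));

        // An ordinary timeout without the marker message is not flagged.
        System.out.println(adminThreadExited(new RuntimeException("request timed out")));
    }
}
```

A check like this only classifies the failure; it does not explain why the thread exited, which still has to be found in earlier log output.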

ZQKC commented 1 year ago

No further feedback; closing this issue.