flashcatcloud / categraf

one-stop telemetry collector for nightingale
https://flashcat.cloud/docs/
MIT License
807 stars 246 forks

Error when pushing collected logs to Kafka #873

Closed xiongtingshuchang closed 4 months ago

xiongtingshuchang commented 5 months ago

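Problem description:

When categraf collects logs and pushes them to Kafka on another server, the send_to setting in conf/logs.toml seems to have no effect: messages are still sent to the local machine's IP.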

categraf version:

v0.3.59-d1b43d6de940f39b0aad30bf2414c6576c7f574d

categraf error log:

11:08 destination.go:235: W! send message to kafka error dial tcp [::1]:5044: connect: connection refused, topic:police_server

conf/logs.toml configuration:

[logs]
## just a placeholder
api_key = "ef4ahfbwzwwtlwfpbertgq1i6mq0ab1q"
## enable log collect or not
enable = true
## the server receive logs, http/tcp/kafka, only kafka brokers can be multiple ip:ports with concatenation character ","
send_to = "10.4.157.147:5044"
## send logs with protocol: http/tcp/kafka
send_type = "kafka"
topic = "police_server"
## send logs with compression or not 
use_compress = false
## use ssl or not
send_with_tls = false
## send logs in batches
batch_wait = 5
## save offset in this path 
run_path = "/opt/categraf/run"
## max files can be open 
open_files_limit = 100
## scan config file in 10 seconds
scan_period = 10
## read buffer of udp 
frame_size = 9000
## channel size, default 100
## read buffer for logs, in lines
chan_size = 1000
## pipeline num , default 4
## number of threads that process logs
pipeline=4
## configuration for kafka
## specify the kafka version
kafka_version="3.7.0"
# default 0 means serial; keep the default if log ordering matters
batch_max_concurrence = 0
# max concurrent batches, default 100
batch_max_size=100
# max content size per send, default 1000000
batch_max_contentsize=1000000
# client timeout in seconds
producer_timeout= 10

# whether to enable SASL mode
sasl_enable = false
sasl_user = "admin"
sasl_password = "admin"
# PLAIN
sasl_mechanism= "PLAIN"
# v1
sasl_version=1
# set true
sasl_handshake = true
# optional
# sasl_auth_identity=""
#
##
# added in versions above v0.3.39: whether to enable pod log collection
enable_collect_container=false

# whether to collect stdout and stderr of all pods
collect_container_all = false
  ## glog processing rules
  # [[logs.Processing_rules]]
  ## single log configure
  [[logs.items]]
  ## file/journald/tcp/udp
  type = "file"
  ## type=file, path is required; type=journald/tcp/udp, port is required
  path = "/data/test.txt"
  source = "tomcat"
  service = "my_service"

kongfei605 commented 5 months ago

Run ./categraf --test --debug --inputs logs-agent and share the output.

xiongtingshuchang commented 5 months ago

Run ./categraf --test --debug --inputs logs-agent and share the output.

[root@sgat-10-9-90-16 categraf]# ./categraf --test --debug --inputs logs-agent
2024/04/11 09:37:13 main.go:149: I! runner.binarydir: /home/prome/categraf
2024/04/11 09:37:13 main.go:150: I! runner.hostname: sgat-10-9-90-16
2024/04/11 09:37:13 main.go:151: I! runner.fd_limits: (soft=4096, hard=4096)
2024/04/11 09:37:13 main.go:152: I! runner.vm_limits: (soft=unlimited, hard=unlimited)
2024/04/11 09:37:13 provider_manager.go:60: I! use input provider: [local]
2024/04/11 09:37:13 logs_agent.go:87: I! Starting logs-agent...
2024/04/11 09:37:13 prometheus_agent.go:19: I! prometheus scraping disabled!
2024/04/11 09:37:13 ibex_agent.go:19: I! ibex agent disabled!
2024/04/11 09:37:13 agent.go:38: I! agent starting
2024/04/11 09:37:13 agent.go:46: I! [*agent.MetricsAgent] started
2024/04/11 09:37:13 destination.go:122: D! producer type: sync 256 5
2024/04/11 09:37:13 destination.go:154: D! saram config: &{Admin:{Retry:{Max:5 Backoff:100ms} Timeout:3s} Net:{MaxOpenRequests:5 DialTimeout:30s ReadTimeout:30s WriteTimeout:30s ResolveCanonicalBootstrapServers:false TLS:{Enable:false Config:<nil>} SASL:{Enable:false Mechanism: Version:1 Handshake:true AuthIdentity: User: Password: SCRAMAuthzID: SCRAMClientGeneratorFunc:<nil> TokenProvider:<nil> GSSAPI:{AuthType:0 KeyTabPath: CCachePath: KerberosConfigPath: ServiceName: Username: Password: Realm: DisablePAFXFAST:false}} KeepAlive:0s LocalAddr:<nil> Proxy:{Enable:false Dialer:<nil>}} Metadata:{Retry:{Max:3 Backoff:250ms BackoffFunc:<nil>} RefreshFrequency:10m0s Full:true Timeout:0s AllowAutoTopicCreation:true} Producer:{MaxMessageBytes:1000000 RequiredAcks:1 Timeout:10s Compression:none CompressionLevel:-1000 Partitioner:0xb1f1c0 Idempotent:false Transaction:{ID: Timeout:1m0s Retry:{Max:50 Backoff:100ms BackoffFunc:<nil>}} Return:{Successes:true Errors:true} Flush:{Bytes:0 Messages:0 Frequency:0s MaxMessages:0} Retry:{Max:3 Backoff:100ms BackoffFunc:<nil>} Interceptors:[]} Consumer:{Group:{Session:{Timeout:10s} Heartbeat:{Interval:3s} Rebalance:{Strategy:<nil> GroupStrategies:[0xc000f93020] Timeout:1m0s Retry:{Max:4 Backoff:2s}} Member:{UserData:[]} InstanceId: ResetInvalidOffsets:true} Retry:{Backoff:2s BackoffFunc:<nil>} Fetch:{Min:1 Default:1048576 Max:0} MaxWaitTime:500ms MaxProcessingTime:100ms Return:{Errors:false} Offsets:{CommitInterval:0s AutoCommit:{Enable:true Interval:1s} Initial:-1 Retention:0s Retry:{Max:3}} IsolationLevel:0 Interceptors:[]} ClientID:sarama RackID: ChannelBufferSize:256 ApiVersionsRequest:true Version:2.1.0 MetricRegistry:0xc0010049e0}
2024/04/11 09:37:13 destination.go:122: D! producer type: sync 256 5
2024/04/11 09:37:13 destination.go:154: D! saram config: &{Admin:{Retry:{Max:5 Backoff:100ms} Timeout:3s} Net:{MaxOpenRequests:5 DialTimeout:30s ReadTimeout:30s WriteTimeout:30s ResolveCanonicalBootstrapServers:false TLS:{Enable:false Config:<nil>} SASL:{Enable:false Mechanism: Version:1 Handshake:true AuthIdentity: User: Password: SCRAMAuthzID: SCRAMClientGeneratorFunc:<nil> TokenProvider:<nil> GSSAPI:{AuthType:0 KeyTabPath: CCachePath: KerberosConfigPath: ServiceName: Username: Password: Realm: DisablePAFXFAST:false}} KeepAlive:0s LocalAddr:<nil> Proxy:{Enable:false Dialer:<nil>}} Metadata:{Retry:{Max:3 Backoff:250ms BackoffFunc:<nil>} RefreshFrequency:10m0s Full:true Timeout:0s AllowAutoTopicCreation:true} Producer:{MaxMessageBytes:1000000 RequiredAcks:1 Timeout:10s Compression:none CompressionLevel:-1000 Partitioner:0xb1f1c0 Idempotent:false Transaction:{ID: Timeout:1m0s Retry:{Max:50 Backoff:100ms BackoffFunc:<nil>}} Return:{Successes:true Errors:true} Flush:{Bytes:0 Messages:0 Frequency:0s MaxMessages:0} Retry:{Max:3 Backoff:100ms BackoffFunc:<nil>} Interceptors:[]} Consumer:{Group:{Session:{Timeout:10s} Heartbeat:{Interval:3s} Rebalance:{Strategy:<nil> GroupStrategies:[0xc000f93020] Timeout:1m0s Retry:{Max:4 Backoff:2s}} Member:{UserData:[]} InstanceId: ResetInvalidOffsets:true} Retry:{Backoff:2s BackoffFunc:<nil>} Fetch:{Min:1 Default:1048576 Max:0} MaxWaitTime:500ms MaxProcessingTime:100ms Return:{Errors:false} Offsets:{CommitInterval:0s AutoCommit:{Enable:true Interval:1s} Initial:-1 Retention:0s Retry:{Max:3}} IsolationLevel:0 Interceptors:[]} ClientID:sarama RackID: ChannelBufferSize:256 ApiVersionsRequest:true Version:2.1.0 MetricRegistry:0xc0010049e0}
2024/04/11 09:37:13 destination.go:122: D! producer type: sync 256 5
2024/04/11 09:37:13 destination.go:154: D! saram config: &{Admin:{Retry:{Max:5 Backoff:100ms} Timeout:3s} Net:{MaxOpenRequests:5 DialTimeout:30s ReadTimeout:30s WriteTimeout:30s ResolveCanonicalBootstrapServers:false TLS:{Enable:false Config:<nil>} SASL:{Enable:false Mechanism: Version:1 Handshake:true AuthIdentity: User: Password: SCRAMAuthzID: SCRAMClientGeneratorFunc:<nil> TokenProvider:<nil> GSSAPI:{AuthType:0 KeyTabPath: CCachePath: KerberosConfigPath: ServiceName: Username: Password: Realm: DisablePAFXFAST:false}} KeepAlive:0s LocalAddr:<nil> Proxy:{Enable:false Dialer:<nil>}} Metadata:{Retry:{Max:3 Backoff:250ms BackoffFunc:<nil>} RefreshFrequency:10m0s Full:true Timeout:0s AllowAutoTopicCreation:true} Producer:{MaxMessageBytes:1000000 RequiredAcks:1 Timeout:10s Compression:none CompressionLevel:-1000 Partitioner:0xb1f1c0 Idempotent:false Transaction:{ID: Timeout:1m0s Retry:{Max:50 Backoff:100ms BackoffFunc:<nil>}} Return:{Successes:true Errors:true} Flush:{Bytes:0 Messages:0 Frequency:0s MaxMessages:0} Retry:{Max:3 Backoff:100ms BackoffFunc:<nil>} Interceptors:[]} Consumer:{Group:{Session:{Timeout:10s} Heartbeat:{Interval:3s} Rebalance:{Strategy:<nil> GroupStrategies:[0xc000f93020] Timeout:1m0s Retry:{Max:4 Backoff:2s}} Member:{UserData:[]} InstanceId: ResetInvalidOffsets:true} Retry:{Backoff:2s BackoffFunc:<nil>} Fetch:{Min:1 Default:1048576 Max:0} MaxWaitTime:500ms MaxProcessingTime:100ms Return:{Errors:false} Offsets:{CommitInterval:0s AutoCommit:{Enable:true Interval:1s} Initial:-1 Retention:0s Retry:{Max:3}} IsolationLevel:0 Interceptors:[]} ClientID:sarama RackID: ChannelBufferSize:256 ApiVersionsRequest:true Version:2.1.0 MetricRegistry:0xc0010049e0}
2024/04/11 09:37:13 destination.go:122: D! producer type: sync 256 5
2024/04/11 09:37:13 destination.go:154: D! saram config: &{Admin:{Retry:{Max:5 Backoff:100ms} Timeout:3s} Net:{MaxOpenRequests:5 DialTimeout:30s ReadTimeout:30s WriteTimeout:30s ResolveCanonicalBootstrapServers:false TLS:{Enable:false Config:<nil>} SASL:{Enable:false Mechanism: Version:1 Handshake:true AuthIdentity: User: Password: SCRAMAuthzID: SCRAMClientGeneratorFunc:<nil> TokenProvider:<nil> GSSAPI:{AuthType:0 KeyTabPath: CCachePath: KerberosConfigPath: ServiceName: Username: Password: Realm: DisablePAFXFAST:false}} KeepAlive:0s LocalAddr:<nil> Proxy:{Enable:false Dialer:<nil>}} Metadata:{Retry:{Max:3 Backoff:250ms BackoffFunc:<nil>} RefreshFrequency:10m0s Full:true Timeout:0s AllowAutoTopicCreation:true} Producer:{MaxMessageBytes:1000000 RequiredAcks:1 Timeout:10s Compression:none CompressionLevel:-1000 Partitioner:0xb1f1c0 Idempotent:false Transaction:{ID: Timeout:1m0s Retry:{Max:50 Backoff:100ms BackoffFunc:<nil>}} Return:{Successes:true Errors:true} Flush:{Bytes:0 Messages:0 Frequency:0s MaxMessages:0} Retry:{Max:3 Backoff:100ms BackoffFunc:<nil>} Interceptors:[]} Consumer:{Group:{Session:{Timeout:10s} Heartbeat:{Interval:3s} Rebalance:{Strategy:<nil> GroupStrategies:[0xc000f93020] Timeout:1m0s Retry:{Max:4 Backoff:2s}} Member:{UserData:[]} InstanceId: ResetInvalidOffsets:true} Retry:{Backoff:2s BackoffFunc:<nil>} Fetch:{Min:1 Default:1048576 Max:0} MaxWaitTime:500ms MaxProcessingTime:100ms Return:{Errors:false} Offsets:{CommitInterval:0s AutoCommit:{Enable:true Interval:1s} Initial:-1 Retention:0s Retry:{Max:3}} IsolationLevel:0 Interceptors:[]} ClientID:sarama RackID: ChannelBufferSize:256 ApiVersionsRequest:true Version:2.1.0 MetricRegistry:0xc0010049e0}
2024/04/11 09:37:13 scanner.go:267: Starting a new tailer for: /data/test.txt (offset: 0, whence: 2) for tailer key /data/test.txt
2024/04/11 09:37:13 tailer_nix.go:32: I! Opening /data/test.txt for tailer key /data/test.txt
2024/04/11 09:37:13 agent.go:46: I! [*agent.LogsAgent] started
2024/04/11 09:37:13 agent.go:49: I! agent started
2024/04/11 09:37:42 processor.go:99: D! log item: {"message":"{\"level\":\"error\",\"ts\":\"2023-11-01T16:57:15+08:00\",\"caller\":\"http/http.go:236\",\"msg\":\"failed to send http request\",\"error\":\"Get \\\"http://a.cn\\\": dial tcp: lookup a.cn on 183.60.83.19:53: no such host\",\"plugin\":\"http\",\"target\":\"http://a.cn\"}","status":"info","timestamp":1712799462017,"agent_hostname":"10.9.90.16","fcservice":"my_service","fcsource":"tomcat","fctags":"{\"filename\":\"test.txt\"}","topic":"police_server","msg_key":"10.9.90.16/"}
2024/04/11 09:37:42 destination.go:235: W! send message to kafka error dial tcp [::1]:5044: connect: connection refused, topic:police_server
2024/04/11 09:37:44 destination.go:235: W! send message to kafka error dial tcp [::1]:5044: connect: connection refused, topic:police_server
2024/04/11 09:37:47 destination.go:235: W! send message to kafka error dial tcp [::1]:5044: connect: connection refused, topic:police_server
2024/04/11 09:37:52 destination.go:235: W! send message to kafka error dial tcp [::1]:5044: connect: connection refused, topic:police_server
2024/04/11 09:38:02 destination.go:235: W! send message to kafka error dial tcp [::1]:5044: connect: connection refused, topic:police_server

kongfei605 commented 5 months ago

Try disabling IPv6.

xiongtingshuchang commented 5 months ago

Logs after disabling it:

2024/04/11 14:26:18 agent.go:46: I! [*agent.LogsAgent] started
2024/04/11 14:26:18 agent.go:49: I! agent started
2024/04/11 14:26:18 scanner.go:267: Starting a new tailer for: /data/test.txt (offset: 0, whence: 2) for tailer key /data/test.txt
2024/04/11 14:26:18 tailer_nix.go:32: I! Opening /data/test.txt for tailer key /data/test.txt
2024/04/11 14:26:22 processor.go:99: D! log item: {"message":"{\"level\":\"error\",\"ts\":\"2023-11-01T16:57:15+08:00\",\"caller\":\"http/http.go:236\",\"msg\":\"failed to send http request\",\"error\":\"Get \\\"http://a.cn\\\": dial tcp: lookup a.cn on 183.60.83.19:53: no such host\",\"plugin\":\"http\",\"target\":\"http://a.cn\"}","status":"info","timestamp":1712816782777,"agent_hostname":"10.9.90.16","fcservice":"my_service","fcsource":"tomcat","fctags":"{\"filename\":\"test.txt\"}","topic":"police_server","msg_key":"10.9.90.16/"}
2024/04/11 14:26:23 destination.go:235: W! send message to kafka error dial tcp 127.0.0.1:5044: connect: connection refused, topic:police_server
2024/04/11 14:26:25 destination.go:235: W! send message to kafka error dial tcp 127.0.0.1:5044: connect: connection refused, topic:police_server
2024/04/11 14:26:29 destination.go:235: W! send message to kafka error dial tcp 127.0.0.1:5044: connect: connection refused, topic:police_server

xiongtingshuchang commented 5 months ago

Try disabling IPv6.

IPv6 is already disabled on the server; the logs are in my comment above.

xiongtingshuchang commented 5 months ago

Try disabling IPv6.

If the address in send_to is wrong, categraf reports an error immediately at startup.

UlricQin commented 5 months ago

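Try disabling IPv6.

If the address in send_to is wrong, categraf reports an error immediately at startup.

What exactly was the wrong value, and what exactly was the error? Posting both would help with troubleshooting.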

xiongtingshuchang commented 5 months ago

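Try disabling IPv6.

What exactly was the wrong value, and what exactly was the error? Posting both would help with troubleshooting.

If the address in send_to is wrong, categraf reports an error immediately at startup. If I enter the correct IP, the service starts normally, but when it sends to Kafka it sends to the local machine, as the logs above show. The wrong value I used was:

send_to = "127.0.0.1:5044"

The error log is: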

2024/04/12 14:00:30 main.go:149: I! runner.binarydir: /home/prome/categraf
2024/04/12 14:00:30 main.go:150: I! runner.hostname: sgat-10-9-90-16
2024/04/12 14:00:30 main.go:151: I! runner.fd_limits: (soft=4096, hard=4096)
2024/04/12 14:00:30 main.go:152: I! runner.vm_limits: (soft=unlimited, hard=unlimited)
2024/04/12 14:00:30 provider_manager.go:60: I! use input provider: [local]
2024/04/12 14:00:30 logs_agent.go:87: I! Starting logs-agent...
2024/04/12 14:00:30 prometheus_agent.go:19: I! prometheus scraping disabled!
2024/04/12 14:00:30 ibex_agent.go:19: I! ibex agent disabled!
2024/04/12 14:00:30 agent.go:38: I! agent starting
2024/04/12 14:00:30 agent.go:46: I! [*agent.MetricsAgent] started
2024/04/12 14:00:30 destination.go:122: D! producer type: sync 256 5
2024/04/12 14:00:30 destination.go:154: D! saram config: &{Admin:{Retry:{Max:5 Backoff:100ms} Timeout:3s} Net:{MaxOpenRequests:5 DialTimeout:30s ReadTimeout:30s WriteTimeout:30s ResolveCanonicalBootstrapServers:false TLS:{Enable:false Config:<nil>} SASL:{Enable:false Mechanism: Version:1 Handshake:true AuthIdentity: User: Password: SCRAMAuthzID: SCRAMClientGeneratorFunc:<nil> TokenProvider:<nil> GSSAPI:{AuthType:0 KeyTabPath: CCachePath: KerberosConfigPath: ServiceName: Username: Password: Realm: DisablePAFXFAST:false}} KeepAlive:0s LocalAddr:<nil> Proxy:{Enable:false Dialer:<nil>}} Metadata:{Retry:{Max:3 Backoff:250ms BackoffFunc:<nil>} RefreshFrequency:10m0s Full:true Timeout:0s AllowAutoTopicCreation:true} Producer:{MaxMessageBytes:1000000 RequiredAcks:1 Timeout:10s Compression:none CompressionLevel:-1000 Partitioner:0xb1f1c0 Idempotent:false Transaction:{ID: Timeout:1m0s Retry:{Max:50 Backoff:100ms BackoffFunc:<nil>}} Return:{Successes:true Errors:true} Flush:{Bytes:0 Messages:0 Frequency:0s MaxMessages:0} Retry:{Max:3 Backoff:100ms BackoffFunc:<nil>} Interceptors:[]} Consumer:{Group:{Session:{Timeout:10s} Heartbeat:{Interval:3s} Rebalance:{Strategy:<nil> GroupStrategies:[0xc000cca960] Timeout:1m0s Retry:{Max:4 Backoff:2s}} Member:{UserData:[]} InstanceId: ResetInvalidOffsets:true} Retry:{Backoff:2s BackoffFunc:<nil>} Fetch:{Min:1 Default:1048576 Max:0} MaxWaitTime:500ms MaxProcessingTime:100ms Return:{Errors:false} Offsets:{CommitInterval:0s AutoCommit:{Enable:true Interval:1s} Initial:-1 Retention:0s Retry:{Max:3}} IsolationLevel:0 Interceptors:[]} ClientID:sarama RackID: ChannelBufferSize:256 ApiVersionsRequest:true Version:2.1.0 MetricRegistry:0xc000c69be0}
panic: kafka: client has run out of available brokers to talk to: dial tcp 127.0.0.1:5044: connect: connection refused

goroutine 1 [running]:
flashcat.cloud/categraf/logs/client/kafka.newDestination({{0xc000c917a0, 0x20}, {0xc000179b00, 0xe}, {0xc00097e480, 0xd}, {0xc000179b00, 0x9}, 0x13b4, 0x0, ...}, ...)
        /home/runner/work/categraf/categraf/logs/client/kafka/destination.go:160 +0xc57
flashcat.cloud/categraf/logs/client/kafka.NewDestination({{0xc000c917a0, 0x20}, {0xc000179b00, 0xe}, {0xc00097e480, 0xd}, {0xc000179b00, 0x9}, 0x13b4, 0x0, ...}, ...)
        /home/runner/work/categraf/categraf/logs/client/kafka/destination.go:60 +0x8a
flashcat.cloud/categraf/logs/pipeline.NewPipeline(0xc000cd7860, {0x0, 0x0, 0x0}, 0xc000ec1e60, 0xc000c68a40, {0x557aca0?, 0xc000eafc20}, 0x4e?)
        /home/runner/work/categraf/categraf/logs/pipeline/pipeline.go:50 +0x4ea
flashcat.cloud/categraf/logs/pipeline.(*provider).Start(0xc000c24700)
        /home/runner/work/categraf/categraf/logs/pipeline/provider.go:77 +0x9a
flashcat.cloud/categraf/logs/restart.(*starter).Start(0xc000f41c48?)
        /home/runner/work/categraf/categraf/logs/restart/starter.go:30 +0x35
flashcat.cloud/categraf/agent.(*LogsAgent).startInner(0xc000c24800)
        /home/runner/work/categraf/categraf/agent/logs_agent.go:180 +0x1d7
flashcat.cloud/categraf/agent.(*LogsAgent).Start(0xc000c24800)
        /home/runner/work/categraf/categraf/agent/logs_agent.go:141 +0x27
flashcat.cloud/categraf/agent.(*Agent).Start(0xc000cca810)
        /home/runner/work/categraf/categraf/agent/agent.go:43 +0xef
main.runAgent(0x4c4ff87?)
        /home/runner/work/categraf/categraf/main_posix.go:17 +0x3a
main.main()
        /home/runner/work/categraf/categraf/main.go:113 +0x314

kongfei605 commented 5 months ago

Is there a network plugin such as Cilium on your machine? Or has someone put stray entries in /etc/hosts? Being redirected straight to localhost is baffling.

UlricQin commented 5 months ago

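Strange. Is there anything unusual about this machine's network environment?

1. With send_to = "127.0.0.1:5044", categraf panics immediately, which means it could not establish a Kafka connection.
2. With send_to = "10.4.157.147:5044", there is no panic, which means a Kafka connection was established. Yet sending messages afterwards fails, and judging by the error, the messages seem to be sent over a wrong Kafka connection.

I'm not familiar with this area. Judging only from the information above, I even suspect a problem in the Kafka sarama library.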
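The two observations above fit how Kafka clients generally connect, rather than a library bug: the configured address is used only to fetch cluster metadata, and messages are then sent to whatever address each broker advertises in that metadata. The following is a minimal Python sketch of that flow, assuming the broker advertised a loopback address before the fix (the thread does not show the original broker settings). The names Broker, bootstrap and produce_target are hypothetical and exist only for illustration; this is not categraf or sarama code.

```python
# Simplified model of how a Kafka client picks the address it sends to.
# Broker, bootstrap and produce_target are hypothetical illustration names.
from dataclasses import dataclass


@dataclass
class Broker:
    listen_addr: str      # address the broker accepts connections on
    advertised_addr: str  # address the broker reports in metadata responses


def bootstrap(send_to: str, reachable: dict) -> Broker:
    """Step 1: dial the configured address only to fetch cluster metadata."""
    broker = reachable.get(send_to)
    if broker is None:
        # Mirrors the startup failure when nothing listens on send_to.
        raise ConnectionError(f"dial tcp {send_to}: connect: connection refused")
    return broker


def produce_target(send_to: str, reachable: dict) -> str:
    """Step 2: messages go to the address advertised in the metadata."""
    return bootstrap(send_to, reachable).advertised_addr


# Assumed state before the fix: reachable remotely, but advertising loopback.
misconfigured = {"10.4.157.147:5044": Broker("10.4.157.147:5044", "localhost:5044")}
# State after advertised.listeners points at the routable address.
fixed = {"10.4.157.147:5044": Broker("10.4.157.147:5044", "10.4.157.147:5044")}

print(produce_target("10.4.157.147:5044", misconfigured))  # localhost:5044
print(produce_target("10.4.157.147:5044", fixed))          # 10.4.157.147:5044
```

In this model, send_to = "127.0.0.1:5044" fails in bootstrap (observation 1), while send_to = "10.4.157.147:5044" bootstraps successfully and then targets localhost (observation 2).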

xiongtingshuchang commented 5 months ago

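Is there a network plugin such as Cilium on your machine? Or has someone put stray entries in /etc/hosts? Being redirected straight to localhost is baffling.

If that were the case, shouldn't categraf's heartbeat reporting fail as well? The heartbeat works fine.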

UlricQin commented 5 months ago

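Is there a network plugin such as Cilium on your machine? Or has someone put stray entries in /etc/hosts? Being redirected straight to localhost is baffling.

If that were the case, shouldn't categraf's heartbeat reporting fail as well? The heartbeat works fine.

In a relatively complex network environment, odd problems like this can indeed occur. I suggest capturing packets and analyzing them to see whether you can find any clues.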

xiongtingshuchang commented 4 months ago

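Is there a network plugin such as Cilium on your machine? Or has someone put stray entries in /etc/hosts? Being redirected straight to localhost is baffling.

Problem solved. It works after changing the configuration of the new Kafka version: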

broker.id=0
listeners=PLAINTEXT://10.4.157.147:5044
advertised.listeners=PLAINTEXT://10.4.157.147:5044
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=204857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

xiongtingshuchang commented 4 months ago

It was caused by the Kafka configuration.
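For readers hitting the same symptom, a quick check of the broker's properties file can show whether the advertised address is one that remote clients cannot reach. The sketch below is an illustrative helper, not part of Kafka or categraf; the function names are hypothetical, and it assumes every listener entry has the form NAME://host:port.

```python
# Flag advertised listeners that a remote producer could not connect to.
# All function names here are hypothetical; this is not a Kafka tool.
import ipaddress


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def host_of(listener: str) -> str:
    """'PLAINTEXT://10.4.157.147:5044' -> '10.4.157.147' (assumes a port is present)."""
    _, _, hostport = listener.strip().partition("://")
    host, _, _port = hostport.rpartition(":")
    return host.strip("[]").lower()


def is_local_only(host: str) -> bool:
    """True for empty, localhost, loopback or unspecified (0.0.0.0) hosts."""
    if host in ("", "localhost"):
        return True
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False  # a hostname; DNS resolution is not checked here
    return ip.is_loopback or ip.is_unspecified


def suspicious_listeners(props_text: str) -> list:
    props = parse_properties(props_text)
    # Kafka uses `listeners` when `advertised.listeners` is not set.
    raw = props.get("advertised.listeners") or props.get("listeners") or ""
    if not raw:
        return ["<no listeners configured>"]  # needs a manual look
    return [item.strip() for item in raw.split(",") if is_local_only(host_of(item))]


fixed = """
listeners=PLAINTEXT://10.4.157.147:5044
advertised.listeners=PLAINTEXT://10.4.157.147:5044
"""
suspect = "listeners=PLAINTEXT://localhost:5044\n"

print(suspicious_listeners(fixed))    # []
print(suspicious_listeners(suspect))  # ['PLAINTEXT://localhost:5044']
```

The `suspect` value is an invented example of a loopback listener; the broker settings that caused the problem in this issue were not posted.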