IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.58k stars 1.76k forks source link

Incorrect rate5 metrics values #2910

Open gunli opened 6 months ago

gunli commented 6 months ago
Description
Versions
Sarama Kafka Go
1.43.0 2.2.0 1.21.4
Configuration
          brokers: [ "xxx:9092" ]
          topic: yyy
          keepAlive: 5
          timeout: 30000
          batchMessages: 50
          batchFrequency: 10
          channelBufferSize: 409600
Logs
logs: CLICK ME

``` ```

Additional Context

I use Sarama as a Kafka producer, and my service has been running for a long time. From our own metrics I can see that Sarama is working well; the metrics below (and tcpdump) show that we are producing a lot of messages:

[root@VM-40-142-tencentos ~]# curl 127.0.0.1:10002/metrics | grep tglog_event_count | grep 'type="kafka"'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1528k    0 1528k    0     0  61.4M      0 --:--:-- --:--:-- --:--:-- 62.1M
tglog_event_count{desc="succeed",event_name="ItemFlow",name="sink-kafka-master",type="kafka"} 518629
tglog_event_count{desc="succeed",event_name="MoneyFLow",name="sink-kafka-master",type="kafka"} 428848
tglog_event_count{desc="succeed",event_name="PlayerExpFlow",name="sink-kafka-master",type="kafka"} 351052
tglog_event_count{desc="succeed",event_name="PlayerLogin",name="sink-kafka-master",type="kafka"} 220093
tglog_event_count{desc="succeed",event_name="PlayerLogout",name="sink-kafka-master",type="kafka"} 216224
tglog_event_count{desc="succeed",event_name="PlayerPerformanceByMatch",name="sink-kafka-master",type="kafka"} 353212
tglog_event_count{desc="succeed",event_name="PlayerPerformanceByRound",name="sink-kafka-master",type="kafka"} 5.305572e+06
tglog_event_count{desc="succeed",event_name="PlayerRegister",name="sink-kafka-master",type="kafka"} 2625
tglog_event_count{desc="succeed",event_name="SettingsChange",name="sink-kafka-master",type="kafka"} 445454
tglog_event_count{desc="succeed",event_name="apAccountAuditEvent",name="sink-kafka-master",type="kafka"} 5717
tglog_event_count{desc="succeed",event_name="apDamage",name="sink-kafka-master",type="kafka"} 2.3625055e+07
tglog_event_count{desc="succeed",event_name="apFriendsForeverChanges",name="sink-kafka-master",type="kafka"} 25225
tglog_event_count{desc="succeed",event_name="apMatchDetails",name="sink-kafka-master",type="kafka"} 80637
tglog_event_count{desc="succeed",event_name="apMatchDetails_players",name="sink-kafka-master",type="kafka"} 411611
tglog_event_count{desc="succeed",event_name="apMatchDetails_players_newExpr",name="sink-kafka-master",type="kafka"} 2746
tglog_event_count{desc="succeed",event_name="apMatchDetails_roundResults_playerStats",name="sink-kafka-master",type="kafka"} 5.362136e+06
tglog_event_count{desc="succeed",event_name="apMatchDetails_roundResults_playerStats_damage",name="sink-kafka-master",type="kafka"} 7.627876e+06
tglog_event_count{desc="succeed",event_name="apMatchDetails_roundResults_playerStats_kills",name="sink-kafka-master",type="kafka"} 5.337579e+06
tglog_event_count{desc="succeed",event_name="apMatchDetails_teams",name="sink-kafka-master",type="kafka"} 187822
tglog_event_count{desc="succeed",event_name="apMatchStartEvent_PlayerInfos",name="sink-kafka-master",type="kafka"} 395353
tglog_event_count{desc="succeed",event_name="apMissionProgress",name="sink-kafka-master",type="kafka"} 681794
tglog_event_count{desc="succeed",event_name="apMmrUpdatedEvent",name="sink-kafka-master",type="kafka"} 352195
tglog_event_count{desc="succeed",event_name="apPlayerContractProgress",name="sink-kafka-master",type="kafka"} 347152
tglog_event_count{desc="succeed",event_name="apPlayerLoadout",name="sink-kafka-master",type="kafka"} 406104
tglog_event_count{desc="succeed",event_name="apSessionCCUEvent",name="sink-kafka-master",type="kafka"} 3603
[root@VM-40-142-tencentos ~]# curl 127.0.0.1:10002/metrics | grep tglog_event_count | grep 'type="kafka"'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1528k    0 1528k    0     0  57.8M      0 --:--:-- --:--:-- --:--:-- 59.7M
tglog_event_count{desc="succeed",event_name="ItemFlow",name="sink-kafka-master",type="kafka"} 518965
tglog_event_count{desc="succeed",event_name="MoneyFLow",name="sink-kafka-master",type="kafka"} 429133
tglog_event_count{desc="succeed",event_name="PlayerExpFlow",name="sink-kafka-master",type="kafka"} 351292
tglog_event_count{desc="succeed",event_name="PlayerLogin",name="sink-kafka-master",type="kafka"} 220211
tglog_event_count{desc="succeed",event_name="PlayerLogout",name="sink-kafka-master",type="kafka"} 216319
tglog_event_count{desc="succeed",event_name="PlayerPerformanceByMatch",name="sink-kafka-master",type="kafka"} 353464
tglog_event_count{desc="succeed",event_name="PlayerPerformanceByRound",name="sink-kafka-master",type="kafka"} 5.308714e+06
tglog_event_count{desc="succeed",event_name="PlayerRegister",name="sink-kafka-master",type="kafka"} 2627
tglog_event_count{desc="succeed",event_name="SettingsChange",name="sink-kafka-master",type="kafka"} 445840
tglog_event_count{desc="succeed",event_name="apAccountAuditEvent",name="sink-kafka-master",type="kafka"} 5724
tglog_event_count{desc="succeed",event_name="apDamage",name="sink-kafka-master",type="kafka"} 2.3641865e+07
tglog_event_count{desc="succeed",event_name="apFriendsForeverChanges",name="sink-kafka-master",type="kafka"} 25246
tglog_event_count{desc="succeed",event_name="apMatchDetails",name="sink-kafka-master",type="kafka"} 80685
tglog_event_count{desc="succeed",event_name="apMatchDetails_players",name="sink-kafka-master",type="kafka"} 411897
tglog_event_count{desc="succeed",event_name="apMatchDetails_players_newExpr",name="sink-kafka-master",type="kafka"} 2747
tglog_event_count{desc="succeed",event_name="apMatchDetails_roundResults_playerStats",name="sink-kafka-master",type="kafka"} 5.365446e+06
tglog_event_count{desc="succeed",event_name="apMatchDetails_roundResults_playerStats_damage",name="sink-kafka-master",type="kafka"} 7.63328e+06
tglog_event_count{desc="succeed",event_name="apMatchDetails_roundResults_playerStats_kills",name="sink-kafka-master",type="kafka"} 5.34104e+06
tglog_event_count{desc="succeed",event_name="apMatchDetails_teams",name="sink-kafka-master",type="kafka"} 187929
tglog_event_count{desc="succeed",event_name="apMatchStartEvent_PlayerInfos",name="sink-kafka-master",type="kafka"} 395562
tglog_event_count{desc="succeed",event_name="apMissionProgress",name="sink-kafka-master",type="kafka"} 682149
tglog_event_count{desc="succeed",event_name="apMmrUpdatedEvent",name="sink-kafka-master",type="kafka"} 352450
tglog_event_count{desc="succeed",event_name="apPlayerContractProgress",name="sink-kafka-master",type="kafka"} 347357
tglog_event_count{desc="succeed",event_name="apPlayerLoadout",name="sink-kafka-master",type="kafka"} 406405
tglog_event_count{desc="succeed",event_name="apSessionCCUEvent",name="sink-kafka-master",type="kafka"} 3605

But the value of the kafka_request_rate_rate5 metric obtained from Sarama shows that the request rate is close to 0: kafka_request_rate_rate5{name="sink-kafka-master"} 0.0016490552150537254

# HELP kafka_request_rate_for_broker_0_rate5 request-rate-for-broker-0_rate5
# TYPE kafka_request_rate_for_broker_0_rate5 gauge
kafka_request_rate_for_broker_0_rate5{name="sink-kafka-master"} 20.27717661669732
# HELP kafka_request_rate_for_broker_10_rate5 request-rate-for-broker-10_rate5
# TYPE kafka_request_rate_for_broker_10_rate5 gauge
kafka_request_rate_for_broker_10_rate5{name="sink-kafka-master"} 3.222489569981827e-05
# HELP kafka_request_rate_for_broker_11_rate5 request-rate-for-broker-11_rate5
# TYPE kafka_request_rate_for_broker_11_rate5 gauge
kafka_request_rate_for_broker_11_rate5{name="sink-kafka-master"} 5.731289464235474e-73
# HELP kafka_request_rate_for_broker_12_rate5 request-rate-for-broker-12_rate5
# TYPE kafka_request_rate_for_broker_12_rate5 gauge
kafka_request_rate_for_broker_12_rate5{name="sink-kafka-master"} 1.1217830483584259e-61
# HELP kafka_request_rate_for_broker_13_rate5 request-rate-for-broker-13_rate5
# TYPE kafka_request_rate_for_broker_13_rate5 gauge
# HELP kafka_request_rate_for_broker_14_rate5 request-rate-for-broker-14_rate5
# TYPE kafka_request_rate_for_broker_14_rate5 gauge
# HELP kafka_request_rate_for_broker_15_rate5 request-rate-for-broker-15_rate5
# TYPE kafka_request_rate_for_broker_15_rate5 gauge
kafka_request_rate_for_broker_15_rate5{name="sink-kafka-master"} 4.5136202222204e-116
# HELP kafka_request_rate_for_broker_16_rate5 request-rate-for-broker-16_rate5
# TYPE kafka_request_rate_for_broker_16_rate5 gauge
kafka_request_rate_for_broker_16_rate5{name="sink-kafka-master"} 6.642046050438541e-14
# HELP kafka_request_rate_for_broker_17_rate5 request-rate-for-broker-17_rate5
# TYPE kafka_request_rate_for_broker_17_rate5 gauge
kafka_request_rate_for_broker_17_rate5{name="sink-kafka-master"} 3.0971879632156735e-09
# HELP kafka_request_rate_for_broker_18_rate5 request-rate-for-broker-18_rate5
# TYPE kafka_request_rate_for_broker_18_rate5 gauge
kafka_request_rate_for_broker_18_rate5{name="sink-kafka-master"} 3.213962243911357e-62
# HELP kafka_request_rate_for_broker_19_rate5 request-rate-for-broker-19_rate5
# TYPE kafka_request_rate_for_broker_19_rate5 gauge
kafka_request_rate_for_broker_19_rate5{name="sink-kafka-master"} 3.8651247636716294e-56
# HELP kafka_request_rate_for_broker_1_rate5 request-rate-for-broker-1_rate5
# TYPE kafka_request_rate_for_broker_1_rate5 gauge
kafka_request_rate_for_broker_1_rate5{name="sink-kafka-master"} 20.973076663093607
# HELP kafka_request_rate_for_broker_20_rate5 request-rate-for-broker-20_rate5
# TYPE kafka_request_rate_for_broker_20_rate5 gauge
# HELP kafka_request_rate_for_broker_21_rate5 request-rate-for-broker-21_rate5
# TYPE kafka_request_rate_for_broker_21_rate5 gauge
kafka_request_rate_for_broker_21_rate5{name="sink-kafka-master"} 2.537839726946107e-48
# HELP kafka_request_rate_for_broker_22_rate5 request-rate-for-broker-22_rate5
# TYPE kafka_request_rate_for_broker_22_rate5 gauge
kafka_request_rate_for_broker_22_rate5{name="sink-kafka-master"} 5.902195528649941e-07
# HELP kafka_request_rate_for_broker_23_rate5 request-rate-for-broker-23_rate5
# TYPE kafka_request_rate_for_broker_23_rate5 gauge
kafka_request_rate_for_broker_23_rate5{name="sink-kafka-master"} 1.5823809040715057e-20
# HELP kafka_request_rate_for_broker_24_rate5 request-rate-for-broker-24_rate5
# TYPE kafka_request_rate_for_broker_24_rate5 gauge
# HELP kafka_request_rate_for_broker_25_rate5 request-rate-for-broker-25_rate5
# TYPE kafka_request_rate_for_broker_25_rate5 gauge
kafka_request_rate_for_broker_25_rate5{name="sink-kafka-master"} 3.6703686331353404e-36
# HELP kafka_request_rate_for_broker_26_rate5 request-rate-for-broker-26_rate5
# TYPE kafka_request_rate_for_broker_26_rate5 gauge
# HELP kafka_request_rate_for_broker_27_rate5 request-rate-for-broker-27_rate5
# TYPE kafka_request_rate_for_broker_27_rate5 gauge
kafka_request_rate_for_broker_27_rate5{name="sink-kafka-master"} 1.463008001375153e-09
# HELP kafka_request_rate_for_broker_28_rate5 request-rate-for-broker-28_rate5
# TYPE kafka_request_rate_for_broker_28_rate5 gauge
# HELP kafka_request_rate_for_broker_29_rate5 request-rate-for-broker-29_rate5
# TYPE kafka_request_rate_for_broker_29_rate5 gauge
kafka_request_rate_for_broker_29_rate5{name="sink-kafka-master"} 4.907845087837114e-13
# HELP kafka_request_rate_for_broker_2_rate5 request-rate-for-broker-2_rate5
# TYPE kafka_request_rate_for_broker_2_rate5 gauge
kafka_request_rate_for_broker_2_rate5{name="sink-kafka-master"} 22.097304206443788
# HELP kafka_request_rate_for_broker_30_rate5 request-rate-for-broker-30_rate5
# TYPE kafka_request_rate_for_broker_30_rate5 gauge
kafka_request_rate_for_broker_30_rate5{name="sink-kafka-master"} 4.173118168505621e-105
# HELP kafka_request_rate_for_broker_31_rate5 request-rate-for-broker-31_rate5
# TYPE kafka_request_rate_for_broker_31_rate5 gauge
kafka_request_rate_for_broker_31_rate5{name="sink-kafka-master"} 1.406121159912054e-13
# HELP kafka_request_rate_for_broker_32_rate5 request-rate-for-broker-32_rate5
# TYPE kafka_request_rate_for_broker_32_rate5 gauge
# HELP kafka_request_rate_for_broker_33_rate5 request-rate-for-broker-33_rate5
# TYPE kafka_request_rate_for_broker_33_rate5 gauge
# HELP kafka_request_rate_for_broker_34_rate5 request-rate-for-broker-34_rate5
# TYPE kafka_request_rate_for_broker_34_rate5 gauge
kafka_request_rate_for_broker_34_rate5{name="sink-kafka-master"} 2.928166811838042e-84
# HELP kafka_request_rate_for_broker_35_rate5 request-rate-for-broker-35_rate5
# TYPE kafka_request_rate_for_broker_35_rate5 gauge
kafka_request_rate_for_broker_35_rate5{name="sink-kafka-master"} 8.288917875149842e-61
# HELP kafka_request_rate_for_broker_36_rate5 request-rate-for-broker-36_rate5
# TYPE kafka_request_rate_for_broker_36_rate5 gauge
kafka_request_rate_for_broker_36_rate5{name="sink-kafka-master"} 2.1102866174376484e-54
# HELP kafka_request_rate_for_broker_37_rate5 request-rate-for-broker-37_rate5
# TYPE kafka_request_rate_for_broker_37_rate5 gauge
kafka_request_rate_for_broker_37_rate5{name="sink-kafka-master"} 1.2624005134411874e-68
# HELP kafka_request_rate_for_broker_38_rate5 request-rate-for-broker-38_rate5
# TYPE kafka_request_rate_for_broker_38_rate5 gauge
kafka_request_rate_for_broker_38_rate5{name="sink-kafka-master"} 7.677161405365711e-12
# HELP kafka_request_rate_for_broker_39_rate5 request-rate-for-broker-39_rate5
# TYPE kafka_request_rate_for_broker_39_rate5 gauge
kafka_request_rate_for_broker_39_rate5{name="sink-kafka-master"} 1.0810248195345395e-08
# HELP kafka_request_rate_for_broker_3_rate5 request-rate-for-broker-3_rate5
# TYPE kafka_request_rate_for_broker_3_rate5 gauge
kafka_request_rate_for_broker_3_rate5{name="sink-kafka-master"} 20.503711416791496
# HELP kafka_request_rate_for_broker_40_rate5 request-rate-for-broker-40_rate5
# TYPE kafka_request_rate_for_broker_40_rate5 gauge
kafka_request_rate_for_broker_40_rate5{name="sink-kafka-master"} 3.015484241691e-18
# HELP kafka_request_rate_for_broker_41_rate5 request-rate-for-broker-41_rate5
# TYPE kafka_request_rate_for_broker_41_rate5 gauge
kafka_request_rate_for_broker_41_rate5{name="sink-kafka-master"} 3.261530132504595e-29
# HELP kafka_request_rate_for_broker_42_rate5 request-rate-for-broker-42_rate5
# TYPE kafka_request_rate_for_broker_42_rate5 gauge
kafka_request_rate_for_broker_42_rate5{name="sink-kafka-master"} 2.0795090760261164e-87
# HELP kafka_request_rate_for_broker_43_rate5 request-rate-for-broker-43_rate5
# TYPE kafka_request_rate_for_broker_43_rate5 gauge
kafka_request_rate_for_broker_43_rate5{name="sink-kafka-master"} 7.871255400998887e-41
# HELP kafka_request_rate_for_broker_44_rate5 request-rate-for-broker-44_rate5
# TYPE kafka_request_rate_for_broker_44_rate5 gauge
kafka_request_rate_for_broker_44_rate5{name="sink-kafka-master"} 1.2165331694217554e-15
# HELP kafka_request_rate_for_broker_45_rate5 request-rate-for-broker-45_rate5
# TYPE kafka_request_rate_for_broker_45_rate5 gauge
kafka_request_rate_for_broker_45_rate5{name="sink-kafka-master"} 2.434854798369265e-90
# HELP kafka_request_rate_for_broker_46_rate5 request-rate-for-broker-46_rate5
# TYPE kafka_request_rate_for_broker_46_rate5 gauge
kafka_request_rate_for_broker_46_rate5{name="sink-kafka-master"} 4.191588102388212e-10
# HELP kafka_request_rate_for_broker_47_rate5 request-rate-for-broker-47_rate5
# TYPE kafka_request_rate_for_broker_47_rate5 gauge
# HELP kafka_request_rate_for_broker_48_rate5 request-rate-for-broker-48_rate5
# TYPE kafka_request_rate_for_broker_48_rate5 gauge
# HELP kafka_request_rate_for_broker_49_rate5 request-rate-for-broker-49_rate5
# TYPE kafka_request_rate_for_broker_49_rate5 gauge
kafka_request_rate_for_broker_49_rate5{name="sink-kafka-master"} 9.191904459627557e-101
# HELP kafka_request_rate_for_broker_4_rate5 request-rate-for-broker-4_rate5
# TYPE kafka_request_rate_for_broker_4_rate5 gauge
kafka_request_rate_for_broker_4_rate5{name="sink-kafka-master"} 22.480717730066292
# HELP kafka_request_rate_for_broker_50_rate5 request-rate-for-broker-50_rate5
# TYPE kafka_request_rate_for_broker_50_rate5 gauge
kafka_request_rate_for_broker_50_rate5{name="sink-kafka-master"} 3.9628428484415057e-85
# HELP kafka_request_rate_for_broker_51_rate5 request-rate-for-broker-51_rate5
# TYPE kafka_request_rate_for_broker_51_rate5 gauge
kafka_request_rate_for_broker_51_rate5{name="sink-kafka-master"} 9.097934238552172e-39
# HELP kafka_request_rate_for_broker_52_rate5 request-rate-for-broker-52_rate5
# TYPE kafka_request_rate_for_broker_52_rate5 gauge
kafka_request_rate_for_broker_52_rate5{name="sink-kafka-master"} 2.2885295609133332e-08
# HELP kafka_request_rate_for_broker_53_rate5 request-rate-for-broker-53_rate5
# TYPE kafka_request_rate_for_broker_53_rate5 gauge
kafka_request_rate_for_broker_53_rate5{name="sink-kafka-master"} 9.679642865153054e-121
# HELP kafka_request_rate_for_broker_54_rate5 request-rate-for-broker-54_rate5
# TYPE kafka_request_rate_for_broker_54_rate5 gauge
kafka_request_rate_for_broker_54_rate5{name="sink-kafka-master"} 4.967303785481508e-37
# HELP kafka_request_rate_for_broker_55_rate5 request-rate-for-broker-55_rate5
# TYPE kafka_request_rate_for_broker_55_rate5 gauge
kafka_request_rate_for_broker_55_rate5{name="sink-kafka-master"} 3.0971879632156735e-09
# HELP kafka_request_rate_for_broker_56_rate5 request-rate-for-broker-56_rate5
# TYPE kafka_request_rate_for_broker_56_rate5 gauge
kafka_request_rate_for_broker_56_rate5{name="sink-kafka-master"} 5.886577186150621e-64
# HELP kafka_request_rate_for_broker_58_rate5 request-rate-for-broker-58_rate5
# TYPE kafka_request_rate_for_broker_58_rate5 gauge
kafka_request_rate_for_broker_58_rate5{name="sink-kafka-master"} 8.084524915208697e-32
# HELP kafka_request_rate_for_broker_59_rate5 request-rate-for-broker-59_rate5
# TYPE kafka_request_rate_for_broker_59_rate5 gauge
kafka_request_rate_for_broker_59_rate5{name="sink-kafka-master"} 0.09606106021795988
# HELP kafka_request_rate_for_broker_5_rate5 request-rate-for-broker-5_rate5
# TYPE kafka_request_rate_for_broker_5_rate5 gauge
kafka_request_rate_for_broker_5_rate5{name="sink-kafka-master"} 19.669860865939345
# HELP kafka_request_rate_for_broker_60_rate5 request-rate-for-broker-60_rate5
# TYPE kafka_request_rate_for_broker_60_rate5 gauge
kafka_request_rate_for_broker_60_rate5{name="sink-kafka-master"} 2.0850264563224843e-30
# HELP kafka_request_rate_for_broker_61_rate5 request-rate-for-broker-61_rate5
# TYPE kafka_request_rate_for_broker_61_rate5 gauge
kafka_request_rate_for_broker_61_rate5{name="sink-kafka-master"} 1.3490610998592636e-55
# HELP kafka_request_rate_for_broker_62_rate5 request-rate-for-broker-62_rate5
# TYPE kafka_request_rate_for_broker_62_rate5 gauge
# HELP kafka_request_rate_for_broker_63_rate5 request-rate-for-broker-63_rate5
# TYPE kafka_request_rate_for_broker_63_rate5 gauge
kafka_request_rate_for_broker_63_rate5{name="sink-kafka-master"} 2.2281582227296156e-17
# HELP kafka_request_rate_for_broker_64_rate5 request-rate-for-broker-64_rate5
# TYPE kafka_request_rate_for_broker_64_rate5 gauge
kafka_request_rate_for_broker_64_rate5{name="sink-kafka-master"} 1.0941214692332214e-32
# HELP kafka_request_rate_for_broker_65_rate5 request-rate-for-broker-65_rate5
# TYPE kafka_request_rate_for_broker_65_rate5 gauge
kafka_request_rate_for_broker_65_rate5{name="sink-kafka-master"} 1.3690277277265148e-22
# HELP kafka_request_rate_for_broker_66_rate5 request-rate-for-broker-66_rate5
# TYPE kafka_request_rate_for_broker_66_rate5 gauge
kafka_request_rate_for_broker_66_rate5{name="sink-kafka-master"} 2.8217764602223444e-31
# HELP kafka_request_rate_for_broker_67_rate5 request-rate-for-broker-67_rate5
# TYPE kafka_request_rate_for_broker_67_rate5 gauge
# HELP kafka_request_rate_for_broker_68_rate5 request-rate-for-broker-68_rate5
# TYPE kafka_request_rate_for_broker_68_rate5 gauge
kafka_request_rate_for_broker_68_rate5{name="sink-kafka-master"} 1.9510891458322826e-43
# HELP kafka_request_rate_for_broker_69_rate5 request-rate-for-broker-69_rate5
# TYPE kafka_request_rate_for_broker_69_rate5 gauge
kafka_request_rate_for_broker_69_rate5{name="sink-kafka-master"} 2.0546173225236374e-63
# HELP kafka_request_rate_for_broker_6_rate5 request-rate-for-broker-6_rate5
# TYPE kafka_request_rate_for_broker_6_rate5 gauge
kafka_request_rate_for_broker_6_rate5{name="sink-kafka-master"} 20.77724580438877
# HELP kafka_request_rate_for_broker_71_rate5 request-rate-for-broker-71_rate5
# TYPE kafka_request_rate_for_broker_71_rate5 gauge
kafka_request_rate_for_broker_71_rate5{name="sink-kafka-master"} 0.0017594196902211028
# HELP kafka_request_rate_for_broker_73_rate5 request-rate-for-broker-73_rate5
# TYPE kafka_request_rate_for_broker_73_rate5 gauge
kafka_request_rate_for_broker_73_rate5{name="sink-kafka-master"} 7.987753035877094e-08
# HELP kafka_request_rate_for_broker_74_rate5 request-rate-for-broker-74_rate5
# TYPE kafka_request_rate_for_broker_74_rate5 gauge
# HELP kafka_request_rate_for_broker_76_rate5 request-rate-for-broker-76_rate5
# TYPE kafka_request_rate_for_broker_76_rate5 gauge
kafka_request_rate_for_broker_76_rate5{name="sink-kafka-master"} 1.4416707152569423e-42
# HELP kafka_request_rate_for_broker_77_rate5 request-rate-for-broker-77_rate5
# TYPE kafka_request_rate_for_broker_77_rate5 gauge
kafka_request_rate_for_broker_77_rate5{name="sink-kafka-master"} 1.8478747477967275e-80
# HELP kafka_request_rate_for_broker_78_rate5 request-rate-for-broker-78_rate5
# TYPE kafka_request_rate_for_broker_78_rate5 gauge
kafka_request_rate_for_broker_78_rate5{name="sink-kafka-master"} 5.672697630479246e-11
# HELP kafka_request_rate_for_broker_79_rate5 request-rate-for-broker-79_rate5
# TYPE kafka_request_rate_for_broker_79_rate5 gauge
kafka_request_rate_for_broker_79_rate5{name="sink-kafka-master"} 3.0971879632156735e-09
# HELP kafka_request_rate_for_broker_7_rate5 request-rate-for-broker-7_rate5
# TYPE kafka_request_rate_for_broker_7_rate5 gauge
kafka_request_rate_for_broker_7_rate5{name="sink-kafka-master"} 19.257377726390622
# HELP kafka_request_rate_for_broker_8_rate5 request-rate-for-broker-8_rate5
# TYPE kafka_request_rate_for_broker_8_rate5 gauge
kafka_request_rate_for_broker_8_rate5{name="sink-kafka-master"} 19.440791885931677
# HELP kafka_request_rate_for_broker_9_rate5 request-rate-for-broker-9_rate5
# TYPE kafka_request_rate_for_broker_9_rate5 gauge
kafka_request_rate_for_broker_9_rate5{name="sink-kafka-master"} 19.194802633449534
# HELP kafka_request_rate_rate5 request-rate_rate5
# TYPE kafka_request_rate_rate5 gauge
kafka_request_rate_rate5{name="sink-kafka-master"} 0.0016490552150537254

The value of kafka_outgoing_byte_rate_rate5 is incorrect as well:

[root@VM-40-142-tencentos ~]# curl 127.0.0.1:10002/metrics | grep kafka_outgoing_byte_rate_rate5 |grep -v slave 
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1528k    0 1528k    0     0  62.3M      0 --:--:-- --:--:-- --:--:-- 64.9M
# HELP kafka_outgoing_byte_rate_rate5 outgoing-byte-rate_rate5
# TYPE kafka_outgoing_byte_rate_rate5 gauge
kafka_outgoing_byte_rate_rate5{name="sink-kafka-master"} 0.01181155573500513

image Above are 2 panels showing kafka_request_rate_rate5 and kafka_outgoing_byte_rate_rate5; we can see that they fail quite often.

Below is my producer code; I convert the go-metrics in Sarama to Prometheus metrics:

package sink

import (
    "bytes"
    "crypto/sha256"
    "crypto/sha512"
    "fmt"
    "strconv"
    "time"

    prometheusmetrics "git.woa.com/tglog/v3/go-metrics-prometheus"
    "github.com/IBM/sarama"
    "github.com/prometheus/client_golang/prometheus"
    gometrics "github.com/rcrowley/go-metrics"
    "github.com/xdg-go/scram"
    "go.uber.org/zap"

    "git.woa.com/tglog/v3/server/pkg/component"
    "git.woa.com/tglog/v3/server/pkg/event"
    "git.woa.com/tglog/v3/server/pkg/logger/zaplog"
    "git.woa.com/tglog/v3/server/pkg/metrics"
    "git.woa.com/tglog/v3/server/pkg/util"
)

// SHA256 and SHA512 are scram.HashGeneratorFcn values backed by sha256.New
// and sha512.New respectively. They are assigned to the HashGeneratorFcn
// field of XDGSCRAMClient when a SCRAM client is created.
var (
    SHA256 scram.HashGeneratorFcn = sha256.New
    SHA512 scram.HashGeneratorFcn = sha512.New
)

// XDGSCRAMClient is a SCRAM client that embeds a scram.Client, the
// scram.ClientConversation started from that client, and the
// scram.HashGeneratorFcn used to create the client.
//
// Only HashGeneratorFcn needs to be set up front; Client and
// ClientConversation are populated by Begin.
type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
}

// Begin starts authentication: it creates a new SCRAM client from the
// configured HashGeneratorFcn with the given userName, password and authzID,
// stores it on the receiver, and opens a fresh conversation on that client.
//
// The error returned by NewClient, if any, is passed back to the caller and
// no conversation is started in that case.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    if x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID); err != nil {
        return err
    }

    x.ClientConversation = x.Client.NewConversation()
    return nil
}

// Step forwards the server challenge to the embedded ClientConversation and
// returns its response and error unchanged.
//
// The embedded field is named explicitly because XDGSCRAMClient declares its
// own Step method, which shadows the promoted one.
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    return x.ClientConversation.Step(challenge)
}

// Done returns the result of the embedded ClientConversation's Done method.
func (x *XDGSCRAMClient) Done() bool {
    finished := x.ClientConversation.Done()
    return finished
}

// SASLCfg is the SASL config for kafka, loaded from YAML.
//
//   - Enable is copied to the sarama config's Net.SASL.Enable.
//   - Mechanism selects the SASL mechanism; newKafkaSink compares it against
//     "SCRAM-SHA-256" and "SCRAM-SHA-512".
//   - DisableHandshake, when true, makes newKafkaSink set Net.SASL.Handshake
//     to false.
//   - User and Password are copied to Net.SASL.User and Net.SASL.Password.
type SASLCfg struct {
    Enable           bool   `yaml:"enable"`
    Mechanism        string `yaml:"mechanism"`
    DisableHandshake bool   `yaml:"disableHandshake"`
    User             string `yaml:"user"`
    Password         string `yaml:"password"`
}

// KafkaSinkCfg defines the config of kafka, loaded from YAML.
type KafkaSinkCfg struct {
    Brokers                       []string          `yaml:"brokers"`                       // broker list, in ip:port format
    Topic                         string            `yaml:"topic"`                         // default topic
    KeepAlive                     int64             `yaml:"keepAlive"`                     // keep-alive interval, in seconds
    Timeout                       int64             `yaml:"timeout"`                       // write timeout, in milliseconds
    BatchMessages                 int               `yaml:"batchMessages"`                 // number of messages per batch
    BatchFrequency                int               `yaml:"batchFrequency"`                // batch send frequency, in milliseconds
    ChannelBufferSize             int               `yaml:"channelBufferSize"`             // size of the buffer queue
    Version                       string            `yaml:"version"`                       // version number, e.g. 2.1.0
    DropIfTopicNotInRouter        bool              `yaml:"dropIfTopicNotInRouter"`        // whether to drop the log when its topic is not found in the router
    AddHeaders                    map[string]string `yaml:"addHeaders"`                    // custom headers to add
    AddHeadersToLog               bool              `yaml:"addHeadersToLog"`               // whether to add the custom headers to the log; if so, k1=v1&k2=v2 is added to the raw log
    AddHeadersToLogSuffix         string            `yaml:"addHeadersToLogSuffix"`         // suffix to use when adding to the log; for tglog it is usually "|"
    SASL                          SASLCfg           `yaml:"sasl"`                          // SASL authentication
    ReportEventNameLabelToMetrics bool              `yaml:"reportEventNameLabelToMetrics"` // whether the event name is recorded as a metrics label; default is no
}

// kafkaSink is the sink type for kafka. It adds no fields of its own and
// embeds baseSink, which is declared outside this part of the file.
type kafkaSink struct {
    baseSink
}

// kafkaWriter writes events to kafka through a sarama.AsyncProducer. It
// embeds baseWriteCloser (declared outside this part of the file) and keeps
// the sink config together with cached header data used when writing.
type kafkaWriter struct {
    baseWriteCloser
    producer              sarama.AsyncProducer
    cfg                   KafkaSinkCfg
    addHeaders            map[string]string // cached additional headers
    addHeadersString      string            // cached additional headers, as the string to be added to the raw log
    addHeadersPlaceHolder bool              // whether a placeholder exists; if so it needs to be replaced
}

// write builds a sarama.ProducerMessage from evt and sends it on the async
// producer's input channel.
//
// The topic defaults to k.cfg.Topic. When a router is set and it routes the
// event, the routed topic is used instead; when the router does not route the
// event and DropIfTopicNotInRouter is enabled, the event is dropped and nil
// is returned. The function panics if the writer has no producer and
// otherwise always returns nil.
func (k *kafkaWriter) write(evt event.Event) error {
    if k.producer == nil {
        panic(fmt.Sprintf("sink[%s/%s] has no producer", k.TypeOfMine, k.NameOfMine))
    }

    // Start from the configured default topic; a router, when set, may override it.
    topic := k.cfg.Topic
    if k.router != nil {
        if routed, found := k.router.Route(evt); found {
            topic = routed
        } else if k.cfg.DropIfTopicNotInRouter {
            // Not found in the router: drop the event, so the router can be used as a filter.
            return nil
        }
    }

    headers, prefix := parseKafkaHeaders(evt.Headers, k.addHeaders, k.addHeadersString, k.addHeadersPlaceHolder, k.cfg.AddHeadersToLog)

    payload := sarama.ByteEncoder(evt.Content)
    if prefix != "" {
        // Put the returned string in front of the raw content, using a
        // buffer grown once to the combined length.
        var buf bytes.Buffer
        buf.Grow(len(prefix) + len(evt.Content))
        buf.WriteString(prefix)
        buf.Write(evt.Content)
        payload = buf.Bytes()
    }

    // Save the log name as a header so that it can be retrieved again on failure.
    nameHeader := sarama.RecordHeader{
        Key:   util.StringBytes(event.EventName),
        Value: util.StringBytes(evt.Name),
    }
    headers = append(headers, nameHeader)

    // Important: Key is not filled in by default (e.g. with
    // sarama.StringEncoder(evt.Name)); setting this field carelessly may
    // cause data skew.
    msg := &sarama.ProducerMessage{
        Topic:    topic,
        Value:    payload,
        Headers:  headers,
        Metadata: evt.Name, // keep the log name
    }
    if evt.RouteKey != nil {
        // A route key was specified explicitly, so use it.
        msg.Key = sarama.ByteEncoder(evt.RouteKey)
    }

    k.producer.Input() <- msg

    // input 1, output 1
    // metrics.AddEvents(k.typeOfMine, k.nameOfMine, 1, 0, 1, 0, 0)
    return nil
}

// close closes the producer when one is set. The error returned by the
// producer's Close is deliberately discarded.
func (k *kafkaWriter) close() {
    if k.producer == nil {
        return
    }
    _ = k.producer.Close()
}

// newKafkaSink creates a Kafka sink called name from cfg.
//
// It builds the sarama configuration (protocol version, batching, timeouts,
// SASL), creates an async producer for cfg.Brokers, precomputes the extra
// headers from cfg.AddHeaders, and starts three goroutines: one consuming
// producer.Errors(), one consuming producer.Successes(), and one running the
// Prometheus metrics updater for the producer's go-metrics registry.
//
// An error is returned when cfg.Version cannot be parsed or when the producer
// cannot be created.
func newKafkaSink(name string, cfg KafkaSinkCfg) (Sink, error) {
    kfkCfg := sarama.NewConfig()
    kfkCfg.Version = sarama.V2_2_0_0 // default, overridden by cfg.Version when set
    if cfg.Version != "" {
        ver, err := sarama.ParseKafkaVersion(cfg.Version)
        if err != nil {
            return nil, err
        }

        kfkCfg.Version = ver
    }

    // The producer's metrics go into a dedicated go-metrics registry, which the
    // Prometheus provider is built on with a "name" label.
    goRegistry := gometrics.NewRegistry()
    prometheusRegistry := prometheus.DefaultRegisterer
    metricsClient := prometheusmetrics.NewPrometheusProvider(goRegistry, "", "kafka",
        prometheusRegistry, 15*time.Second, map[string]string{"name": name})

    kfkCfg.ChannelBufferSize = cfg.ChannelBufferSize
    kfkCfg.MetricRegistry = goRegistry
    kfkCfg.Net.KeepAlive = time.Duration(cfg.KeepAlive) * time.Second
    kfkCfg.Net.MaxOpenRequests = 32
    kfkCfg.Producer.RequiredAcks = sarama.WaitForLocal                                     // Only wait for the leader to ack
    kfkCfg.Producer.Compression = sarama.CompressionSnappy                                 // compress messages
    kfkCfg.Producer.Flush.Messages = cfg.BatchMessages                                     // flush threshold in messages
    kfkCfg.Producer.Flush.Frequency = time.Duration(cfg.BatchFrequency) * time.Millisecond // flush interval, cfg.BatchFrequency is in milliseconds
    kfkCfg.Producer.Timeout = time.Duration(cfg.Timeout) * time.Millisecond
    // Both result channels are enabled; the two goroutines below consume them.
    kfkCfg.Producer.Return.Successes = true
    kfkCfg.Producer.Return.Errors = true
    kfkCfg.Net.SASL.Enable = cfg.SASL.Enable
    kfkCfg.Net.SASL.User = cfg.SASL.User
    kfkCfg.Net.SASL.Password = cfg.SASL.Password
    if cfg.SASL.DisableHandshake {
        kfkCfg.Net.SASL.Handshake = false
    }
    // Only the two SCRAM mechanisms need extra setup; any other value leaves
    // sarama's mechanism setting untouched.
    mechanism := cfg.SASL.Mechanism
    if mechanism == "SCRAM-SHA-256" {
        kfkCfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
        kfkCfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
            return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
        }
    } else if mechanism == "SCRAM-SHA-512" {
        kfkCfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
        kfkCfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
            return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
        }
    }

    producer, err := sarama.NewAsyncProducer(cfg.Brokers, kfkCfg)
    if err != nil {
        return nil, err
    }

    // Precompute the additional headers once: as a map, and as a single
    // "k=v&k=v" string followed by cfg.AddHeadersToLogSuffix.
    // NOTE(review): Go map iteration order is unspecified, so the order of the
    // pairs inside addHeadersString can differ between runs.
    addHeaders := make(map[string]string)
    addHeadersString := ""
    addHeadersPlaceHolder := false
    if len(cfg.AddHeaders) > 0 {
        for k, v := range cfg.AddHeaders {
            // Remember whether any value is a runtime placeholder.
            if _, ok := runtimePlaceholders[v]; ok {
                addHeadersPlaceHolder = true
            }

            kv := ""
            if addHeadersString != "" {
                kv = fmt.Sprintf("&%s=%s", k, v)
            } else {
                kv = fmt.Sprintf("%s=%s", k, v)
            }

            kv = updateStaticValue(v, kv) // replace static placeholders (IP address etc.) in the kv string
            addHeadersString += kv

            addHeaders[k] = updateStaticValue(v, v) // replace static placeholders (IP address etc.) in the value
        }

        addHeadersString += cfg.AddHeadersToLogSuffix
        headersInLogValue := "false"
        if cfg.AddHeadersToLog {
            headersInLogValue = "true"
        }
        addHeaders[headersInLog] = headersInLogValue
    }

    sink := kafkaSink{
        baseSink: baseSink{
            BaseComponent: component.BaseComponent{
                TypeOfMine:    TypeKafka,
                NameOfMine:    name,
                EventCounters: make(map[string]prometheus.Counter),
            },
            writeCloser: &kafkaWriter{
                baseWriteCloser: baseWriteCloser{
                    BaseComponent: component.BaseComponent{
                        TypeOfMine:    TypeKafka,
                        NameOfMine:    name,
                        EventCounters: make(map[string]prometheus.Counter),
                    },
                },
                producer:              producer,
                cfg:                   cfg,
                addHeaders:            addHeaders,
                addHeadersString:      addHeadersString,
                addHeadersPlaceHolder: addHeadersPlaceHolder,
            },
        },
    }
    sink.reportEventNameLabelToMetrics.Store(cfg.ReportEventNameLabelToMetrics)

    // error handling
    // Note: messages will only be returned here after all retry attempts are exhausted.
    go func(s *kafkaSink) {
        for pe := range producer.Errors() {
            if pe == nil {
                continue
            }

            // metrics
            // failed 1
            // metrics.AddEvents(s.TypeOfMine, s.NameOfMine, 0, 0, 0, 0, 1)

            // Recover the original event name. The checked assertion yields ""
            // instead of panicking if Metadata is not a string.
            eventName, _ := pe.Msg.Metadata.(string)
            if s.ReportEventNameLabelToMetrics() {
                s.AddEventMetrics(metrics.DescFailed, eventName, 1)
            } else {
                s.AddEventMetrics(metrics.DescFailed, "", 1)
            }

            zaplog.Error("write kafka failed", zap.Error(pe.Err))
            if s.callback == nil {
                continue
            }

            headers := fromKafkaHeaders(pe.Msg.Headers)
            // Checked assertion: body stays nil if Value is not a ByteEncoder.
            body, _ := pe.Msg.Value.(sarama.ByteEncoder)
            // The headersInLogLen header gives the number of leading bytes to
            // cut from the value. It is applied only when it parses and lies
            // within the body, so a bad header cannot panic this goroutine.
            if strLen, ok := headers[headersInLogLen]; ok {
                toCut, parseErr := strconv.ParseInt(strLen, 10, 64)
                if parseErr == nil && toCut > 0 && toCut <= int64(len(body)) {
                    body = body[toCut:]
                }
            }
            s.callback.OnWriteResult(event.Event{
                Name:    eventName,
                Content: body,
                Headers: headers,
            }, pe.Err)
        }
    }(&sink)

    go func(s *kafkaSink) {
        for pm := range producer.Successes() {
            if pm == nil {
                continue
            }

            // metrics
            // succeeded 1
            // metrics.AddEvents(s.TypeOfMine, s.NameOfMine, 0, 0, 0, 1, 0)
            if s.ReportEventNameLabelToMetrics() {
                // Recover the original event name; "" if Metadata is not a string.
                eventName, _ := pm.Metadata.(string)
                s.AddEventMetrics(metrics.DescSucceed, eventName, 1)
            } else {
                s.AddEventMetrics(metrics.DescSucceed, "", 1)
            }

            //nolint:staticcheck
            if s.callback != nil {
                // The callback is invoked with an empty event on success.
                s.callback.OnWriteResult(event.Event{
                    // Name:    pm.Metadata.(string),
                    // Content: string(pm.Value.(sarama.StringEncoder)),
                    // Headers: fromKafkaHeaders(pm.Headers),
                }, nil)
            }
        }
    }(&sink)

    // update metrics to prometheus
    go metricsClient.UpdatePrometheusMetrics()

    return &sink, nil
}

// toKafkaHeaders converts a string map into a slice of sarama record headers,
// one header per map entry. The result follows map iteration order, so its
// ordering is unspecified. An empty or nil map yields an empty, non-nil slice.
func toKafkaHeaders(headers map[string]string) []sarama.RecordHeader {
    records := make([]sarama.RecordHeader, len(headers))
    idx := 0
    for key, val := range headers {
        records[idx] = sarama.RecordHeader{
            Key:   util.StringBytes(key),
            Value: util.StringBytes(val),
        }
        idx++
    }
    return records
}

// fromKafkaHeaders converts sarama record headers into a string map keyed by
// header key. When a key occurs more than once, the last occurrence wins.
// The returned map is never nil.
func fromKafkaHeaders(headers []sarama.RecordHeader) map[string]string {
    result := map[string]string{}
    for i := range headers {
        key := util.BytesString(headers[i].Key)
        result[key] = util.BytesString(headers[i].Value)
    }
    return result
}
github-actions[bot] commented 3 months ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] commented 1 week ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.