housepower / clickhouse_sinker

Easily load data from kafka to ClickHouse
https://housepower.github.io/clickhouse_sinker
Apache License 2.0

One partition's offset reset to earliest when restarting sinker #135

Closed. yuezhongren1 closed this issue 2 years ago.

yuezhongren1 commented 2 years ago
Version: v1.8.14

Task config:

"task": {
    "name": "ad_req_sinker",
    "topic": "AdReq",
    "consumerGroup": "AdReqGroup",
    "earliest": true,
    "parser": "json",
    "autoSchema": true,
    "tableName": "ad_req_raw_local",
    "excludeColumns": [],
    "flushInterval": 5,
    "bufferSize": 10000
},
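In standard Kafka consumer-group semantics, a reset policy such as `auto.offset.reset=earliest` is consulted only when the group has no committed offset for a partition, or when the committed offset no longer exists on the broker. The sketch below models that rule, assuming the task's `"earliest": true` behaves like that policy. `startOffset` is a hypothetical helper, not clickhouse_sinker or kafka-go code, and treating 97589937790 as partition 0's earliest retained offset is an assumption made only for illustration.

```go
package main

import "fmt"

// startOffset is a hypothetical model of how a Kafka consumer picks the first
// offset to fetch for a partition. It is NOT clickhouse_sinker's implementation.
//   committed: the group's committed offset, or -1 if there is none
//   logStart:  earliest offset still retained by the broker
//   logEnd:    offset the next produced message will receive
//   earliest:  assumed to mirror the task's "earliest" flag
func startOffset(committed, logStart, logEnd int64, earliest bool) int64 {
	// A committed offset inside the retained range always wins;
	// the reset policy is not consulted at all.
	if committed >= logStart && committed <= logEnd {
		return committed
	}
	// No commit, or the commit is out of range: apply the reset policy.
	if earliest {
		return logStart
	}
	return logEnd
}

func main() {
	// Offsets borrowed from this issue for illustration only.
	fmt.Println(startOffset(102696861696, 97589937790, 103716694853, true)) // committed offset kept
	fmt.Println(startOffset(-1, 97589937790, 103716694853, true))           // no commit: earliest
}
```

Under this model, a partition restarting near the earliest offset suggests the consumer did not see a valid committed offset for it at startup.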
Consumer logs before the restart:
{"level":"debug","ts":"2021-11-16T17:40:45.725+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696747007, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:46.012+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696763391, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:46.336+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696779775, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:46.496+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696796159, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:46.663+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696812543, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:46.820+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696828927, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:46.820+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696828927, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:46.944+0800","msg":"clickhouse_sinker got the exit signal, start to clean"}
{"level":"info","ts":"2021-11-16T17:40:46.944+0800","msg":"stopping parsing pool"}
{"level":"info","ts":"2021-11-16T17:40:46.948+0800","msg":"stopping writing pool"}
{"level":"warn","ts":"2021-11-16T17:40:47.196+0800","msg":"got a message(topic AdReq, partition 0, offset 102696861696) right to the range [102696828928, 102696861696)","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:48.197+0800","msg":"Ring.ForceBatchOrShard partition 0 message range [102696828928, 102696861696)","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:48.197+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696834856, messages 5929, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:49.505+0800","msg":"Ring.ForceBatchOrShard partition 0 message range [102696861696, 102696894464)","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:50.775+0800","msg":"Ring.ForceBatchOrShard partition 0 message range [102696894464, 102696927232)","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.958+0800","msg":"stopping timer wheel"}
{"level":"info","ts":"2021-11-16T17:40:50.958+0800","msg":"stopping task service...","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.958+0800","msg":"Kafka.Run quit due to context has been canceled","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.962+0800","msg":"stopped input","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.962+0800","msg":"stopped output","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.962+0800","msg":"stopped internal timers","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.962+0800","msg":"stopped","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:50.962+0800","msg":"clickhouse_sinker clean completed, exit"}
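The "right to the range" warnings read naturally as half-open windows `[Begin, End)`: offset 102696861696 equals the window's End, so it falls just outside it. Each window printed here is 32768 offsets wide (102696861696 − 102696828928), and so is the window logged after the restart for partition 0, `[97589937790, 97589970558)`. The sketch below classifies offsets against such a window. The `window` type and `position` method are hypothetical names used to model the log wording; they are not clickhouse_sinker's ring implementation.

```go
package main

import "fmt"

// window is a hypothetical model of the [Begin, End) ranges printed in the
// sinker's log lines. It is NOT clickhouse_sinker's ring type.
type window struct {
	Begin, End int64 // Begin inclusive, End exclusive
}

// position classifies an incoming offset relative to the window.
func (w window) position(offset int64) string {
	switch {
	case offset < w.Begin:
		return "left" // older than anything the window tracks (e.g. a redelivery)
	case offset >= w.End:
		return "right" // beyond the window; it must be flushed or advanced first
	default:
		return "inside"
	}
}

func main() {
	w := window{Begin: 102696828928, End: 102696861696}
	fmt.Println(w.End - w.Begin)          // 32768
	fmt.Println(w.position(102696861696)) // right, because End is exclusive
	fmt.Println(w.position(102696834856)) // inside
}
```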
Consumer logs after the restart:
{"level":"warn","ts":"2021-11-16T17:40:14.492+0800","msg":"got a message(topic AdReq, partition 5, offset 103636598784) right to the range [97595915789, 97595948557)","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:15.606+0800","msg":"Ring.ForceBatchOrShard partition 5 message range [97595915789, 103636598784)","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:15.606+0800","msg":"topic AdReq partition 5 became busy","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:16.606+0800","msg":"Ring.ForceBatchOrShard partition 5 message range [97595915789, 103636598785)","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:16.607+0800","msg":"going to flush a batch for topic AdReq patittion 5, offset 97595932671, messages 1, gaps: [{Begin:97595916289 End:97595932672}], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:18.770+0800","msg":"going to flush a batch for topic AdReq patittion 2, offset 103586522790, messages 9895, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:18.770+0800","msg":"going to flush a batch for topic AdReq patittion 10, offset 46876321647, messages 10096, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:39.988+0800","msg":"got a message(topic AdReq, partition 10, offset 46876196864) left to 46876321648","task":"ad_req_sinker"}

{"level":"debug","ts":"2021-11-16T17:40:51.455+0800","msg":"going to flush a batch for topic AdReq patittion 10, offset 46876704767, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:52.815+0800","msg":"going to flush a batch for topic AdReq patittion 5, offset 103637123071, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:53.383+0800","msg":"going to flush a batch for topic AdReq patittion 10, offset 46876721151, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:53.503+0800","msg":"got a message(topic AdReq, partition 0, offset 102696402944) right to the range [97589937790, 97589970558)","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:54.503+0800","msg":"Ring.ForceBatchOrShard partition 0 message range [97589937790, 102696402944)","task":"ad_req_sinker"}
{"level":"info","ts":"2021-11-16T17:40:54.504+0800","msg":"topic AdReq partition 0 became busy","task":"ad_req_sinker"}
{"level":"warn","ts":"2021-11-16T17:40:55.635+0800","msg":"Ring.ForceBatchOrShard partition 0 message range [97589937790, 102696402945)","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:55.636+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 97589985279, messages 1, gaps: [{Begin:97589968897 End:97589985280}], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:57.819+0800","msg":"going to flush a batch for topic AdReq patittion 5, offset 103637128205, messages 5134, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:40:57.819+0800","msg":"going to flush a batch for topic AdReq patittion 10, offset 46876721554, messages 403, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:41:15.191+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696419327, messages 16383, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:41:15.355+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696435711, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:41:15.530+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696452095, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
{"level":"debug","ts":"2021-11-16T17:41:15.677+0800","msg":"going to flush a batch for topic AdReq patittion 0, offset 102696468479, messages 16384, gaps: [], parse errors: 0","task":"ad_req_sinker"}
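To compare these warnings across many log lines, a small stdlib-only Go sketch can decode each JSON line and measure how far the delivered offset sits past the window's Begin. Before the restart the distance for partition 0 is one window (32768); after the restart it is 5106465154 for partition 0 and 6040682995 for partition 5, i.e. the window started billions of offsets behind the messages actually being fetched. `offsetJump` and the regular expression are hypothetical; they only rely on the message format visible in the logs above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strconv"
)

// logLine keeps only the field of the JSON log line this sketch needs.
type logLine struct {
	Msg string `json:"msg"`
}

// rightOfRange matches "partition P, offset O) right to the range [B, E)".
var rightOfRange = regexp.MustCompile(
	`partition (\d+), offset (\d+)\) right to the range \[(\d+), \d+\)`)

// offsetJump returns how far past the window's Begin the message landed.
// ok is false when the line is not a "right to the range" warning.
func offsetJump(raw string) (partition int, jump int64, ok bool, err error) {
	var l logLine
	if err = json.Unmarshal([]byte(raw), &l); err != nil {
		return 0, 0, false, err
	}
	m := rightOfRange.FindStringSubmatch(l.Msg)
	if m == nil {
		return 0, 0, false, nil
	}
	// The regex only admits digits, so conversion errors are ignored here.
	p, _ := strconv.Atoi(m[1])
	off, _ := strconv.ParseInt(m[2], 10, 64)
	begin, _ := strconv.ParseInt(m[3], 10, 64)
	return p, off - begin, true, nil
}

func main() {
	lines := []string{
		// Before the restart.
		`{"level":"warn","ts":"2021-11-16T17:40:47.196+0800","msg":"got a message(topic AdReq, partition 0, offset 102696861696) right to the range [102696828928, 102696861696)","task":"ad_req_sinker"}`,
		// After the restart.
		`{"level":"warn","ts":"2021-11-16T17:40:53.503+0800","msg":"got a message(topic AdReq, partition 0, offset 102696402944) right to the range [97589937790, 97589970558)","task":"ad_req_sinker"}`,
		`{"level":"warn","ts":"2021-11-16T17:40:14.492+0800","msg":"got a message(topic AdReq, partition 5, offset 103636598784) right to the range [97595915789, 97595948557)","task":"ad_req_sinker"}`,
	}
	for _, raw := range lines {
		p, jump, ok, err := offsetJump(raw)
		if err != nil {
			fmt.Println("bad line:", err)
			continue
		}
		if ok {
			fmt.Printf("partition %d: %d offsets past the window start\n", p, jump)
		}
	}
}
```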

Partition 0 lag


GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                                                                               HOST           CLIENT-ID
AdReqGroup  AdReq  0          102933495808    103716694853    783199045  clickhouse_sinker@BJ-10-222-0-129 (github.com/segmentio/kafka-go)-b005edfc-759f-4d8b-a1af-9e13e4a63dd5  /10.222.0.129  clickhouse_sinker@BJ-10-222-0-129 (github.com/segmentio/kafka-go)
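The LAG column is the log-end offset minus the group's current (committed) offset, so partition 0 is 783199045 messages behind. A trivial sketch of that arithmetic, with `partitionLag` as a hypothetical helper name:

```go
package main

import "fmt"

// partitionLag mirrors the LAG column of the consumer-group description:
// messages between the committed offset and the end of the log.
func partitionLag(currentOffset, logEndOffset int64) int64 {
	return logEndOffset - currentOffset
}

func main() {
	// Values from the partition 0 row above.
	fmt.Println(partitionLag(102933495808, 103716694853)) // 783199045
}
```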
yuzhichang commented 2 years ago

What's the Kafka version? Can you try the latest release v2.2.0?

yuezhongren1 commented 2 years ago

Kafka version: 2.5.
I am testing v2.2.0 right now.

yuzhichang commented 2 years ago

Master tip 7a23ba3 changed the default Kafka client from sarama to franz. Franz is generally better than sarama. Please give it a try.

yuezhongren1 commented 2 years ago

The default Kafka client in v1.8.14 is kafka-go. I haven't tried the franz client yet. I am testing v2.2.0 with the sarama client now; if it goes well, I will upgrade to v2.2.0.