Closed — suxiangdong closed this issue 10 months ago
When the client is restarted back to normal, the logs look like this:
[DEBUG] in CommitOffsetsSync; group: big-data, with: map[ta-logbus-v1:map[0:{-1 38849}]]
[DEBUG] issuing commit; group: big-data, uncommitted: map[ta-logbus-v1:map[0:{-1 38849}]]
Why does the epoch value for partition 2 change from 0 back to -1?
Can you upgrade to v1.14.1 and see if this issue persists?
After upgrading to v1.14.1, the problem persists.
[DEBUG] handled epoch results; broker: 103, using: map[], reloading: map[ta-logbus-v1:map[8:{0 0} 11:{0 0}]]
Do you know where in the logs the consumer stops consuming?
Also can you use kcl, v0.12.0 because I think the latest is broken:
kcl misc offset-for-leader-epoch ta-logbus-v1:8,11 -c 0 -e 0
and see what that says? The consumer logs show OffsetForLeaderEpoch being perpetually retried, but there is no log message for why which means that it's getting a retryable error.
> Do you know where in the logs the consumer stops consuming?
It should be at line 13734 of consumer_2.log; after that point, partition 8 is no longer consumed. After a restart, the epoch obtained for partition 8 is -1 and consumption returns to normal.
> Also can you use kcl, v0.12.0 because I think the latest is broken:
> kcl misc offset-for-leader-epoch ta-logbus-v1:8,11 -c 0 -e 0
> and see what that says? The consumer logs show OffsetForLeaderEpoch being perpetually retried, but there is no log message for why which means that it's getting a retryable error.
Should I run kcl after the partition stops consuming? In that case, I need to reproduce the problem first.
No, that can be run at any time. The logs show that OffsetForLeaderEpoch starts looping on those partitions at that epoch.
An epoch of -1 is not normal -- it is only -1 until the first record is consumed. After that, the epoch is the epoch attached to the record. Does your client always set Epoch of 0 rather than using the Record.LeaderEpoch?
> No, that can be run at any time. The logs show that OffsetForLeaderEpoch starts looping on those partitions at that epoch.
Can kcl skip TLS verification like kgo's InsecureSkipVerify setting? I didn't find a configuration item for it in kcl myconfig.
> An epoch of -1 is not normal -- it is only -1 until the first record is consumed. After that, the epoch is the epoch attached to the record. Does your client always set Epoch of 0 rather than using the Record.LeaderEpoch?
I didn't specify the epoch in the producer:
str := `{"#account_id":"%s","#type":"track","#event_name":"0"}`
a := 0
for {
for i := 0; i < 100; i++ {
strr := fmt.Sprintf(str, RandStringBytesMaskImprSrc(32))
record := &kgo.Record{
Topic: "ta-logbus-v1", Value: []byte(strr),
}
cl.Produce(ctx, record, func(_ *kgo.Record, err error) {
if err != nil {
fmt.Printf("record had a produce error: %v\n", err)
}
})
}
a += 100
fmt.Printf("##### wrote %d records in total\n", a)
time.Sleep(time.Second * 2)
}
Not yet, I do plan on patching today: https://github.com/twmb/kcl/pull/32
How are you committing? You don't need to specify an epoch while producing.
(this issue sounds similar to https://github.com/twmb/franz-go/issues/107)
I didn't find a way to configure TLS. The SDK demo provided by Aliyun also uses InsecureSkipVerify.
I get errors with the following configuration:
invalid large response size 352518912 > limit 104857600; the first three bytes received appear to be a tls alert record for TLS v1.2; is this a plaintext connection speaking to a tls endpoint?
configuration:
seed_brokers = ["alikafka-post-cn-uqm3awq7c001-1.alikafka.aliyuncs.com:9093", "alikafka-post-cn-uqm3awq7c001-2.alikafka.aliyuncs.com:9093", "alikafka-post-cn-uqm3awq7c001-3.alikafka.aliyuncs.com:9093"]

[sasl]
method = "plain"
user = "xxxx"
pass = "xxxxx"
> How are you committing? You don't need to specify an epoch while producing.
> (this issue sounds similar to #107)
When committing the offset I specified the leader epoch, using the value obtained from the pulled record (record.LeaderEpoch).
I'll release a new kcl later today that fixes current breakage and allows insecure skip verify, then we can continue debugging (probably tomorrow your time). I'll post here once it's released (and then we can use the offset-for-leader-epoch command).
I ran the test (autocommit) with confluent-kafka-go (v1.9.2) and there is no such issue there, while the problem persists in auto-commit mode with kgo. https://github.com/confluentinc/confluent-kafka-go/tree/v1.9.2
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
cfg := &kafka.ConfigMap{
"bootstrap.servers": "alikafka-post-cn-uqm3awq7c001-1.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3awq7c001-2.alikafka.aliyuncs.com:9093,alikafka-post-cn-uqm3awq7c001-3.alikafka.aliyuncs.com:9093",
"security.protocol": "sasl_ssl",
"group.id": "big-data",
"sasl.username": "xxxxxx",
"sasl.password": "xxxxxx",
"sasl.mechanism": "PLAIN",
"ssl.ca.location": "../conf/mix-4096-ca-cert",
}
consumer, err := kafka.NewConsumer(cfg)
if err != nil {
fmt.Println(err)
}
defer func() {
_ = consumer.Close()
}()
if err = consumer.SubscribeTopics([]string{"ta-logbus-v1"}, nil); err != nil {
fmt.Println(err)
}
fmt.Println("ready to get message")
for {
msg, err := consumer.ReadMessage(-1)
if err == nil {
fmt.Println(string(msg.Value))
continue
}
fmt.Println(err)
}
}
> I'll release a new kcl later today that fixes current breakage and allows insecure skip verify, then we can continue debugging (probably tomorrow your time). I'll post here once it's released (and then we can use the offset-for-leader-epoch command).
I skipped the TLS verification locally and executed the command, with the output in the screenshot below:
Sorry for the delay on releasing, got busy. I might have time tonight or tomorrow morning. The error in the screenshot you sent doesn't make much sense, though -- the client internally retries and remaps the request to the correct broker, so it shouldn't be erroring like that. Let me release and then re-read this thread and we'll see if I can come up with an idea.
As a bit of a heads up on the delay -- I've had a few short weeks due to weekend PTO's and right now work is extremely busy -- and I wanted to make sure v1.14.x is baked a little bit (i.e. I just pushed another patch today) before tagging kcl. I do plan to get to this next week (but I've said that a few times), but again I'm still sorry for the delay. I hope you're open to continuing to investigate when kcl is released and I have time to revisit this.
I went and released kcl v0.13.0, but it turns out the bug that I was worried about didn't affect the command you were running.
If you're interested: what's the status of this issue -- is it still present? If so, can you re-run the offset-for-leader-epoch command, but use --log-level debug? The NOT_LEADER_FOR_PARTITION errors you were running into should be internally handled by the franz-go library, so they shouldn't be printed. I wonder if AliCloud isn't supporting the request properly.
Sorry to get back to you so late, I've been a bit busy lately.
I redeployed the AliCloud Kafka queue service, and after the upgrade none of the partitions are consumed. I get the same NOT_LEADER_FOR_PARTITION error right after creating the service.
kcl v0.13.0
command:
go run main.go misc --log-level debug offset-for-leader-epoch ta-logbus-v1 -c 0 -e 0
[DEBUG] opening connection to broker; addr: alikafka-post-cn-wwo3c8kcx00i-1.alikafka.aliyuncs.com:9093, broker: seed_0
[DEBUG] connection opened to broker; addr: alikafka-post-cn-wwo3c8kcx00i-1.alikafka.aliyuncs.com:9093, broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 111.5µs, time_to_write: 93.792µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 14, read_wait: 93.291µs, time_to_read: 128.705125ms, err: <nil>
[DEBUG] broker does not know our ApiVersions version, downgrading to version 0 and retrying; broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 0
[DEBUG] wrote ApiVersions v0; broker: seed_0, bytes_written: 17, write_wait: 26.666µs, time_to_write: 54.959µs, err: <nil>
[DEBUG] read ApiVersions v0; broker: seed_0, bytes_read: 278, read_wait: 32.041µs, time_to_read: 137.406375ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: seed_0
[DEBUG] wrote SASLHandshake v1; broker: seed_0, bytes_written: 24, write_wait: 24.458µs, time_to_write: 45.708µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: seed_0, bytes_read: 21, read_wait: 16.167µs, time_to_read: 123.474792ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: seed_0, addr: alikafka-post-cn-wwo3c8kcx00i-1.alikafka.aliyuncs.com:9093, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: seed_0, version: 1, step: 0
[DEBUG] wrote SASLAuthenticate v1; broker: seed_0, bytes_written: 84, write_wait: 98.5µs, time_to_write: 58.708µs, err: <nil>
[DEBUG] read SASLAuthenticate v1; broker: seed_0, bytes_read: 24, read_wait: 38.625µs, time_to_read: 666.445084ms, err: <nil>
[DEBUG] connection initialized successfully; addr: alikafka-post-cn-wwo3c8kcx00i-1.alikafka.aliyuncs.com:9093, broker: seed_0
[DEBUG] wrote Metadata v7; broker: seed_0, bytes_written: 36, write_wait: 1.507820334s, time_to_write: 33.583µs, err: <nil>
[DEBUG] read Metadata v7; broker: seed_0, bytes_read: 566, read_wait: 34.958µs, time_to_read: 118.887584ms, err: <nil>
[DEBUG] opening connection to broker; addr: 39.98.206.235:9093, broker: 101
[DEBUG] connection opened to broker; addr: 39.98.206.235:9093, broker: 101
[DEBUG] issuing api versions request; broker: 101, version: 3
[DEBUG] wrote ApiVersions v3; broker: 101, bytes_written: 31, write_wait: 19.625µs, time_to_write: 28.5µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 101, bytes_read: 14, read_wait: 25.5µs, time_to_read: 314.274416ms, err: <nil>
[DEBUG] broker does not know our ApiVersions version, downgrading to version 0 and retrying; broker: 101
[DEBUG] issuing api versions request; broker: 101, version: 0
[DEBUG] wrote ApiVersions v0; broker: 101, bytes_written: 17, write_wait: 14.125µs, time_to_write: 37.833µs, err: <nil>
[DEBUG] read ApiVersions v0; broker: 101, bytes_read: 278, read_wait: 15.334µs, time_to_read: 71.517541ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 101
[DEBUG] wrote SASLHandshake v1; broker: 101, bytes_written: 24, write_wait: 11.292µs, time_to_write: 33.083µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 101, bytes_read: 21, read_wait: 14.375µs, time_to_read: 71.034917ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 101, addr: 39.98.206.235:9093, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 101, version: 1, step: 0
[DEBUG] wrote SASLAuthenticate v1; broker: 101, bytes_written: 84, write_wait: 11.083µs, time_to_write: 56.542µs, err: <nil>
[DEBUG] read SASLAuthenticate v1; broker: 101, bytes_read: 24, read_wait: 39.708µs, time_to_read: 79.463083ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 39.98.206.235:9093, broker: 101
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 1.131981792s, time_to_write: 44.625µs, err: <nil>
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 67.75µs, time_to_read: 77.825291ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101 103 102]
[DEBUG] opening connection to broker; addr: 39.98.197.32:9093, broker: 102
[DEBUG] opening connection to broker; addr: 39.98.217.201:9093, broker: 103
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 67.625µs, time_to_write: 75.875µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 39.208µs, time_to_read: 75.442042ms, err: <nil>
[DEBUG] connection opened to broker; addr: 39.98.197.32:9093, broker: 102
[DEBUG] issuing api versions request; broker: 102, version: 3
[DEBUG] wrote ApiVersions v3; broker: 102, bytes_written: 31, write_wait: 27.333µs, time_to_write: 50.959µs, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 1.518229459s, tries: 0, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 39.25µs, time_to_write: 54.042µs, err: <nil>
[DEBUG] connection opened to broker; addr: 39.98.217.201:9093, broker: 103
[DEBUG] issuing api versions request; broker: 103, version: 3
[DEBUG] wrote ApiVersions v3; broker: 103, bytes_written: 31, write_wait: 25.125µs, time_to_write: 49.292µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 102, bytes_read: 14, read_wait: 44.791µs, time_to_read: 74.200834ms, err: <nil>
[DEBUG] broker does not know our ApiVersions version, downgrading to version 0 and retrying; broker: 102
[DEBUG] issuing api versions request; broker: 102, version: 0
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 44.75µs, time_to_read: 73.236166ms, err: <nil>
[DEBUG] wrote ApiVersions v0; broker: 102, bytes_written: 17, write_wait: 12.167µs, time_to_write: 33.167µs, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 27.291µs, time_to_write: 16.75µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 103, bytes_read: 14, read_wait: 41µs, time_to_read: 79.410625ms, err: <nil>
[DEBUG] broker does not know our ApiVersions version, downgrading to version 0 and retrying; broker: 103
[DEBUG] issuing api versions request; broker: 103, version: 0
[DEBUG] wrote ApiVersions v0; broker: 103, bytes_written: 17, write_wait: 10.375µs, time_to_write: 28.25µs, err: <nil>
[DEBUG] read ApiVersions v0; broker: 102, bytes_read: 278, read_wait: 17.291µs, time_to_read: 80.525ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 102
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 27.459µs, time_to_read: 80.462875ms, err: <nil>
[DEBUG] wrote SASLHandshake v1; broker: 102, bytes_written: 24, write_wait: 9.417µs, time_to_write: 34.041µs, err: <nil>
[DEBUG] read ApiVersions v0; broker: 103, bytes_read: 278, read_wait: 14.792µs, time_to_read: 77.5235ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 103
[DEBUG] wrote SASLHandshake v1; broker: 103, bytes_written: 24, write_wait: 22.667µs, time_to_write: 36.833µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 102, bytes_read: 21, read_wait: 16.334µs, time_to_read: 79.483208ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 102, addr: 39.98.197.32:9093, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 102, version: 1, step: 0
[DEBUG] wrote SASLAuthenticate v1; broker: 102, bytes_written: 84, write_wait: 21.209µs, time_to_write: 38.791µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 103, bytes_read: 21, read_wait: 17.458µs, time_to_read: 73.850084ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 103, addr: 39.98.217.201:9093, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 103, version: 1, step: 0
[DEBUG] wrote SASLAuthenticate v1; broker: 103, bytes_written: 84, write_wait: 3.333µs, time_to_write: 19.042µs, err: <nil>
[DEBUG] read SASLAuthenticate v1; broker: 102, bytes_read: 24, read_wait: 19.084µs, time_to_read: 79.389291ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 39.98.197.32:9093, broker: 102
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 102, bytes_written: 87, write_wait: 621.483333ms, time_to_write: 28.042µs, err: <nil>
[DEBUG] read SASLAuthenticate v1; broker: 103, bytes_read: 24, read_wait: 8.041µs, time_to_read: 81.5015ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 39.98.217.201:9093, broker: 103
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 103, bytes_written: 87, write_wait: 636.731166ms, time_to_write: 45.459µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 103, bytes_read: 106, read_wait: 28.25µs, time_to_read: 81.96025ms, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 102, bytes_read: 106, read_wait: 24.417µs, time_to_read: 97.369666ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 2.159696625s, tries: 1, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 103, bytes_written: 36, write_wait: 112.5µs, time_to_write: 220.958µs, err: <nil>
[DEBUG] read Metadata v7; broker: 103, bytes_read: 566, read_wait: 62.167µs, time_to_read: 76.267542ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 23.584µs, time_to_write: 22.791µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 70.75µs, time_to_read: 75.991417ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 3.428988292s, tries: 2, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 62.75µs, time_to_write: 98.167µs, err: <nil>
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 72.208µs, time_to_read: 88.343209ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 28.708µs, time_to_write: 32.917µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 35.625µs, time_to_read: 82.457792ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 5.778123084s, tries: 3, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 103, bytes_written: 36, write_wait: 102.417µs, time_to_write: 119.667µs, err: <nil>
[DEBUG] read Metadata v7; broker: 103, bytes_read: 566, read_wait: 58.583µs, time_to_read: 80.899625ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 33.416µs, time_to_write: 40.125µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 29.084µs, time_to_read: 78.576666ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 8.439405042s, tries: 4, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 31.333µs, time_to_write: 59.958µs, err: <nil>
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 29.292µs, time_to_read: 85.943167ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 33.5µs, time_to_write: 33.375µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 29.209µs, time_to_read: 71.675791ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 11.098670459s, tries: 5, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 103, bytes_written: 36, write_wait: 38.583µs, time_to_write: 70.667µs, err: <nil>
[DEBUG] read Metadata v7; broker: 103, bytes_read: 566, read_wait: 38µs, time_to_read: 81.711416ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 26.417µs, time_to_write: 32.458µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 25.167µs, time_to_read: 75.1305ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 13.757054667s, tries: 6, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 66.792µs, time_to_write: 125.583µs, err: <nil>
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 60.834µs, time_to_read: 217.322625ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 50.5µs, time_to_write: 110.792µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 79.75µs, time_to_read: 80.257041ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 16.556839834s, tries: 7, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 103, bytes_written: 36, write_wait: 53.5µs, time_to_write: 130.209µs, err: <nil>
[DEBUG] read Metadata v7; broker: 103, bytes_read: 566, read_wait: 961.583µs, time_to_read: 84.351583ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 32.833µs, time_to_write: 48.75µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 24.708µs, time_to_read: 76.446959ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 19.219260375s, tries: 8, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 40.875µs, time_to_write: 54.833µs, err: <nil>
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 39.25µs, time_to_read: 369.669833ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 46.125µs, time_to_write: 50.875µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 49.167µs, time_to_read: 155.16225ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 22.2459085s, tries: 9, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 103, bytes_written: 36, write_wait: 40.375µs, time_to_write: 72.916µs, err: <nil>
[DEBUG] read Metadata v7; broker: 103, bytes_read: 566, read_wait: 32µs, time_to_read: 171.425459ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 46.666µs, time_to_write: 35.084µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 32.5µs, time_to_read: 82.61625ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 25.001185709s, tries: 10, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 101, bytes_written: 36, write_wait: 57.834µs, time_to_write: 112.958µs, err: <nil>
[DEBUG] read Metadata v7; broker: 101, bytes_read: 566, read_wait: 44.542µs, time_to_read: 76.233458ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 26.75µs, time_to_write: 32.667µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 92.833µs, time_to_read: 73.960583ms, err: <nil>
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetForLeaderEpoch, time_since_start: 27.65236s, tries: 11, err: NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
[DEBUG] wrote Metadata v7; broker: 103, bytes_written: 36, write_wait: 61.334µs, time_to_write: 86.375µs, err: <nil>
[DEBUG] read Metadata v7; broker: 103, bytes_read: 566, read_wait: 55.458µs, time_to_read: 103.060083ms, err: <nil>
[DEBUG] sharded request; req: OffsetForLeaderEpoch, destinations: [101]
[DEBUG] wrote OffsetForLeaderEpoch v2; broker: 101, bytes_written: 87, write_wait: 38.334µs, time_to_write: 37.5µs, err: <nil>
[DEBUG] read OffsetForLeaderEpoch v2; broker: 101, bytes_read: 106, read_wait: 25.958µs, time_to_read: 249.832625ms, err: <nil>
unable to issue request to broker 101 (39.98.206.235:9093): NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
BROKER TOPIC PARTITION LEADER EPOCH END OFFSET ERROR
102 ta-logbus-v1 1 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
102 ta-logbus-v1 4 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
102 ta-logbus-v1 7 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
102 ta-logbus-v1 10 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
103 ta-logbus-v1 2 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
103 ta-logbus-v1 5 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
103 ta-logbus-v1 8 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
103 ta-logbus-v1 11 -1 -1 NOT_LEADER_FOR_PARTITION: This server is not the leader for that topic-partition.
Ok that's a bit weird -- it looks like AliCloud is advertising that OffsetForLeaderEpoch is supported, but then is not actually handling it. We can see that the client is splitting the incoming request and sending it to all relevant brokers, but then the relevant broker is continually replying "not leader" -- even though we know we are issuing the request to the proper leader.
This might also be why you were experiencing issues originally -- if the client under the hood was trying to verify the leader epoch, then the partition would never be consumed and then lag would continually grow. This should actually persist across restarts... 🤔
For the confluent code, if you enable request logging, does confluent's library issue OffsetForLeaderEpoch? I remember them not supporting it for a while.
We can also try disabling OffsetForLeaderEpoch -- does this work if you use kgo.MaxVersions(kversion.V2_0_0())?
Consuming with the kgo.MaxVersions(kversion.V2_0_0()) option, the problem persists.
Can you enable debug for "consumer,cgrp" for the Confluent cgo driver and paste the logs, if you're able?
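For reference, librdkafka exposes those debug contexts through the `debug` configuration property, which takes comma-separated context names and prints its logs to stderr by default. A hypothetical config fragment showing the change against the ConfigMap posted earlier in this thread (not a complete program; broker and SASL fields elided):

```go
cfg := &kafka.ConfigMap{
	// ...the existing bootstrap.servers / security.protocol / sasl
	// settings from the consumer above...
	"group.id": "big-data",
	"debug":    "consumer,cgrp", // librdkafka debug contexts to enable
}
consumer, err := kafka.NewConsumer(cfg)
```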
I looked into this a bit more, and I have a suspicion about the librdkafka code: truncation detection (KIP-320, using OffsetForLeaderEpoch) was introduced in librdkafka v2.1.0, as seen in the changelog here (introducing PR here).
In the logs you posted, I see this line:
%7|1692683976.380|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.9.2 (0x10902ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0xfffff)
i.e. your confluent-kafka-go is using librdkafka v1.9.2, which does not have support for the feature that is causing problems in franz-go.
If you're open, can you upgrade librdkafka to v2.1.0 or greater and see if it is still able to work successfully?
And if it does continue to work -- can you post the same debug logs? The v2.1.0+ prints the leader epoch in the commits, so I want to see if they're -1 or not.
Also, can you post the debug logs of franz-go when you used kgo.MaxVersions(kversion.V2_0_0())? That should have had a different bit of logs, because we don't use OffsetForLeaderEpoch in that case.
It also works with librdkafka version v2.2.0.
f1.log
f2.log
franz-go v1.14.4 & kgo.MaxVersions(kversion.V2_0_0())
franz-go-1.log
franz-go-2.log
Are you still seeing the partitions be completely stuck with franz-go?
Looking at the logs for both librdkafka and franz-go, it looks like AliCloud has a bunch of partition unavailability and moves partitions around brokers a lot, but they always seem to eventually come back.
For example, in the franz-go-1.log logs, ta-logbus-v2 partition 10 is outright not available for a long time. There's also a period where partitions 2, 5, 8, and 11 are all unavailable:
LEADER_NOT_AVAILABLE{ta-logbus-v2[2 5 8 11]}
But towards the end of these logs, it looks like all partitions become available again and there are no fetch errors. It looks like fetches stop being polled. We can see logs where fetches are read (and then buffered) into the client,
3806 [DEBUG] read Fetch v8; broker: 101, bytes_read: 71552, read_wait: 97.417µs, time_to_read: 66.686125ms, err: <nil>
3807 [DEBUG] read Fetch v8; broker: 103, bytes_read: 158909, read_wait: 89.041µs, time_to_read: 91.191625ms, err: <nil>
3808 [DEBUG] read Fetch v8; broker: 102, bytes_read: 346718, read_wait: 86.25µs, time_to_read: 858.302625ms, err: <nil>
3809 [DEBUG] autocommitting; group: test-big-data
...
3816 [DEBUG] autocommitting; group: test-big-data
3817 [DEBUG] reaped connections; time_since_last_reap: 19.999746333s, reap_dur: 867.833µs, num_reaped: 4
3818 [DEBUG] autocommitting; group: test-big-data
...
but there is never any more "wrote Fetch" logs. What does your consumer code look like? It looks like the PollFetches function isn't being called anymore.
In fact, it looks like your consumer pipeline stopped a lot earlier, on line 3432 (in franz-logs-2). Every time PollFetches is called, if it returns data, it logs updated uncommitted. The last time this is printed is on line 3432. After this, the only fetches that are sent out are to broker 102 -- so it looks like brokers 101 and 102 just don't have data for the client for a long time. Then partitions 2, 5, 8, and 11 are moved to broker 103 in a metadata response, so 103 starts getting fetch requests as well. Then it looks like brokers 101, 102, and 103 actually do return some data, so fetch requests stop being issued -- but the data is never drained in your application code via a Poll.
Yes, using franz-go still gets completely stuck and can reproduce the problem consistently.
As you can see on the console, consumers within the test-big-data group lost connectivity. After getting stuck and waiting a bit longer, the consumer still could not rejoin the group.
Got a log output after a long wait. After re-running it, it works fine again.
fetch err topic partition 0: unable to join group session: unable to dial: dial tcp 47.92.121.137:9093: connect: connection refused ,exit status 1
I used autocommit for consumption and added the kgo.MaxVersions(kversion.V2_0_0()) parameter.
I tried removing kgo.BlockRebalanceOnPoll and kgo.RequireStableFetchOffsets (the Ali console shows the Kafka major version as 2.2.0), and the problem persists.
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"os"
	"os/signal"
	"strings"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
	"github.com/twmb/franz-go/pkg/sasl/plain"
	"gopkg.in/natefinch/lumberjack.v2"
)

var (
	seedBrokers = "alikafka-post-cn-4xl3cykvp00b-1.alikafka.aliyuncs.com:9093,alikafka-post-cn-4xl3cykvp00b-2.alikafka.aliyuncs.com:9093,alikafka-post-cn-4xl3cykvp00b-3.alikafka.aliyuncs.com:9093"
	topic       = "ta-logbus-v2"
	group       = "test-big-data"
)

func die(msg string, args ...any) {
	fmt.Fprintf(os.Stderr, msg, args...)
	os.Exit(1)
}

func main() {
	opts := []kgo.Opt{
		kgo.SeedBrokers(strings.Split(seedBrokers, ",")...),
		kgo.ConsumerGroup(group),
		kgo.ConsumeTopics(topic),
		kgo.SASL(plain.Auth{
			User: "User_xxx",
			Pass: "Pass_xxx",
		}.AsMechanism()),
		kgo.DialTLSConfig(&tls.Config{InsecureSkipVerify: true}),
		kgo.BlockRebalanceOnPoll(),
		kgo.RequireStableFetchOffsets(),
		kgo.MaxVersions(kversion.V2_0_0()),
	}
	ft := &lumberjack.Logger{
		MaxAge:     0,
		MaxSize:    1000,
		Filename:   "./log/franz-go-2.log",
		MaxBackups: 0,
	}
	opts = append(opts, kgo.WithLogger(kgo.BasicLogger(ft, kgo.LogLevelDebug, nil)))
	cl, err := kgo.NewClient(opts...)
	if err != nil {
		die("unable to create client: %v", err)
	}
	go consume(cl)

	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, os.Interrupt)
	<-sigs
	fmt.Println("received interrupt signal; closing client")
	done := make(chan struct{})
	go func() {
		defer close(done)
		cl.Close()
	}()
	select {
	case <-sigs:
		fmt.Println("received second interrupt signal; quitting without waiting for graceful close")
	case <-done:
	}
}

func consume(cl *kgo.Client) {
	fmt.Println("ready to get message")
	for {
		fetches := cl.PollFetches(context.Background())
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachError(func(t string, p int32, err error) {
			die("fetch err topic %s partition %d: %v", t, p, err)
		})
		var seen int
		fetches.EachRecord(func(*kgo.Record) {
			seen++
		})
		fmt.Printf("processed %d records--autocommitting now allows the **prior** poll to be available for committing, nothing can be lost!\n", seen)
	}
}
How long are you leaving consumer running to verify that no progress is being made? I wonder -- is there anything I can do to recreate this myself? I wonder if I could point this client at AliCloud somehow.
About an hour.
This service is for test use only; no production data is involved. If it's convenient for you, I can provide the account password for the Kafka service, perhaps via Telegram? When you start the Kafka client, I'll operate an upgrade in the console and it will be reproducible.
Sorry, I went on vacation after that last message (in fact I have two days left).
What do you think about using age, and sending me a private message on the discord channel in this repo?
My age public key is age10kg5977p4ks3t5axalmmc3w8e375vc5463qhu8lqpkvzh35swq8sewag09.
Have fun.
I'll look into how age works.
I have encrypted the file using age. Sorry, I don't know how to send it to you via the discord channel in this repo (is discord software?).
Yes -- are you able to access this? https://discord.gg/K4R5c8zsMS
Alternatively, you can post it here (since it's encrypted it should be of low concern), but somebody could see the encrypted blob for a bit. Once I download it, I can delete your comment.
Yes -- are you able to access this? https://discord.gg/K4R5c8zsMS
I've joined the discord channel.
Thanks! So, I downloaded the credentials and I'm not running into the problem you seem to be. I'll leave the consumer running for a while but so far, things seem fine. I had to add an AllowRebalance -- but after that, everything is working. Here's the output of describing the group that I'm seeing with rpk (after a few modifications to support plaintext / insecure skip verify / bumping command timeouts):
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG MEMBER-ID CLIENT-ID HOST
ta-logbus-v2 0 420460 420460 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 1 483134 483134 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 2 497481 497481 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 3 402040 402040 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 4 457800 457800 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 5 462109 462109 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 6 438094 438094 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 7 441000 441000 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 8 448713 448713 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 9 487454 487454 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 10 498078 498078 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
ta-logbus-v2 11 469560 469560 0 kgo-df89b983-106e-4601-b6ea-965d70506dfe kgo /98.245.251.61
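For reference, the AllowRebalance mentioned above is Client.AllowRebalance in franz-go. A minimal sketch of how the reproduction program's consume loop would look with it (this is a fragment assuming the cl client from the code above, not a complete program):

```go
// Sketch: poll loop when kgo.BlockRebalanceOnPoll() is enabled.
// Rebalances are blocked while a polled batch is being processed; the
// application must call AllowRebalance() after each batch, or the group
// can never rebalance and the consumer appears to hang.
for {
	fetches := cl.PollFetches(context.Background())
	if fetches.IsClientClosed() {
		return
	}
	fetches.EachRecord(func(r *kgo.Record) {
		// process r here
	})
	cl.AllowRebalance() // unblock any rebalance that arrived while processing
}
```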
Another operation is needed to reproduce it.
When you are in the process of normal consumption, I can reproduce the issue by operating the upgrade in the AliCloud console. This may require us to be online at the same time.
My time here is UTC+8. Keep the client consuming when you have time, and I'll operate the upgrade in the AliCloud console to reproduce the issue. Or should we determine a time on discord?
thx.
I think I'll have time early next week (maybe Monday?), apologies on the delay. I'm UTC-6 so tz interaction might be interesting. Looks like my 8am is your 10pm, that might be a candidate (or if you're up later).
edit: actually Wednesday at my 9am is the first free time, your 11pm.
I am so sorry, we took a long holiday. Would you see if next Wednesday (2023.10.11) is OK? If so, I'll be waiting for you from 10pm-11pm.
No worries, I also had a long holiday. I'm just glad you're open to continuing on debugging this intermittently :). I'm not free this Wednesday, but I am on Thursday at my 8:30am, your 10:30pm. Are you free then?
Okay, I'll see you on Thursday night at 10:30pm.
I looked into this a bit more, and I have a suspicion about the librdkafka code: Truncation detection (KIP-320, using OffsetForLeaderEpoch) was introduced in librdkafka v2.1.0, seen in the changelog here (introducing pr here).
In the logs you posted, I see this line:
%7|1692683976.380|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.9.2 (0x10902ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC, debug 0xfffff)
And if it does continue to work -- can you post the same debug logs? v2.1.0+ prints the leader epoch in the commits, so I want to see whether they're -1 or not.
Also, can you post the debug logs of franz-go when you used kversion.MaxVersions(kversion.V2_0_0())? That should have had a different bit of logs, because we don't use OffsetForLeaderEpoch in that case.
You are right, it was my fault. I ignored client.AllowRebalance() while adding kgo.MaxVersions(kversion.V2_0_0()).
After adding kgo.MaxVersions(kversion.V2_0_0()), everything works fine. I don't know what the reason is, can you help me with this?
I see that there are a lot of versions, from V0_8_0 to V3_3_0, but I don't know which version should be used in what scenario. Is there any documentation where I can get information about kversion?
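For context on the kversion question: the kversion constants mirror Apache Kafka release versions, and kgo.MaxVersions caps the protocol versions the client will negotiate. A hedged sketch of the option list (the broker address is a placeholder, and note the maintainer's point below that MaxVersions is normally unnecessary):

```go
// Sketch: capping negotiated protocol versions with kversion.
// Normally the client negotiates versions with the broker itself, so
// MaxVersions is not needed; capping is mostly useful while debugging,
// to rule out protocol features newer than a given Kafka release.
opts := []kgo.Opt{
	kgo.SeedBrokers("localhost:9092"),  // placeholder broker address
	kgo.MaxVersions(kversion.V2_2_0()), // cap at the broker's reported 2.2.0
}
cl, err := kgo.NewClient(opts...)
```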
i.e. your confluent-kafka-go is using librdkafka v1.9.2, which does not have support for the feature that is causing problems in franz-go.
If you're open, can you upgrade librdkafka to v2.1.0 or greater and see if it is still able to work successfully?
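A quick way to confirm which librdkafka a confluent-kafka-go build links against is kafka.LibraryVersion (a sketch; the v2 module path is an assumption about your go.mod):

```go
// Sketch: print the linked librdkafka version. The thread's logs show
// v1.9.2, which predates the KIP-320 truncation-detection support that
// appeared in librdkafka v2.1.0.
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka" // assumption: v2 module path
)

func main() {
	_, ver := kafka.LibraryVersion() // returns (int, string)
	fmt.Println("librdkafka:", ver)
}
```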
librdkafka v2.2.0 is also working fine.
You shouldn't need to add MaxVersions -- this should work without it. Trying max 2.0.0 was to try to disable offset for leader epoch parsing.
Did the bug crop back up even without setting MaxVersions?
Yes, without kgo.MaxVersions(kversion.V2_0_0()) the problem can be reproduced steadily.
Interesting. Want to debug this on Discord again?
@twmb I'm experiencing almost the exact same thing, albeit with WarpStream, not Alibaba Cloud. I can sometimes (although not deterministically) induce a member of the consumer group to stop consuming some partitions almost entirely. I see occasional fetch requests from the consumer still, but they're very infrequent and it's not nearly enough to keep up with traffic.
I'm trying the kgo.MaxVersions(kversion.V2_0_0()) thing to see if that makes a difference.
Is there anything else I can provide for more context? Just debug logs from the client?
@richardartoul debug logs, leading up to and including past the hang. But, no need to pin to MaxVersions.
@twmb Any way we can chat in discord or slack?
I strongly suspect it's the same issue, because WarpStream has similar behavior to the Alibaba Kafka setup, with partitions being "moved around" between leaders a lot. Let me see if I can get debug logs enabled and repro also.
Scene: dependency on franz-go v1.10.0. AliCloud Kafka service: 3 brokers, 2 consumers, 12 partitions.
When I upgrade the disk, partitions [2, 5, 8, 11] stop consuming; the others are normal. But when I restarted the consumers, they all came back to normal again. I found some keywords in debug mode: "map[ta-logbus-v1:map[2:{-1 23817}]]", "map[ta-logbus-v1:map[2:{23817 e0 ce0} 5:{ 23946 e0 ce0}]]". What I can't understand is why it goes back to normal after a reboot.
describe group:
Partitions 0-5 consumed: consumer1_debug.log. Partitions 6-11 consumed: consumer2_debug.log.