twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Support force using ListOffsetsRequest to fetch offset #699

Closed: Max-Cheng closed this issue 1 month ago

Max-Cheng commented 3 months ago

Why this proposal

Some cloud service providers (such as Alibaba Cloud) haven't implemented OffsetsForLeaderEpoch.

What happened

franz-go uses OffsetsForLeaderEpoch to fetch the leader epoch and end offset, and compares the consumer's offset against the leader's. After a restart, this triggers the offset-reset behavior.

Log

[root@xxxx dig]# ./kcl_linux_amd64 misc list-offsets TARGET-TOPIC
BROKER  TOPIC         PARTITION  START  END   ERROR
101     TARGET-TOPIC  0          170    270
102     TARGET-TOPIC  1          100    100
103     TARGET-TOPIC  2          100    100
101     TARGET-TOPIC  3          100    100
102     TARGET-TOPIC  4          36     100
103     TARGET-TOPIC  5          0      85
[root@xxxx dig]# ./kcl_linux_amd64 misc offset-for-leader-epoch TARGET-TOPIC
BROKER  TOPIC         PARTITION  LEADER EPOCH  END OFFSET  ERROR
101     TARGET-TOPIC  0          0             0
101     TARGET-TOPIC  3          0             0
102     TARGET-TOPIC  1          0             0
102     TARGET-TOPIC  4          0             0
103     TARGET-TOPIC  2          0             0
103     TARGET-TOPIC  5          0             0
[root@xxxx dig]# ./kcl_linux_amd64 group describe -v xxxx
GROUP        xxxx
COORDINATOR  103
STATE        Stable
BALANCER     cooperative-sticky
MEMBERS      3
TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   MEMBER-ID  CLIENT-ID  HOST
TARGET-TOPIC  0          270             270             0     kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  1          100             100             0     kgo-xxxx   kgo        /xxxxx
TARGET-TOPIC  2          100             100             0     kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  3          100             100             0     kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  4          100             100             0     kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  5          85              85              0     kgo-xxxx   kgo        /xxxxx
Max-Cheng commented 3 months ago

The main issue is that in kgo.Client.loadEpochsForBrokerLoad(), the end offset returned by the broker is 0, so the client resets its offset to this FAKE end offset and re-consumes messages from Kafka.
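The failure mode can be reproduced on paper with the numbers from the kcl output above. This is a minimal sketch, not franz-go's actual implementation: it assumes a simplified rule where the epoch-based check flags truncation whenever the leader's reported end offset is below the committed offset. The names partitionState, withinListOffsets and looksTruncated are hypothetical.

```go
package main

import "fmt"

// partitionState holds the values from the kcl output for one partition of
// TARGET-TOPIC. The type and field names are hypothetical.
type partitionState struct {
	partition int32
	committed int64 // CURRENT-OFFSET from `group describe`
	listEnd   int64 // END from `misc list-offsets` (ListOffsets)
	epochEnd  int64 // END OFFSET from `misc offset-for-leader-epoch`
}

// withinListOffsets reports whether the committed offset is still inside the
// log as described by ListOffsets (committed <= end offset).
func withinListOffsets(p partitionState) bool {
	return p.committed <= p.listEnd
}

// looksTruncated is a simplified, hypothetical epoch-based check: if the end
// offset the leader reports for the consumer's epoch is below the committed
// offset, the log appears truncated and the position is treated as invalid.
func looksTruncated(p partitionState) bool {
	return p.epochEnd < p.committed
}

func main() {
	// Values copied from the kcl output in this issue.
	parts := []partitionState{
		{0, 270, 270, 0},
		{1, 100, 100, 0},
		{2, 100, 100, 0},
		{3, 100, 100, 0},
		{4, 100, 100, 0},
		{5, 85, 85, 0},
	}
	for _, p := range parts {
		fmt.Printf("partition %d: ListOffsets ok=%v, epoch check truncated=%v\n",
			p.partition, withinListOffsets(p), looksTruncated(p))
	}
}
```

Under these assumptions, every partition passes the ListOffsets comparison but fails the epoch comparison, which is consistent with the reset-and-re-consume behavior described in this issue.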

Max-Cheng commented 3 months ago

Draft: https://github.com/twmb/franz-go/pull/700

Max-Cheng commented 3 months ago

Update: use a parameter to force supportsOffsetForLeaderEpoch to return false, which skips the end-offset check.

twmb commented 3 months ago

Why is the provider returning the ApiKey for OffsetsForLeaderEpoch in the ApiVersions response? There shouldn't need to be an escape hatch here.

Max-Cheng commented 3 months ago

Yes, I am also extremely puzzled as to why Alibaba would construct an ApiVersions response with a fake value. However, the logs show that the reset happened because the end offset was 0, which caused the consumer to re-consume messages (auto.offset.reset). I also checked with their technical support whether this API works correctly on their side, and they replied that they do not support it.

twmb commented 2 months ago

Can you file a ticket with Alibaba with evidence they are supporting the API?

Also, I think we can avoid your feature for now -- what do you think about using a kversion.Versions that specifically pins OffsetForLeaderEpoch to a max version of -1?

Max-Cheng commented 2 months ago

This ticket was handled through Alibaba Cloud's DingTalk and involves our client. I am unable to show you its complete content.

Meeting content: the issue arises because "cloud storage" type topics in Kafka do not support the relevant API calls.

Solution: use a "local storage" type topic, which is fully consistent with the open-source version.

My view is that Alibaba Cloud shouldn't ship such a ridiculous implementation, and that we shouldn't have to modify kversion.Versions just to avoid this behavior. I don't know whether other cloud providers "implement" things the way Alibaba does; it doesn't make sense.

Max-Cheng commented 2 months ago

Why use a separate flag to avoid this behavior? Because they claim customers can specify which version to install, so this wouldn't be a version issue.

twmb commented 1 month ago

Sorry for the delayed reply, I meant to post this much sooner.

What I mean is, rather than introducing a whole new config option to disable OffsetForLeaderEpoch, why don't you do something like this?

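func main() {
    v := kversion.Stable()
    // Pin OffsetForLeaderEpoch (API key 23) to max version -1 to disable it.
    v.SetMaxKeyVersion(kmsg.OffsetForLeaderEpoch.Int16(), -1)
    cl, err := kgo.NewClient(
        kgo.MaxVersions(v),
        // ...other options as before
    )
    // ...handle err and use cl as usual
}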

This will opt out of the offset for leader epoch key (I think)

Max-Cheng commented 1 month ago

Yep, I think this option will help correct the wrong behavior. But on reflection, what if those cloud providers implement a "fake" Kafka protocol based on the specific version they claim? Rolling back the version may not be a good idea.

But thank you for your reply.

twmb commented 1 month ago

what if those Cloud-Providers implement a "fake" Kafka protocol based on their claimed specific version

They can't -- request versions are negotiated purely on numbers, not names. If a cloud provider starts using different numbers for different requests, it will completely break all clients that negotiate api versions.

In this case, OffsetForLeaderEpoch is request 23, whose versions range from 0 through 4. When the client talks to the broker, the broker returns a response saying "ApiKey 23, MinVersion 0, MaxVersion 4" (or some other max version). By calling SetMaxKeyVersion(kmsg.OffsetForLeaderEpoch.Int16(), -1) on a kversion.Versions, you are telling the client itself to disable support for key 23.
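To make the number-based negotiation concrete, here is a small, self-contained Go model of it. It assumes a simplified picture in which the client intersects its own per-key max version with the [min, max] range the broker advertises in ApiVersions, and a client max of -1 means "never use this key". The apiRange type and negotiate function are hypothetical illustrations, not franz-go internals.

```go
package main

import "fmt"

// apiRange is one entry of an ApiVersions response: an API key together with
// the min and max versions the broker supports for it.
type apiRange struct {
	key        int16
	minVersion int16
	maxVersion int16
}

// negotiate returns the version the client would use for key, or false if the
// key cannot be used. clientMax maps API keys to the client's own max version;
// a missing entry or -1 means the client has opted out of that key.
// Hypothetical illustration only, not franz-go code.
func negotiate(key int16, clientMax map[int16]int16, broker []apiRange) (int16, bool) {
	cmax, ok := clientMax[key]
	if !ok || cmax < 0 {
		return 0, false // client has disabled this key
	}
	for _, r := range broker {
		if r.key != key {
			continue
		}
		v := cmax
		if r.maxVersion < v {
			v = r.maxVersion // use the highest version both sides support
		}
		if v < r.minVersion {
			return 0, false // no overlapping version
		}
		return v, true
	}
	return 0, false // broker did not advertise the key
}

const offsetForLeaderEpochKey int16 = 23

func main() {
	// A broker advertising "ApiKey 23, MinVersion 0, MaxVersion 4".
	broker := []apiRange{{key: offsetForLeaderEpochKey, minVersion: 0, maxVersion: 4}}

	defaults := map[int16]int16{offsetForLeaderEpochKey: 4}
	pinned := map[int16]int16{offsetForLeaderEpochKey: -1}

	v, ok := negotiate(offsetForLeaderEpochKey, defaults, broker)
	fmt.Println("default client:", v, ok) // prints "default client: 4 true"
	v, ok = negotiate(offsetForLeaderEpochKey, pinned, broker)
	fmt.Println("pinned to -1:", v, ok) // prints "pinned to -1: 0 false"
}
```

In this model, what the broker advertises for key 23 no longer matters once the client side is pinned to -1, which is the point of the suggested workaround.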