confluentinc / librdkafka

The Apache Kafka C/C++ library

ListOffsetsRequest should only be sent to the leader replica #4616

Open kphelps opened 4 months ago

kphelps commented 4 months ago

When using fetch-from-follower, a consumer can currently get stuck in a loop sending ListOffsetsRequest when it goes through the rd_kafka_offset_reset path, because the request is sent to the preferred replica. Instead, always send it to the leader.

emasab commented 4 months ago

It's correct to send the ListOffsets request to the preferred replica. The loop probably comes from this discovered bug: https://github.com/confluentinc/librdkafka/issues/4620

When enabling debug logs, could you check if it's receiving FENCED_LEADER_EPOCH errors?

kphelps commented 4 months ago

Nope, I'm seeing NOT_LEADER_OR_FOLLOWER errors.

kphelps commented 4 months ago

Aha, from KIP-392:

The FetchRequest schema has field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower. Similarly, when we need to send a ListOffsets request to a follower in order to find the log start offset, we will use the same sentinel for the replica id field.

Looks like we unconditionally set the replica id to -1 here

Looks like the Java client opts to just always send to the leader. WDYT?

emasab commented 3 months ago

@kphelps The replica id should be set to -1 in clients, and to the broker id in followers, see the RPC definition https://github.com/apache/kafka/blob/2f401ff4c85f6797391b8a3dd57d651f4de3d6ad/clients/src/main/resources/common/message/ListOffsetsRequest.json#L42

The error NOT_LEADER_OR_FOLLOWER happens when the broker isn't a replica for that partition. In that case librdkafka refreshes metadata to get the leader again, here: https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/src/rdkafka_request.c#L935

Is it possible to reproduce the issue and send a log with "debug": "all" or "debug": "consumer,cgrp,topic,fetch,metadata,broker"?

kphelps commented 3 months ago

I'm working to reproduce this now, but have been having trouble in a controlled environment. Will share that when I get it.

The broker only allows fetching from the leader unless the replica id is set to -2 here which propagates down to retrieving the local log and erroring here.

kphelps commented 3 months ago

Found a test that was silently failing due to this issue

emasab commented 3 months ago

Thanks @kphelps. I checked this issue in more depth and understood the problem. It's different from what I linked and, as you said, could be solved in two ways: by sending the request to the follower with -2, or to the leader as Java is doing.

The con of sending it to the leader is that, in case the follower is lagging behind, it could have other offset resets when fetching until it has caught up. I've checked the broker code and tried using -2 by changing the mock cluster implementation, and it works too.

Will ask for an opinion internally too before deciding on one of the two solutions.

emasab commented 2 months ago

It can't be fixed by sending the request to the follower because of a problem: if the replica id differs from CONSUMER_REPLICA_ID (-1), the isolation level parameter is ignored. So I'm following @kphelps's proposal and using the same behaviour as Java: send the request to the leader only.

broker code:

            val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
            val isClientRequest = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID
            val isolationLevelOpt = if (isClientRequest)
              Some(offsetRequest.isolationLevel)
            else
              None

emasab commented 2 months ago

/sem-approve

emasab commented 3 weeks ago

/sem-approve

emasab commented 2 weeks ago

@kphelps sorry, given we're having an issue with the public CI, I've created this internal branch with your changes: #4754