confluentinc / librdkafka

The Apache Kafka C/C++ library

Invalid version for API key FETCH: 12 #4870

Closed ffissore closed 1 month ago

ffissore commented 1 month ago

Description

When consuming from a broker running version 2.1.1, the latest librdkafka (2.6.0) fails: the client connects, but the brokers disconnect it right after it sends FetchRequest v12. It logs this (with debug: protocol):

%7|1728892426.107|SEND|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Sent MetadataRequest (v7, 30 bytes @ 0, CorrId 6)
%7|1728892426.112|RECV|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Received MetadataResponse (v7, 3384 bytes, CorrId 6, rtt 4.87ms)
%7|1728892426.114|SEND|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Sent MetadataRequest (v7, 74 bytes @ 0, CorrId 7)
%7|1728892426.116|RECV|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Received MetadataResponse (v7, 298 bytes, CorrId 7, rtt 1.38ms)
%7|1728892426.116|SEND|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Sent JoinGroupRequest (v3, 242 bytes @ 0, CorrId 4)
%7|1728892429.121|RECV|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Received JoinGroupResponse (v3, 239 bytes, CorrId 4, rtt 3004.75ms)
%7|1728892429.121|SEND|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Sent MetadataRequest (v7, 74 bytes @ 0, CorrId 5)
%7|1728892429.123|RECV|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Received MetadataResponse (v7, 298 bytes, CorrId 5, rtt 1.46ms)
%7|1728892429.123|SEND|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Sent SyncGroupRequest (v2, 245 bytes @ 0, CorrId 6)
%7|1728892429.126|RECV|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Received SyncGroupResponse (v2, 80 bytes, CorrId 6, rtt 3.20ms)
%7|1728892429.127|SEND|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Sent HeartbeatRequest (v2, 117 bytes @ 0, CorrId 7)
%7|1728892429.128|CONNECTED|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Connected (#1)
%7|1728892429.128|FEATURE|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1728892429.128|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent ApiVersionRequest (v3, 70 bytes @ 0, CorrId 1)
%7|1728892429.128|RECV|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Received HeartbeatResponse (v2, 6 bytes, CorrId 7, rtt 1.71ms)
%7|1728892429.129|RECV|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Received ApiVersionResponse (v3, 6 bytes, CorrId 1, rtt 1.36ms)
%7|1728892429.129|APIVERSION|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
%7|1728892429.129|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 2)
%7|1728892429.130|RECV|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Received ApiVersionResponse (v0, 264 bytes, CorrId 2, rtt 0.77ms)
%7|1728892429.130|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent MetadataRequest (v7, 30 bytes @ 0, CorrId 3)
%7|1728892429.130|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 4)
%7|1728892429.131|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 5)
%7|1728892429.131|RECV|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Received MetadataResponse (v7, 97 bytes, CorrId 3, rtt 1.08ms)
%7|1728892429.133|RECV|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Received ListOffsetsResponse (v4, 82 bytes, CorrId 4, rtt 2.18ms)
%7|1728892429.134|RECV|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Received ListOffsetsResponse (v4, 82 bytes, CorrId 5, rtt 2.95ms)
%7|1728892429.135|SEND|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 8)
%7|1728892429.135|SEND|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 9)
%7|1728892429.136|RECV|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Received ListOffsetsResponse (v4, 82 bytes, CorrId 8, rtt 1.45ms)
%7|1728892429.137|RECV|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Received ListOffsetsResponse (v4, 82 bytes, CorrId 9, rtt 2.32ms)
%7|1728892429.139|CONNECTED|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Connected (#1)
%7|1728892429.139|FEATURE|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1728892429.139|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent ApiVersionRequest (v3, 70 bytes @ 0, CorrId 1)
%7|1728892429.141|RECV|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Received ApiVersionResponse (v3, 6 bytes, CorrId 1, rtt 1.49ms)
%7|1728892429.141|APIVERSION|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
%7|1728892429.141|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 2)
%7|1728892429.142|RECV|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Received ApiVersionResponse (v0, 264 bytes, CorrId 2, rtt 0.81ms)
%7|1728892429.142|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent MetadataRequest (v7, 30 bytes @ 0, CorrId 3)
%7|1728892429.142|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 4)
%7|1728892429.142|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 5)
%7|1728892429.143|RECV|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Received MetadataResponse (v7, 97 bytes, CorrId 3, rtt 1.02ms)
%7|1728892429.144|RECV|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Received ListOffsetsResponse (v4, 82 bytes, CorrId 4, rtt 2.08ms)
%7|1728892429.145|RECV|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Received ListOffsetsResponse (v4, 82 bytes, CorrId 5, rtt 2.72ms)
%7|1728892429.146|SEND|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Sent OffsetFetchRequest (v5, 127 bytes @ 0, CorrId 8)
%7|1728892429.148|RECV|a-client-id#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/3: Received OffsetFetchResponse (v5, 118 bytes, CorrId 8, rtt 1.22ms)
%7|1728892429.249|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 6)
%7|1728892429.249|SEND|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 10)
%7|1728892429.249|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent ListOffsetsRequest (v4, 98 bytes @ 0, CorrId 6)
%7|1728892429.250|RECV|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Received ListOffsetsResponse (v4, 82 bytes, CorrId 6, rtt 1.51ms)
%7|1728892429.250|RECV|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Received ListOffsetsResponse (v4, 82 bytes, CorrId 10, rtt 1.53ms)
%7|1728892429.250|RECV|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Received ListOffsetsResponse (v4, 82 bytes, CorrId 6, rtt 1.53ms)
%7|1728892429.251|SEND|a-client-id#consumer-1| [thrd:kafka-1:19091/bootstrap]: kafka-1:19091/1: Sent FetchRequest (v12, 133 bytes @ 0, CorrId 11)
%7|1728892429.251|SEND|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Sent FetchRequest (v12, 133 bytes @ 0, CorrId 7)
%7|1728892429.251|SEND|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Sent FetchRequest (v12, 133 bytes @ 0, CorrId 7)
%6|1728892429.252|FAIL|a-client-id#consumer-1| [thrd:kafka-3:19093/bootstrap]: kafka-3:19093/3: Disconnected: verify that security.protocol is correctly configured, broker might require SASL authentication (after 122ms in state UP)
%6|1728892429.253|FAIL|a-client-id#consumer-1| [thrd:kafka-2:19092/bootstrap]: kafka-2:19092/2: Disconnected: verify that security.protocol is correctly configured, broker might require SASL authentication (after 110ms in state UP)

This happens both with and without using our internal CA and self-signed certificates.

The broker logs this:

kafka-1-1          | [2024-10-14 07:55:40,586] ERROR Exception while processing request from 172.19.0.4:19091-172.19.0.1:52532-24 (kafka.network.Processor)
kafka-1-1          | org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: FETCH, apiVersion: 12, connectionId: 172.19.0.4:19091-172.19.0.1:52532-24, listenerName: ListenerName(PLAINTEXT), principal: User:ANONYMOUS
kafka-1-1          | Caused by: java.lang.IllegalArgumentException: Invalid version for API key FETCH: 12
kafka-1-1          |    at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:312)
kafka-1-1          |    at org.apache.kafka.common.protocol.ApiKeys.requestSchema(ApiKeys.java:282)
kafka-1-1          |    at org.apache.kafka.common.protocol.ApiKeys.parseRequest(ApiKeys.java:290)
kafka-1-1          |    at org.apache.kafka.common.requests.RequestContext.parseRequest(RequestContext.java:63)
kafka-1-1          |    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:89)
kafka-1-1          |    at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:706)
kafka-1-1          |    at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:699)
kafka-1-1          |    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
kafka-1-1          |    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
kafka-1-1          |    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
kafka-1-1          |    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
kafka-1-1          |    at kafka.network.Processor.processCompletedReceives(SocketServer.scala:699)
kafka-1-1          |    at kafka.network.Processor.run(SocketServer.scala:595)
kafka-1-1          |    at java.lang.Thread.run(Thread.java:748)

How to reproduce

Consume from a broker at version 2.1.1 with librdkafka 2.6.0. Downgrading to 2.5.3 makes the problem go away.

Koenkk commented 1 month ago

I got exactly the same issue after upgrading confluent-kafka from 2.5.3 to 2.6.0, which uses librdkafka 2.6.0.

emasab commented 1 month ago

There's a problem with Apache Kafka versions < 2.7: they don't support Fetch RPC version 12, which is used as the fallback. The fallback below sets ApiVersion to 12 even when the negotiated version is lower than 12:

        /* Fallback to version 12 if topic id is null which can happen if
         * inter.broker.protocol.version is < 2.8 */
        ApiVersion =
            ApiVersion > 12 && can_use_topic_ids(rkb) ? ApiVersion : 12;

It should be:

if (ApiVersion > 12 && !can_use_topic_ids(rkb))
        ApiVersion = 12;
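
To make the difference between the two snippets concrete, here is a minimal standalone sketch, not librdkafka code. The function names and the `topic_ids_usable` parameter are hypothetical stand-ins: in librdkafka the decision comes from `can_use_topic_ids(rkb)`, which is replaced here by a plain boolean so the logic can be checked in isolation.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the fallback quoted from 2.6.0.
 * `negotiated` plays the role of ApiVersion before the fallback;
 * `topic_ids_usable` replaces the call to can_use_topic_ids(rkb).
 * Any version that fails the condition, including one below 12,
 * is forced to 12. */
static int fetch_version_2_6_0(int negotiated, bool topic_ids_usable) {
        return negotiated > 12 && topic_ids_usable ? negotiated : 12;
}

/* Hypothetical stand-in for the proposed fix: only versions above 12
 * are capped when topic ids cannot be used; lower versions pass
 * through unchanged. */
static int fetch_version_fixed(int negotiated, bool topic_ids_usable) {
        if (negotiated > 12 && !topic_ids_usable)
                return 12;
        return negotiated;
}
```

With a negotiated version of 11, for example, `fetch_version_2_6_0(11, false)` returns 12 while `fetch_version_fixed(11, false)` returns 11. That matches the FetchRequest v12 seen in the client log and the broker's "Invalid version for API key FETCH: 12" error.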