Open Pavelsky89 opened 6 days ago
Thanks for reporting the issue. We'll investigate soon. #1550 should address the immediate issue but we'll want to understand why this is a breaking change.
I just tried to start up a pure kafka broker v3.8.1 without kroxy in between, and the producer and consumer with v3.9.0 can successfully write/read from the v3.8.1 brokers. So, Kafka should have done the api version negotiation correctly.
https://issues.apache.org/jira/browse/KAFKA-17011 (part of Kafka 3.9.0) introduced a new version (v4) of the ApiVersions RPC. Kroxylicious unconditionally decodes ApiVersions requests and responses, so it must reserialise them before forwarding. The codec used by Kroxylicious doesn't know how to serialise a version 4 response.
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion#addSize
if (_version > 3) {
throw new UnsupportedVersionException("Can't size version " + _version + " of ApiVersion");
} else {
In @Pavelsky89 case
We aim for Kroxylicious to uphold the bidirectional compatibility pledge made by Kafka itself, so what @Pavelsky89 is doing ought to work.
I think the issue here is the handling of the v4 request. The proxy is in effect lying when it forwards the v4 request. It is actually a v3 request with just the version number set wrongly.
I think it would be preferable if the proxy clamped the version number of the request to the highest it supports. If we do that the broker will see a v3 request and return a v3 response, which will reserialise okay. The Apache Kafka client seems to permit the arrival of the v3 response in response to its v4 request okay, however this approach would technically break the spec. I didn't check the other common Kafka clients.
https://kafka.apache.org/protocol.html#protocol_compatibility
The server will reject requests with a version it does not support, and will always respond to the client with exactly the protocol format it expects based on the version it included in its request.
The proxy could change the version of the ApiVersions response back to match the client's request; however, this might mean that the client sees unexpectedly null fields (in the case where the new version added more fields). This might be worse.
WDYAT?
@tombentley @SamBarker @showuon
https://issues.apache.org/jira/browse/KAFKA-17011 (3.9.0) introduced the new version of the ApiVersion (v4).
Does that mean 3.9.0 is simply not supported by the 0.9.0 proxy?
Sorry you replied to a half written comment. Yes, it is a defect. We should have a fix soon.
I think it would be preferable if the proxy clamped the version number of the request to the highest it supports.
I thought we already did, I'm sure we've talked about this problem multiple times... So yes we should cap the API version at what the proxy supports.
If we do that the broker will see a v3 request and return a v3 response, which will reserialise okay. The Apache Kafka client seems to permit the arrival of the v3 response in response to its v4 request okay, however this approach would technically break the spec
Why does that break the spec? Isn't that the only way a 3.8.1 broker can respond to a v4 request?
I think it would be preferable if the proxy clamped the version number of the request to the highest it supports.
I thought we already did, I'm sure we've talked about this problem multiple times... So yes we should cap the API version at what the proxy supports.
The ApiVersions RPC is the mechanism that performs the negotiation, so that client and broker exchange the other RPCs at a mutually agreed version. However, in this case we are talking about the ApiVersions RPC itself.
If we do that the broker will see a v3 request and return a v3 response, which will reserialise okay. The Apache Kafka client seems to permit the arrival of the v3 response in response to its v4 request okay, however this approach would technically break the spec
Why does that break the spec?
The spec says "[the server] will always respond to the client with exactly the protocol format it expects based on the version it included in its request."
I take this sentence to mean that the version of the response MUST match the version of the request.
Isn't that the only way a 3.8.1 broker can respond to a v4 request?
If I've understood the broker's behaviour right, no. It uses the version of the ApiVersion request to determine the version of the ApiVersion response. Send a v4 ApiVersion request to 3.8.0 Broker, you get a response claiming to be v4, even though 3.8.0 knows nothing about v4. It is not an error. I think this behaviour is actually key to the bidirectionality in the compatibility statement.
Note to self: need to understand why the pre-3.9 Brokers don't fail when constructing the ApiVersions v4 response after the receipt of a v4 request. The Broker must be using a different code path to construct this response than the Proxy does. Perhaps the best fix is to make sure the Proxy mimics what the Broker does.
Here's the difference: the Kafka Broker does this whilst serialising the response to an unsupported ApiVersions request.
public short apiVersion() {
// Use v0 when serializing an unhandled ApiVersion response
if (isUnsupportedApiVersionsRequest())
return 0;
return header.apiVersion();
}
This is part of KIP-511 in action. In the case where the Broker doesn't support the client's ApiVersions version, the Broker returns an ApiVersions response at version 0 containing only the ApiVersions API key with the version range supported by the Broker. The client then sends another ApiVersions request at a mutually supported version.
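To illustrate the rule the Broker applies, here is a minimal sketch of choosing the serialisation version for an ApiVersions response. It is not Kafka or Kroxylicious code: the class name, the method name and the constant (the proxy codec topping out at v3, as described earlier in this thread) are assumptions made for illustration.

```java
// Illustrative only: hypothetical names, not Kafka or Kroxylicious code.
final class ApiVersionsResponseVersionSketch {

    // Assumption: the highest ApiVersions version the local codec can serialise.
    static final short MAX_SUPPORTED_API_VERSIONS_VERSION = 3;

    /**
     * Picks the version used to serialise an ApiVersions response.
     * If the request version is newer than the codec understands,
     * fall back to v0 (the KIP-511 downgrade case); otherwise echo
     * the request version.
     */
    static short responseVersion(short requestVersion, short maxSupportedVersion) {
        boolean unsupported = requestVersion > maxSupportedVersion;
        return unsupported ? (short) 0 : requestVersion;
    }

    public static void main(String[] args) {
        // A v4 request against a codec that only knows v3 is answered at v0.
        System.out.println(responseVersion((short) 4, MAX_SUPPORTED_API_VERSIONS_VERSION));
        // A v3 request is answered at v3.
        System.out.println(responseVersion((short) 3, MAX_SUPPORTED_API_VERSIONS_VERSION));
    }
}
```

The point of the sketch is only that the response version is derived from what the responder supports, not copied blindly from the request header.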
There are in fact two defects in Kroxylicious here:
1) Client is using a newer ApiVersions than the Broker supports. In this case the issue is that Kroxylicious is using the api version of the request to serialise the response. This is wrong in the downgrade/KIP-511 case.
2) There will be a second defect when only Kroxylicious is behind. That is, both client and broker support the new ApiVersions version but Kroxylicious is running with an older codec. In this case, I think Kroxylicious should probably return its own response to the client, indicating its supported ApiVersions RPC range. This gives us the possibility that the client may see two downgrades (one from the proxy then a second one from the broker). It looks like the Java client would tolerate this. I haven't checked the other clients.
This gives us the possibility that the client may see two downgrades (one from the proxy then a second one from the broker). It looks like the Java client would tolerate this. I haven't checked the other clients.
Maybe we could do it in one downgrade. If the proxy is aware of v3, and client and broker support v4.
edit: have put up a PoC, a bit awkward to implement.
I also tried doing it purely in a filter using sendRequest, since that is able to control the outgoing api version (I don't think we can change apiVersion for a typical forwarded message). But something blows up in the state machine when we try to short-circuit respond: "Can't size version 4 of ApiVersion. Increase log level to DEBUG for stacktrace". I assume it's remembered the apiVersion of the request and is using that version for the short-circuit response.
The message decoder is unable to do the correlation since it's only handling decode, so there's a dirty hack to instruct another Filter to handle the correlation and error setting.
This gives us the possibility that the client may see two downgrades (one from the proxy then a second one from the broker). It looks like the Java client would tolerate this. I haven't checked the other clients.
Maybe we could do it in one downgrade.
Yes, agreed. I'd been thinking about it this evening and came to the same conclusion.
If the proxy is aware of v3, and client and broker support v4.
- client sends proxy v4
- proxy detects it can't handle the version and forwards its own v0 ApiVersions request to the broker
Rather than v0, it should use the maximum version the proxy can support.
- proxy intercepts the v0 response, intersects it with its own supported versions and sets ErrorCode=UNSUPPORTED_VERSION
Agreed. It might have the effect of further reducing the version range.
- client picks the new version and starts again (at this point it knows the highest ApiVersions version supported by proxy and broker)
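The intersection step in the flow above can be sketched as follows. This is a hedged illustration only: VersionRange is a hypothetical helper, not a Kafka or Kroxylicious type, and the example ranges (proxy supports ApiVersions v0 to v3, broker v0 to v4) are assumed from the scenario discussed here.

```java
import java.util.Optional;

// Illustrative only: a hypothetical helper, not a Kafka or Kroxylicious type.
final class VersionRange {
    final short min;
    final short max;

    VersionRange(short min, short max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        this.min = min;
        this.max = max;
    }

    /**
     * Returns the versions supported by both sides, or empty when the
     * two ranges do not overlap at all.
     */
    Optional<VersionRange> intersect(VersionRange other) {
        short lo = (short) Math.max(min, other.min);
        short hi = (short) Math.min(max, other.max);
        return lo <= hi ? Optional.of(new VersionRange(lo, hi)) : Optional.empty();
    }

    public static void main(String[] args) {
        VersionRange proxy = new VersionRange((short) 0, (short) 3);  // assumed proxy codec range
        VersionRange broker = new VersionRange((short) 0, (short) 4); // assumed broker range
        VersionRange agreed = proxy.intersect(broker).orElseThrow(IllegalStateException::new);
        // The range advertised to the client would be capped by the proxy.
        System.out.println(agreed.min + ".." + agreed.max);
    }
}
```

With ranges like these, the client would learn in a single downgrade the highest ApiVersions version that both proxy and broker can handle.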
edit: have put up a PoC, a bit awkward to implement.
I also tried doing it purely in a filter using sendRequest, since that is able to control the outgoing api version (I don't think we can change apiVersion for a typical forwarded message). But something blows up in the state machine when we try to short-circuit respond: "Can't size version 4 of ApiVersion. Increase log level to DEBUG for stacktrace". I assume it's remembered the apiVersion of the request and is using that version for the short-circuit response.
I understand that issue. It is because the proxy uses the request api version to serialise the response. My PR should fix that.
The message decoder is unable to do the correlation since it's only handling decode, so there's a dirty hack to instruct another Filter to handle the correlation and error setting.
I had a play with another evil idea which was to forward an Opaque frame containing a hand-rolled ApiVersions request with the version set to short max. Idea being we pretend it's a client using a future version that should never come to be and the broker would respond with the error code and version 0 response, proxy could intersect without having to correlate the response.
But from my read of the code this would be fragile as the Kafka code is inferring the Header version from the APIKey+version. In ApiMessageType it does:
public short requestHeaderVersion(short _version) {
switch (apiKey) {
...
case 18: // ApiVersions
if (_version >= 3) {
return (short) 2;
} else {
return (short) 1;
}
so in future if/when further Header Versions were added, faking a large version could mean the broker thinks the message is using the latest header version but the proxy cannot account for that.
TL;DR: pretending the proxy is ahead of the broker to prompt the error reaction is a bad idea. It was just tempting because we don't need to correlate anything in the Proxy.
Rather than v0, it should use the maximum version the proxy can support.
why forward on the highest proxy supported version? I guess we can pass more information to the broker that way, like the clientId in the header and client software fields in the value. Better for debugging.
We still have to correlate the response and ensure we return the error code to let the client know it was using an unsupported version, the error might be set by the broker or the proxy depending if the proxy is ahead of the broker.
Rather than v0, it should use the maximum version the proxy can support.
why forward on the highest proxy supported version? I guess we can pass more information to the broker that way, like the clientId in the header and client software fields in the value. Better for debugging.
My rationale is that sending the highest version is the least surprising. It is what the broker expects its peer to do.
We still have to correlate the response and ensure we return the error code to let the client know it was using an unsupported version, the error might be set by the broker or the proxy depending if the proxy is ahead of the broker.
Agree on both those points.
@robobario I didn't get chance to continue with this today, I'll follow up on Monday.
Describe the bug When using the Java kafka-clients (https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/3.9.0) producer in version 3.9.0, the proxy is unable to handle requests. We're getting the following errors
To Reproduce Steps to reproduce the behavior:
Expected behavior Records are passed to target kafka cluster
Additional info