confluentinc / librdkafka

The Apache Kafka C/C++ library

Librdkafka handling of brokers with the same host:port is inconsistent with java #4212

Open dpcollins-google opened 1 year ago

dpcollins-google commented 1 year ago

Description

When a Metadata response returns brokers with different node IDs but the same host:port, librdkafka rebinds the internal state for all of those IDs to a single broker object, preventing communication with any node ID but the last in the list. The Java client handles this case without issue.

How to reproduce

This is an internal project and hard to reproduce from first principles, but a unit test could easily be constructed.

I'm developing an L7 proxy that terminates TCP streams speaking the Kafka wire protocol at a horizontally load-balanced server, then re-exports those streams to the underlying cluster after aggregation. When the MetadataResponse presents multiple nodes (to avoid saturating any individual stream) that all share the same URL (of the generic TLS proxy) but have different node IDs, librdkafka thrashes connections and cannot reach a stable state.

This is due to a check in librdkafka that, when it sees a node ID it does not currently have state for, forcibly updates the node ID of an existing broker with the same host:port: https://github.com/confluentinc/librdkafka/blob/c75eae84846b1023422b75798c41d4b6b1f8b0b7/src/rdkafka_broker.c#L5297

This check does not exist in Java, which simply replaces the nodeId -> host:port mapping when it receives a new one: https://github.com/apache/kafka/blob/27548707dd8f2324d27335edc922ae88d78d86ed/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java#LL136C22-L136C22
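A minimal model of the two update strategies may make the difference concrete. This is a sketch of the semantics described above, not the actual code of either client:

```python
# Simplified model of the two metadata-update strategies. The function names
# and data shapes are illustrative; neither mirrors the real clients' code.

def update_librdkafka_style(brokers, metadata):
    """librdkafka-like: an unknown node id is matched against an existing
    broker with the same host:port, and that broker's node id is rebound."""
    for node_id, hostport in metadata:
        if node_id in brokers:
            brokers[node_id] = hostport
            continue
        existing = next(
            (nid for nid, hp in brokers.items() if hp == hostport), None)
        if existing is not None:
            del brokers[existing]  # rebind: the old node id's state is lost
        brokers[node_id] = hostport
    return brokers

def update_java_style(brokers, metadata):
    """Java-like: the nodeId -> host:port map is replaced wholesale."""
    return {node_id: hostport for node_id, hostport in metadata}

# Two brokers advertised behind one proxy address:
meta = [(1, "proxy:9092"), (2, "proxy:9092")]
print(update_librdkafka_style({}, meta))  # only the last node id survives
print(update_java_style({}, meta))        # both node ids are kept
```

With the host:port-matching strategy, only one node id remains reachable after each metadata update, which is the thrashing behavior reported above.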

It looks like this check dates back to when Metadata request support was first added in https://github.com/confluentinc/librdkafka/commit/b09ff60c9e3e93815b3491f007460291858b3caf . @edenhill, in case you're still involved in the project.

Can we remove this inconsistency with Java and with the kafka protocol?

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

emasab commented 1 year ago

Thanks @dpcollins-google! @edenhill left the company and transferred ownership of the repository to Confluent, which now maintains it.

I'm checking this.

emasab commented 1 year ago

The Java code you're linking isn't about the nodeId to Node mapping; it refers to KIP-516, which introduces topic IDs (not yet implemented in librdkafka). The place where the nodeId to Node mapping is created is here.

What you want is to have a single host:port for all the brokers. Usually a load balancer is created per broker. How do you plan to find the right broker to connect to from the load balancer?

dpcollins-google commented 1 year ago

The Java code ... isn't about the nodeId to Node mapping

I think it is? The mapping is simply overridden, but this is the codepath called by https://github.com/apache/kafka/blob/27548707dd8f2324d27335edc922ae88d78d86ed/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L278 when the client's metadata gets updated. What you've linked is where that map is constructed from the on-the-wire representation, which is then passed into the method I linked.

having a single host:port for all the brokers. Usually a load balancer is created for each broker. How do you plan to find the right broker to connect to in the load balancer?

This is true of protocol-agnostic load balancers that forward TCP traffic, but I'm actually terminating the Kafka wire protocol at the load balancer, with the goal of applying transforms and routing based on the data in the protocol messages.

emasab commented 1 year ago

In that code the newNodes parameter is not touched and is passed through to the MetadataCache constructor; instead you have a map from topic name to topic ID. It's about KIP-516. Indeed, from the other link it appears you can have a Map with different nodeIds as keys and Nodes with the same host:port.

I had the impression that you wanted to find the node_id from the protocol, but it seems it's not passed, not even in the header.

dpcollins-google commented 1 year ago

I had the impression that you wanted to find the node_id from the protocol, but it seems it's not passed, not even in the header.

No, we're abstracting away the actual cluster entirely and routing to the actual cluster based on the topic provided in the request.

emasab commented 1 year ago

The first request that is made is ApiVersions, and there is no topic there. You could achieve the same by putting the broker ID in the hostname and running an SNI proxy without reading the TLS stream; the SNI extension is supported in librdkafka, see rd_kafka_transport_ssl_set_endpoint_id.
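The SNI-based routing suggested here can be sketched as follows: the proxy reads the server_name from the TLS ClientHello and selects a backend without ever decrypting the stream. The `broker-<id>` hostname scheme and the parsing helper below are illustrative assumptions, not part of any client or proxy:

```python
# Sketch: extract a broker node id from an SNI server_name, assuming an
# illustrative "broker-<id>.<domain>" naming convention.

def route_from_sni(server_name):
    """Return the node id encoded in the hostname, or None if absent."""
    host = server_name.split(".", 1)[0]  # leftmost DNS label
    if host.startswith("broker-"):
        try:
            return int(host[len("broker-"):])
        except ValueError:
            return None
    return None

print(route_from_sni("broker-42.proxy.example.com"))  # 42
print(route_from_sni("kafka.proxy.example.com"))      # None
```

Since SNI is sent in cleartext in the ClientHello, the proxy can make this routing decision before (or without) terminating TLS itself.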

dpcollins-google commented 1 year ago

Per the Kafka protocol, the hostname should never be used for matching, only the node ID. The change in https://github.com/confluentinc/librdkafka/commit/b09ff60c9e3e93815b3491f007460291858b3caf was intended as an optimization that "migrates bootstrap brokers to proper brokers if they can be exactly matched by name", i.e. to reuse the channels, but in doing so it breaks cases where two brokers with different node IDs have the same host:port value.

rmb938 commented 1 year ago

@dpcollins-google I reported a similar issue here https://github.com/confluentinc/librdkafka/issues/3784

Wondering if you ever figured out a solution in the proxy you are building, I am working on something similar.

dpcollins-google commented 1 year ago

The proxy in question was for Google Cloud Pub/Sub Lite, a similar partition-based streaming system but without specific broker identities. I maintain that librdkafka is not following the protocol spec here, which intentionally separates the concept of broker identity from host:port.

The workaround we deployed was to prefix the DNS name of each broker with its node ID, i.e. node ID "12345" gets a DNS name like "broker-12345.us-central1-pubsub-kafka.googleapis.com", even though that resolves to the same load balancer address regardless of the ID portion. This forces librdkafka to treat each node ID as distinct.
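The workaround amounts to generating a unique advertised hostname per node ID. A minimal sketch, using a placeholder domain rather than the actual service endpoint:

```python
# Sketch of the per-node-id DNS workaround. The domain is a placeholder;
# every generated name would resolve to the same load-balancer address.

def advertised_host(node_id, domain="pubsub-proxy.example.net"):
    """Build a hostname that embeds the node id in the leftmost label."""
    return "broker-{}.{}".format(node_id, domain)

# Each node id now maps to a distinct host string, so librdkafka's
# host:port matching can no longer collapse them into one broker.
hosts = {nid: advertised_host(nid) for nid in (12345, 12346)}
print(hosts)
```

Because the host:port strings now differ per node ID, librdkafka allocates separate broker state for each one, at the cost of extra DNS records (or a wildcard record) on the proxy side.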

rmb938 commented 1 year ago

Gotcha, that makes sense. I also agree that librdkafka is not following the protocol, compared to every other client.