streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] Lookup can not find broker address in k8s #633

Open wangjialing218 opened 3 years ago

wangjialing218 commented 3 years ago

Describe the bug: Deploy the broker with KoP in k8s and expose its ports with NodePort. When advertisedAddress is not the same as the first address in advertisedListeners, lookup cannot find the broker address.

To Reproduce: I use nodeip:nodeport to expose the broker addresses in k8s. Here is the service status of the broker: [image: kop]

Configuration in broker.conf:

advertisedAddress: {worknode_ip}:30873
advertisedListeners: internal:{headless service domain}:6650,external:pulsar://{worknode_ip}:31964
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://{worknode_ip}:31553

A Pulsar client outside k8s can connect to Pulsar at pulsar://{worknode_ip}:31964 by setting listenerName to external. But when a Kafka client outside k8s connects to Pulsar at {worknode_ip}:31553, it fails to get topic metadata because the broker lookup fails.

Additional context: the current lookup logic of KoP:

  1. Use pulsarService.getClient().getLookup().getBroker() to find the broker of the topic; the result is an InetSocketAddress.
  2. List all broker addresses in ZooKeeper under /loadbalance/brokers/.
  3. From the brokers in step 2, keep those whose host name is the same as that of the InetSocketAddress from step 1.
  4. Among them, find the broker whose pulsarServiceUrl and port, or WebServiceUrl and port, match the InetSocketAddress.
  5. Return the kafkaAdvertisedListeners of the broker found in step 4.

In my case, the lookup result in step 1 is the first address in advertisedListeners, which is {headless service domain}:6650, and in step 2 the broker address under /loadbalance/brokers/ is {worknode_ip}:30873. Because the broker address there is used for admin request redirection, I need to set it to the NodePort address in order to handle admin requests from outside. Then, because {headless service domain} is not the same as {worknode_ip}, step 3 fails with no matching broker. [image: kop2]
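The lookup steps and the host mismatch described above can be sketched as follows. This is a simplified model, not KoP's actual code: the class `LookupSketch`, the `BrokerEntry` type, and its field names are hypothetical, and the sample values stand in for {worknode_ip} and {headless service domain}.

```java
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of lookup steps 2-5; not KoP's real classes.
final class LookupSketch {

    // One broker registered under /loadbalance/brokers/ (field names are assumptions).
    static final class BrokerEntry {
        final String znodeName;                 // "host:port", derived from advertisedAddress
        final String pulsarServiceUrl;
        final String webServiceUrl;
        final String kafkaAdvertisedListeners;

        BrokerEntry(String znodeName, String pulsarServiceUrl,
                    String webServiceUrl, String kafkaAdvertisedListeners) {
            this.znodeName = znodeName;
            this.pulsarServiceUrl = pulsarServiceUrl;
            this.webServiceUrl = webServiceUrl;
            this.kafkaAdvertisedListeners = kafkaAdvertisedListeners;
        }
    }

    // Step 3: host must match; step 4: a service URL must match host and port;
    // step 5: return that broker's kafkaAdvertisedListeners.
    static Optional<String> findKafkaListeners(InetSocketAddress lookup, List<BrokerEntry> brokers) {
        return brokers.stream()
                .filter(b -> hostOf(b.znodeName).equals(lookup.getHostString()))
                .filter(b -> sameHostPort(b.pulsarServiceUrl, lookup)
                        || sameHostPort(b.webServiceUrl, lookup))
                .map(b -> b.kafkaAdvertisedListeners)
                .findFirst();
    }

    static boolean sameHostPort(String url, InetSocketAddress addr) {
        URI uri = URI.create(url);
        return addr.getHostString().equals(uri.getHost()) && addr.getPort() == uri.getPort();
    }

    static String hostOf(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        return colon < 0 ? hostPort : hostPort.substring(0, colon);
    }
}
```

In this model, a lookup result on the headless service domain never passes the host filter when the registered broker address uses the node IP, while a lookup result on the node IP can match.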

Current workaround: In step 1, I create a Pulsar client for topic lookup with listenerName external instead of using the client from pulsarService.getClient(), so the result is {worknode_ip}:31964.

Suggestion: I'm not sure why KoP uses this lookup logic. It can cause problems when advertisedAddress is not the same as the first address in advertisedListeners, which is the default lookup result of pulsarService.getClient().getLookup().getBroker(). Can we use Pulsar's advertisedListeners lookup logic to handle topic lookup in KoP?

  1. Decide the listenerName for KoP, for example kafka-listener.
  2. Add kafka-listener to advertisedListeners: pulsar-internal:{headless service domain}:6650,pulsar-external:pulsar://{worknode_ip}:31964,kafka-listener:pulsar://{worknode_ip}:31553
  3. Create a Pulsar client for topic lookup with listenerName kafka-listener, so we get the expected lookup result {worknode_ip}:31553 in one step.
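
As a rough illustration of the suggestion, the sketch below parses an advertisedListeners value and picks the address registered under a given listener name. It only models the selection by name; the class `AdvertisedListenerSketch` and its methods are hypothetical and are not Pulsar or KoP APIs, and the sample host and IP are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: resolve "listenerName -> host:port" from an advertisedListeners string.
final class AdvertisedListenerSketch {

    // Splits "name:address,name:address,..." into a map keyed by listener name.
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            int colon = entry.indexOf(':');           // the first ':' ends the listener name
            if (colon <= 0) {
                throw new IllegalArgumentException("Malformed listener entry: " + entry);
            }
            byName.put(entry.substring(0, colon).trim(), entry.substring(colon + 1).trim());
        }
        return byName;
    }

    // Returns the host:port advertised for the given listener name, without the scheme.
    static String hostPortFor(String advertisedListeners, String listenerName) {
        String address = parse(advertisedListeners).get(listenerName);
        if (address == null) {
            throw new IllegalArgumentException("Unknown listener: " + listenerName);
        }
        int schemeEnd = address.indexOf("://");
        return schemeEnd < 0 ? address : address.substring(schemeEnd + 3);
    }
}
```

With a value shaped like the one in step 2, asking for kafka-listener yields the NodePort address directly, with no matching against /loadbalance/brokers/.
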

BewareMyPower commented 3 years ago

It's a legacy design that was meant to implement the basic functions quickly. You're right that the current lookup design can be optimized to reuse the broker's logic. We also recently found other problems introduced by using the built-in PulsarClient for topic lookup. For example, if many topics/partitions need to be looked up, many lookup requests go through the same PulsarClient and a TooManyRequestsException might be thrown.

Would you like to contribute a PR to fix it?

wangjialing218 commented 3 years ago

Current advertisedListeners has a validation that address must start with pulsar://, so we cannot register a kafka-listener that starts with PLAINTEXT://.

Could we consider a complete solution together with #574? We could register multiple kafka-listeners in advertisedListeners with different listenerNames and ports, but we may need to make some changes on the broker side to remove the pulsar:// validation.

BewareMyPower commented 3 years ago

> Current advertisedListeners has a validation that address must start with pulsar://

Which part of the code do you mean? Do you mean that on the Pulsar side, what getProtocolDataToAdvertise() returns cannot be registered?

The work for #574 hasn't started yet, so it may take some time to look into this issue.

wangjialing218 commented 3 years ago

> Current advertisedListeners has a validation that address must start with pulsar://

> Which part of the code do you mean? Do you mean that on the Pulsar side, what getProtocolDataToAdvertise() returns cannot be registered?

On the Pulsar side there is a class MultipleListenerValidator. Currently we can only register advertisedListeners that start with pulsar://; otherwise the validator throws an exception. If we want to reuse the broker's advertisedListeners for KoP topic lookup, we need to register kafkaAdvertisedListeners there.
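To make the constraint concrete, here is a minimal stand-in for the kind of scheme check described above. It is not the code of MultipleListenerValidator; the class name `ListenerSchemeCheckSketch` is hypothetical, and the exact set of accepted schemes (here pulsar and pulsar+ssl) is an assumption.

```java
import java.net.URI;

// Hypothetical stand-in for a scheme check on advertised listener addresses.
final class ListenerSchemeCheckSketch {

    // Accepts only Pulsar binary-protocol schemes (assumed set), so PLAINTEXT:// is rejected.
    static boolean isAccepted(String address) {
        String scheme = URI.create(address).getScheme();
        return "pulsar".equalsIgnoreCase(scheme) || "pulsar+ssl".equalsIgnoreCase(scheme);
    }
}
```

Under such a check, an entry like kafka-listener:PLAINTEXT://{worknode_ip}:31553 cannot be registered, which is why the validation would have to change on the broker side.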

BewareMyPower commented 3 years ago

I'm afraid it's not easy to reuse the advertisedListeners of Pulsar.

Regarding

> But when a Kafka client outside k8s connects to Pulsar at {worknode_ip}:31553, it fails to get topic metadata because the broker lookup fails.

@gaoran10, could you provide some help? I remember you have tried the NodePort mode before.

gaoran10 commented 3 years ago

The configurations advertisedAddress and advertisedListeners cannot be used together.

Maybe we could use the following configuration? Is advertisedAddress alone enough?

advertisedAddress: {headless service domain}:6650
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://{headless service domain}:9092

The lookup result is then {headless service domain}:9092. On the client side, we should add a hosts entry for the headless service domain that maps it to the external IP.

wangjialing218 commented 3 years ago

A client outside k8s cannot connect to KoP through the lookup result {headless service domain}:9092. I'm working on a way to let clients connect to the broker in k8s without any proxy module or hosts configuration. That could be achieved as follows:

  1. lookup result for pulsar client outside is {work node external ip}:{nodeport of 6650}
  2. redirect url for pulsar http request outside is {work node external ip}:{nodeport of 8080}
  3. lookup result for kafka client outside is {work node external ip}:{nodeport of 9092}

I have 1 and 2 working in my k8s cluster; for 3 I currently have a workaround that makes it possible. I think #644 is a good solution to this problem.