kubernetes-retired / contrib

[EOL] This is a place for various components in the Kubernetes ecosystem that aren't part of the Kubernetes core.
Apache License 2.0

Kafka Statefulset external access issue #2891

Closed LauraMoraB closed 6 years ago

LauraMoraB commented 6 years ago

Hello, I have deployed Kafka and ZooKeeper in a Kubernetes cluster on a SoftLayer machine, using StatefulSets with three replicas and this configuration (deployment and image), but with the Kafka version changed to 1.0. I am having problems producing/consuming messages from outside the Kubernetes cluster, although everything works fine when I run producers/consumers within the cluster. I have seen some issues about this problem, but none of them resolved it for me.

I have this set up:

NAME            READY     STATUS    RESTARTS
po/kafka-0      1/1       Running   0
po/kafka-1      1/1       Running   0
po/kafka-2      1/1       Running   0
po/zk-0         1/1       Running   0
po/zk-1         1/1       Running   0
po/zk-2         1/1       Running   0

NAME            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)
svc/kafka-hs    ClusterIP   None            <none>        9092/TCP
svc/kafka-svc   NodePort    10.97.218.58    <none>        9093:30182/TCP
svc/zk-hs       ClusterIP   None            <none>        2888/TCP,3888/TCP
svc/zk-svc      ClusterIP   10.111.37.187   <none>        2181/TCP

I have two services:

apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: cdc1
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  namespace: cdc1
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    name: external
    nodePort: 30182
  selector:
    app: kafka

I have set these environment variables for Kafka:

 - name: KAFKA_ADVERTISED_LISTENERS
   value: "INTERNAL_PLAINTEXT://kafka-hs:9092,EXTERNAL_PLAINTEXT://MACHINE-IP:30182"
 - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
   value: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
 - name: KAFKA_LISTENERS
   value: "INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:30182"
 - name: KAFKA_INTER_BROKER_LISTENER_NAME
   value: "INTERNAL_PLAINTEXT"

And when I try to produce a message from outside the cluster I get the following error:

2018-04-27 13:27:01 WARN NetworkClient:241 - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.

I set the bootstrap-server configuration in the producer to machine-IP:30182

Any help will be really appreciated! Thanks!

juv commented 6 years ago

So I guess you have multiple brokers that you expect to be exposed externally? If yes, you will need the following:

  1. a headless service for inter-broker communication within your Kafka cluster
  2. a service that exposes an external ip for each pod you have -- have a look here

Make sure that your brokers expose the correct advertised IP to ZooKeeper, for example with:

zookeeper-shell.sh your-zookeeper-svc-name.whatever:2181 get /kafka/brokers/ids/0

This is also relevant to understand how the client bootstrapping process works.
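The registration juv refers to is stored as JSON under /brokers/ids/<id> in ZooKeeper (or under a chroot such as /kafka). The sample below is illustrative, not taken from this cluster; the hostnames, IP, and port are placeholders. The endpoints field is what clients receive during bootstrapping, so the external entry must hold an address reachable from outside the cluster:

```shell
# Illustrative broker registration znode (placeholder addresses); in a
# real cluster this JSON comes from:
#   zookeeper-shell.sh <zk-host>:2181 get /brokers/ids/0
BROKER_JSON='{"endpoints":["INTERNAL_PLAINTEXT://kafka-0.kafka-hs:9092","EXTERNAL_PLAINTEXT://203.0.113.10:30001"],"host":"kafka-0.kafka-hs","port":9092}'

# Extract the address advertised on the external listener -- this is the
# address that must be reachable by outside clients:
echo "$BROKER_JSON" | grep -o 'EXTERNAL_PLAINTEXT://[^"]*'
```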

LauraMoraB commented 6 years ago

Yes, I was missing a service exposed for each broker. After adding each service and some configuration I could access the cluster externally.

I used port 9093 for each service and then the headless service at port 9092

In order to work correctly, I had to change some specific configurations:

  1. In the services configuration I had to add the selector:

     selector:
       statefulset.kubernetes.io/pod-name: <pod-name>

  2. In the Kafka configuration I added these broker settings:

     listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
     advertised.listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093
     listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093
     inter.broker.listener.name=INTERNAL_PLAINTEXT

Thanks for the information!

ralucas commented 6 years ago

@LauraMoraB Can you share in more depth how you achieved external access? I have the headless service, I'm using the same listener settings you have here, and I have opened up NodePorts as an additional service, but I am unable to reach the NodePorts or their IPs. What additional steps did you end up taking? Thanks.

LauraMoraB commented 6 years ago

Hi @ralucas , I'm sorry I took so long.

I configured a NodePort service for each broker like this:

apiVersion: v1
kind: Service
metadata:
  name: <service-name>
spec:
  ports:
  - port: 9093
    targetPort: 9092
    name: kafka-port
    protocol: TCP
  selector:
    statefulset.kubernetes.io/pod-name: <pod-name>
  type: NodePort

As an image I used https://github.com/kubernetes/contrib/tree/master/statefulsets/kafka, but I changed the OS to Alpine and the Kafka version to 1.0.0 (in case this helps).
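Since the three per-broker services differ only in the name, selector, and nodePort, they can be generated in a loop. This is just a sketch: the broker-N names and the 30001..30003 nodePort assignment are assumptions matching the kubectl output further down, and in practice the output would be piped to kubectl apply -f -:

```shell
# Generate one NodePort Service manifest per broker pod (illustrative
# names and ports); pipe the result to `kubectl apply -f -` to create them.
manifests=$(for i in 0 1 2; do
cat <<EOF
apiVersion: v1
kind: Service
metadata:
  name: broker-$i
spec:
  type: NodePort
  ports:
  - port: 9093
    targetPort: 9092
    nodePort: $((30001 + i))
  selector:
    statefulset.kubernetes.io/pod-name: kafka-$i
---
EOF
done)
echo "$manifests"
```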

And the Statefulset Configuration:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-hs
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        image: <image>
        ports:
        - containerPort: 9093
        - containerPort: 9092
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /etc/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
          --override advertised.listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093 \
          --override listeners=INTERNAL_PLAINTEXT://:9092,EXTERNAL_PLAINTEXT://:9093 \
          --override inter.broker.listener.name=INTERNAL_PLAINTEXT \
        ...
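The broker.id=${HOSTNAME##*-} override in the command above works because StatefulSet pods get ordinal hostnames (kafka-0, kafka-1, ...), so the shell parameter expansion that strips everything up to the last '-' recovers the ordinal. A quick sketch of the same expansion outside the cluster:

```shell
# StatefulSet pods are named <statefulset>-<ordinal>, so the ordinal can
# be recovered from the hostname with parameter expansion:
HOSTNAME=kafka-2                 # inside the pod this is set automatically
BROKER_ID=${HOSTNAME##*-}        # removes the longest prefix ending in '-'
echo "broker.id=$BROKER_ID"      # -> broker.id=2
```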
NAME          READY     STATUS    RESTARTS   AGE
pod/kafka-0   1/1       Running   0          4d
pod/kafka-1   1/1       Running   0          4d
pod/kafka-2   1/1       Running   0          4d
pod/zk-0      1/1       Running   0          4d
pod/zk-1      1/1       Running   0          4d
pod/zk-2      1/1       Running   0          4d

NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
service/broker-0   NodePort    <ip>         <none>        9093:30001/TCP      4d
service/broker-1   NodePort    <ip>         <none>        9093:30002/TCP      4d
service/broker-2   NodePort    <ip>         <none>        9093:30003/TCP      4d
service/kafka-hs   ClusterIP   None         <none>        9092/TCP            4d
service/zk-hs      ClusterIP   None         <none>        2888/TCP,3888/TCP   4d
service/zk-svc     ClusterIP   <ip>         <none>        2181/TCP            4d

This is all my configuration. If it still doesn't work for you, share the error you get when you try to reach the broker and I will try to provide more help.

lalan7 commented 5 years ago

@LauraMoraB, in the previous example I don't see any external IP for accessing the Kafka brokers from outside the cluster. In that case, what is the best way to achieve this? Thanks.

LauraMoraB commented 5 years ago

Hello @lalan7 ,

The exposed service is a NodePort service. This means it does not require an external IP; you just use your machine's IP together with the NodePort. For example, to access the first broker, broker-0, you would use <YourMachineIP>:30001.

There is a really nice explanation of services in the official Kubernetes documentation, see: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
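Putting the mapping into a loop makes the scheme explicit. The node IP below is a placeholder, and the broker-N to 3000(N+1) port assignment is the one from the earlier kubectl output:

```shell
# Hypothetical node IP; substitute your machine's address.
NODE_IP=203.0.113.10

# External bootstrap address for each broker under the nodePort scheme above:
for i in 0 1 2; do
  echo "broker-$i -> ${NODE_IP}:$((30001 + i))"
done
```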

ravisjoshi commented 5 years ago

@LauraMoraB Can you share your ZooKeeper config too? When I try to use a producer from outside the cluster, it says it can't connect to my broker, even though telnet to the node IP and nodePort works fine.

I run a producer from a Docker container outside the Kafka cluster as follows:

./kafka-console-producer.sh --broker-list nodeIP:nodePort --topic mytopic

and get an error similar to the following:

[2018-11-07 10:57:00,374] WARN [Producer clientId=console-producer] Error connecting to node kafka-2.kafka-svc.kafka-ns.svc.cluster.local:9092 (id: 2 rack: null) (org.apache.kafka.clients.NetworkClient)
java.io.IOException: Can't resolve address: kafka-2.kafka-svc.kafka-ns.svc.cluster.local:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)

And if you have the whole Kafka and ZooKeeper config somewhere on GitHub, please share those details too :)

Thank you.

jagan23527001 commented 5 years ago

@LauraMoraB On which port was the Kafka server started, 9093 or 9092? If possible, can you share your YAML files or upload them to GitHub? Thanks.

immusk commented 5 years ago

@LauraMoraB I have used the same configuration that you suggested, but when I try to write to a topic from outside of the Kubernetes cluster, I get the following error:

./kafka-console-producer.sh --broker-list xxxxxx:30524 --topic spr
testing
[2018-12-21 06:49:08,976] ERROR Error when sending message to topic spr with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Can you please help here?

waniakshay commented 5 years ago

@LauraMoraB I have used the same configuration that you suggested, but when I try to write to a topic from outside of the Kubernetes cluster, I get the following error:

./kafka-console-producer.sh --broker-list xxxxxx:30524 --topic spr
testing
[2018-12-21 06:49:08,976] ERROR Error when sending message to topic spr with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Can you please help here?

I am also facing the same issue. Inter-broker communication and producer/consumer both work fine from within the cluster, but when I start a producer from outside the cluster, the TCP connection to Kafka is not established.

songhui1984 commented 5 years ago

I am now using the NodePort service, but I get the warnings below, which confuse me. I assumed the NodePort would forward traffic directly to the pod hosted on that node, but as shown it also load-balances:

WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 1 (/10.244.2.117:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-11 08:04:44,344] WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 2 (/10.244.0.79:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-11 08:04:47,350] WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 1 (/10.244.2.117:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-04-11 08:04:50,356] WARN [Consumer clientId=consumer-1, groupId=console-consumer-84014] Connection to node 0 (/10.244.1.151:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)