aklivity / zilla

🦎 A multi-protocol edge & service proxy. Seamlessly interface web apps, IoT clients, & microservices to Apache Kafka® via declaratively defined, stateless APIs.
https://docs.aklivity.io/zilla

Zilla gets blocked when sending messages to Kafka #1268

Open InigoGastesi opened 4 days ago

InigoGastesi commented 4 days ago

Describe the bug
I am using Zilla in a Kubernetes environment, installed via Helm. My goal is to use Zilla to bridge MQTT messages into Kafka. I have Kafka running in the same Kubernetes cluster, but when I send an MQTT message to Zilla, the process gets stuck in the publish.multiple call from the Paho library.

To Reproduce
My Kafka deployment configuration:

---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-service
  namespace: zilla
spec:
  selector:
    app: kafka-broker
  ports:
    - name: internal
      port: 9092       # Port for internal communication
      targetPort: 9092
    - name: external
      port: 29092      # Port for clients inside the cluster
      targetPort: 29092
    - name: controller
      port: 9093       # Port for the controller
      targetPort: 9093
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: zilla
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - name: kafka-broker
          image: bitnami/kafka:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092  # Port for internal communication
            - containerPort: 29092 # Port for external connections (inside the cluster)
            - containerPort: 9093  # Port for controllers
          env:
            # Allow insecure (plaintext) connections
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"

            # Broker ID configuration
            - name: KAFKA_CFG_BROKER_ID
              value: "1"

            # Node ID and controller quorum
            - name: KAFKA_CFG_NODE_ID
              value: "1"

            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: "1@kafka-broker:9093"

            # Security protocol for the listeners
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:PLAINTEXT"

            # Define the listeners for the controller
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"

            # Log directory
            - name: KAFKA_CFG_LOG_DIRS
              value: "/tmp/logs"

            # Process roles
            - name: KAFKA_CFG_PROCESS_ROLES
              value: "broker,controller"

            # Internal and external listener configuration
            - name: KAFKA_CFG_LISTENERS
              value: "INTERNAL://:9092,CLIENT://:29092,CONTROLLER://:9093"

            # Listener for inter-broker communication
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"

            # Advertised listeners (used by clients and other brokers)
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: "INTERNAL://kafka-broker:9092,CLIENT://kafka-service:29092"

            # Allow automatic topic creation
            - name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
              value: "true"

            # Socket receive buffer size
            - name: KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES
              value: "204800"

            # Offset reset behavior
            - name: KAFKA_CFG_AUTO_OFFSET_RESET
              value: "earliest"

            # Maximum request size
            - name: KAFKA_CFG_MAX_REQUEST_SIZE
              value: "50000000"

          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "2"
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui-service
  namespace: zilla
spec:
  type: ClusterIP
  ports:
    - name: http
      port: 8080
      targetPort: 8080
  selector:
    app: kafka-ui
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
  namespace: zilla
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      containers:
        - name: kafka-ui
          image: provectuslabs/kafka-ui
          ports:
            - containerPort: 8080
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: local
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: kafka-service:29092
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1"

My MQTT producer (publishing to Zilla):

import os
import uuid

from paho.mqtt import publish
from paho.mqtt.enums import MQTTProtocolVersion

MQTT_BROKER = os.getenv('MQTT_BROKER', 'my-zilla-app.zilla.svc.cluster.local')
MQTT_PORT = int(os.getenv('MQTT_PORT', 7183))
MQTT_TOPICS = os.getenv('MQTT_TOPICS', 'unit_0/gps_data,unit_0/metocean_data').split(',')
MQTT_CLIENT_ID = os.getenv('MQTT_CLIENT_ID', f"client_{uuid.uuid4()}")

messages = [{'topic': 'unit_0/gps_data', 'payload': '{"topic": "unit_0/gps_data", "message": {"com_pos": [19.49675750732422, -3.3331511076539755e-05, -2.3591041564941406], "com_angles": [0.0006370695191435516, 0.7413418292999268, -0.002245832933112979]}, "timestamp": 1727170662.1679373}', 'qos': 1}, {'topic': 'unit_0/metocean_data', 'payload': '{"topic": "unit_0/metocean_data", "message": {"wind_speed": 18.47236053244112, "wind_direction": 237.0019980112083, "wave_height": 0.9289906966638978}, "timestamp": 1727170662.1679373}', 'qos': 1}]

# Publish both messages over a single MQTT v5 connection; this call blocks until all are sent
publish.multiple(messages, hostname=MQTT_BROKER, port=MQTT_PORT, protocol=MQTTProtocolVersion.MQTTv5, keepalive=5)
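
For debugging, a persistent Paho client with a bounded wait_for_publish can turn the hang into a visible timeout. A minimal sketch, assuming paho-mqtt 2.x and reusing MQTT_BROKER, MQTT_PORT, MQTT_CLIENT_ID, and messages from above (the 10-second timeout is an arbitrary choice):

import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion, MQTTProtocolVersion

client = mqtt.Client(CallbackAPIVersion.VERSION2,
                     client_id=MQTT_CLIENT_ID,
                     protocol=MQTTProtocolVersion.MQTTv5)
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=5)
client.loop_start()  # run the network loop in a background thread

for msg in messages:
    info = client.publish(msg['topic'], msg['payload'], qos=msg['qos'])
    info.wait_for_publish(timeout=10)  # bounded wait instead of blocking forever
    if not info.is_published():
        print(f"PUBACK not received for {msg['topic']} within 10s")

client.loop_stop()
client.disconnect()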

Expected behavior
Either an error is returned when sending messages, or the messages are sent successfully; the process should not get stuck.

Zilla Environment:
A Kubernetes pod, installed via Helm:

helm install my-zilla oci://ghcr.io/aklivity/charts/zilla -n zilla --values kubernetes/helm/values-zilla.yaml --set-file zilla\.yaml=zilla/zilla.yaml
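
To confirm the release came up and to tail the events written by the stdout exporter configured below, something like the following should work (the app.kubernetes.io/instance label is an assumption based on common Helm chart conventions):

kubectl get pods -n zilla
kubectl logs -n zilla -l app.kubernetes.io/instance=my-zilla --follow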

The zilla.yaml config file:

---
name: zilla-http-mqtt-kafka-proxy
bindings:
  # Proxy service entrypoint - HTTP and MQTT
  north_tcp_mqtt_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port:
        - 7183
    routes:
      - when:
          - port: 7183
        exit: north_mqtt_server

  north_tcp_http_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port:
        - 7114
    routes:
      - when:
          - port: 7114
        exit: north_http_server

  # HTTP server to handle HTTP connections
  north_http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :scheme: http
              :authority: 192.168.18.107:7114
        exit: north_http_kafka_mapping

  # HTTP to Kafka proxy mapping
  north_http_kafka_mapping:
    type: http-kafka
    kind: proxy
    routes:
      - when:
          - path: /events
        exit: north_kafka_cache_client
        with:
          capability: produce
          topic: events

  # MQTT server to handle MQTT connections
  north_mqtt_server:
    type: mqtt
    kind: server
    exit: north_mqtt_kafka_mapping

  # MQTT messages to Kafka topics
  north_mqtt_kafka_mapping:
    type: mqtt-kafka
    kind: proxy
    options:
      topics:
        sessions: mqtt-sessions
        messages: mqtt-messages
        retained: mqtt-retained
      clients:
        - client_+
    routes:
      - when:
          - publish:
              - topic: unit_#/gps_data
        with:
          messages: gps_data
        exit: north_kafka_cache_client

      - when:
          - publish:
              - topic: unit_#/metocean_data
        with:
          messages: metocean_data
        exit: north_kafka_cache_client

  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - mqtt-messages
        - mqtt-retained
        - mqtt-devices
    exit: south_kafka_client

  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - kafka-service.zilla.svc.cluster.local:29092
    exit: south_tcp_client
  south_tcp_client:
    type: tcp
    kind: client
telemetry:
  exporters:
    stdout_logs_exporter:
      type: stdout

Client Environment:
The client environment is a pod with Python inside, nothing special.

vordimous commented 3 days ago

@InigoGastesi Thank you for trying out Zilla! There are a few things that could be causing the issue you are seeing.

  1. You aren't using the kafka-service address that you set up. Try changing your bootstrap server to kafka-service:29092 in your zilla.yaml file:

    # Connect to Kafka
    south_kafka_client:
      type: kafka
      kind: client
      options:
        servers:
          - kafka-service:29092
      exit: south_tcp_client
  2. Double-check that you have created the Kafka topics that you have defined in your config (see the topic-creation sketch after this list):

    mqtt-sessions
    mqtt-retained
    mqtt-messages
    gps_data
    metocean_data

    and added your new topics to the list of bootstrapped topics:

    south_kafka_cache_server:
      type: kafka
      kind: cache_server
      options:
        bootstrap:
          - mqtt-messages
          - mqtt-retained
          - gps_data
          - metocean_data
      exit: south_kafka_client
  3. You can leave out the route with the :authority: 192.168.18.107:7114 header since that IP might be different in the pod. Routing by the protocol is usually enough unless you need more specific address-based routing.

    # HTTP server to handle HTTP connections
    north_http_server:
      type: http
      kind: server
      routes:
        - when:
            - headers:
                :scheme: http
          exit: north_http_kafka_mapping
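
For reference on item 2, one way to create those topics from inside the cluster is the kafka-topics.sh CLI bundled in the Bitnami image. A sketch assuming the kafka-broker deployment and kafka-service names from your manifests, and log-compacting the session/retained topics as the Zilla docs recommend:

for t in mqtt-messages gps_data metocean_data; do
  kubectl exec -n zilla deploy/kafka-broker -- kafka-topics.sh \
    --bootstrap-server kafka-service:29092 --create --if-not-exists --topic "$t"
done
# the session and retained topics are expected to be log-compacted
for t in mqtt-sessions mqtt-retained; do
  kubectl exec -n zilla deploy/kafka-broker -- kafka-topics.sh \
    --bootstrap-server kafka-service:29092 --create --if-not-exists --topic "$t" \
    --config cleanup.policy=compact
done
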
InigoGastesi commented 2 days ago

I have managed to solve the problem by changing two things in the Kafka configuration. The main issue was in the listeners configuration, specifically the EXTERNAL listener. After adjusting the following lines, the problem was solved:

- name: KAFKA_CFG_LISTENERS
  value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://0.0.0.0:9094"
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: "INTERNAL://kafka-service:9092,EXTERNAL://kafka-service:9094"

And I changed this configuration in zilla.yaml:

south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - kafka-service.optimar.svc.cluster.local:9094
    exit: south_tcp_client
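
To sanity-check that the advertised listener is reachable from inside the cluster before pointing Zilla at it, the standard kafka-broker-api-versions.sh script (also bundled in the Bitnami image) can be aimed at the same address; the namespace and service names here are assumed from the manifests above:

kubectl exec -n zilla deploy/kafka-broker -- kafka-broker-api-versions.sh \
  --bootstrap-server kafka-service:9094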

Thanks for your answer :)

InigoGastesi commented 2 days ago

I have another question. When Zilla is connected to Kafka and working perfectly, why does Zilla get stuck if I restart the Kafka pod? Also, is there any way to have data persistence in Zilla until the connection to Kafka comes back?

vordimous commented 2 days ago

@InigoGastesi, can you double-check that you are recreating all of the topics correctly after restarting the Kafka pod?

InigoGastesi commented 1 day ago

You were right, I wasn't creating all the topics. I have another question: if the connection to Kafka is lost and an IoT device keeps sending data to the Zilla proxy, will all the data produced in the meantime be sent to Kafka once the connection returns?