grepplabs / kafka-proxy

Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes.
Apache License 2.0

Infinite retries : NotLeaderOrFollowerException #106

Open ThomasDangleterre opened 2 years ago

ThomasDangleterre commented 2 years ago

Hello,

I have an issue while using kafka-proxy:

[Producer clientId=geco-1] Received invalid metadata error in produce request on partition private_dkt_out_listener_kafka_geco_v1-2 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now

[Producer clientId=geco-1] Got error produce response with correlation id 862812 on topic-partition private_dkt_out_listener_kafka_geco_v1-1, retrying (2147483398 attempts left). Error: NOT_LEADER_OR_FOLLOWER

This error repeats indefinitely and no data is pushed to the topic.

Here is my configuration:

containers:
      - name: vcstream-uat-proxy
        image: <anonymized>/kafka-proxy:v0.3.1
        command:
        - '/bin/sh'
        - '-c'
        - |
          apk update && apk add bind-tools

          AIVEN_BROKER_IPS=$(dig +short  <anonymized> )
          AIVEN_BROKER_IP_1=$(echo "$AIVEN_BROKER_IPS" | sed -n '1p')
          AIVEN_BROKER_IP_2=$(echo "$AIVEN_BROKER_IPS" | sed -n '2p')
          AIVEN_BROKER_IP_3=$(echo "$AIVEN_BROKER_IPS" | sed -n '3p')

          SERVICE_IP=$(dig +short <anonymized> | sed -n '1p')

          EXTERNAL_IP=10.48.28.151

          echo " Ip Broker Aiven n° 1 : $AIVEN_BROKER_IP_1"
          echo " Ip Broker Aiven n° 2 : $AIVEN_BROKER_IP_2"
          echo " Ip Broker Aiven n° 3 : $AIVEN_BROKER_IP_3"

          echo " cluster Ip Service ingress-vcstream-uat-proxy : $SERVICE_IP"
          echo " External IP is $EXTERNAL_IP"

          tty=$(readlink /proc/$$/fd/2)

          echo "Launching kafka proxy ..."

          echo $ca_cert | base64 -d  > ca_cert.pem && \
          echo $access_cert | base64 -d  > access.cert && \
          echo $access_key | base64 -d  > access.key && \
          /opt/kafka-proxy/bin/kafka-proxy server \
          --log-format=json \
          --bootstrap-server-mapping="$AIVEN_BROKER_IP_1":12658,0.0.0.0:12658,"$EXTERNAL_IP":12658 \
          --bootstrap-server-mapping="$AIVEN_BROKER_IP_2":12658,0.0.0.0:12659,"$EXTERNAL_IP":12659 \
          --bootstrap-server-mapping="$AIVEN_BROKER_IP_3":12658,0.0.0.0:12660,"$EXTERNAL_IP":12660 \
          --dynamic-listeners-disable \
          --tls-enable \
          --tls-ca-chain-cert-file ca_cert.pem \
          --tls-client-cert-file access.cert \
          --tls-client-key-file access.key \
          --tls-client-key-password $keystore_password \
          --tls-insecure-skip-verify \
          --proxy-listener-tls-enable \
          --proxy-listener-key-file access.key \
          --proxy-listener-cert-file access.cert \
          --proxy-listener-key-password $keystore_password \
          --proxy-listener-ca-chain-cert-file ca_cert.pem \
          --tls-same-client-cert-enable  | tee $tty | grep -q 'i/o timeout' | exit 1

We dynamically fetch the IPs of our brokers and make the container fail when 'i/o timeout' appears in the logs, in order to improve resilience.
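
A minimal sketch of that log-watch pattern in isolation (a hypothetical restatement, not the exact command above: the kafka-proxy flags are elided as <flags...> and "$tty" is the terminal device resolved earlier in the script):

# Mirror the proxy's log output to the container terminal, block until an
# 'i/o timeout' line appears (or the proxy exits), then return non-zero so
# Kubernetes restarts the container.
/opt/kafka-proxy/bin/kafka-proxy server <flags...> \
  | tee "$tty" \
  | grep -q 'i/o timeout'
exit 1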

Does anyone know why our producer can't produce data to the topic?

ljakob commented 2 years ago

Hi, we've just run into the same problem... we are still analyzing whether it's proxy-related.

Did you resolve the issue? Do you have any hint for us?

Thanks, Leif

ThomasDangleterre commented 2 years ago

Hello,

We observed that after a restart the issue was gone. As the /health endpoint doesn't fit our need to detect that Kafka message production is broken, we built a custom image with kafkacat and a script that produces messages through the proxy (on a dedicated topic, always on partition 0) and consumes them back.

The script shown below is used in the livenessProbe of the kafka-proxy deployment, so in case of error it triggers a restart.

#!/bin/sh

#automatically exit on error
set -e

timestamp=$(date '+%s')

echo "Sending message to $HOST:$PORT $LIVENESS_TOPIC"

# produce message in topic
echo "$HOSTNAME $timestamp" | kafkacat -P -b "$EXTERNAL_IP":"$PORT"  -t "$LIVENESS_TOPIC" -p 0 \
    -X security.protocol=SSL \
    -X ssl.key.location=service.key \
    -X ssl.certificate.location=service.cert \
    -X ssl.ca.location=ca.pem

while true; do
  #  Consume last message of the topic and exit
  payload=$( kafkacat -C -b "$HOST":"$PORT"  -t "$LIVENESS_TOPIC" -o  -1 -e \
    -X security.protocol=SSL \
    -X ssl.key.location=service.key \
    -X ssl.certificate.location=service.cert \
    -X ssl.ca.location=ca.pem \
  )

  if [ "$payload" = "$HOSTNAME $timestamp" ]; then
    break;
  fi;

done;

exit 0
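
For reference, a minimal sketch of how such a check might be wired into the kafka-proxy container spec as a livenessProbe (the script path, timings and thresholds below are hypothetical; HOST, PORT, EXTERNAL_IP and LIVENESS_TOPIC are assumed to be provided via the container's env):

        livenessProbe:
          exec:
            command:
            - /bin/sh
            - /opt/liveness/produce-consume-check.sh   # hypothetical path to the script above
          initialDelaySeconds: 30
          periodSeconds: 60
          timeoutSeconds: 30
          failureThreshold: 3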

We get some transient errors triggering restarts but overall the connection is stable now.

% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating
%3|1661178021.406|FAIL|rdkafka#producer-1| [thrd:ssl://10.48.28.151:12660/32]: ssl://10.48.28.151:12658/32: No further error information available (after 0ms in state SSL_HANDSHAKE)