grepplabs / kafka-proxy

Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes.
Apache License 2.0
501 stars 87 forks source link

Consuming message in go works, but failed in python #91

Closed ichxxx closed 3 years ago

ichxxx commented 3 years ago

Hi, I am trying to use kafka-proxy, but I got a problem: I can not consume message using python (both kafka-python and confluent-kafka-python are tried), and I didn't get any error message in kafka-proxy.

kafka-proxy

kafka-proxy server --bootstrap-server-mapping "10.101.104.8:9092,0.0.0.0:29092" --debug-enable --log-level debug

python code (python3.7)

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'auto.offset.reset': 'earliest',
    'group.id': "test",
    "debug": "protocol",
})
c.subscribe(['test'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

kafka-proxy log

time="2021-10-13T16:24:02+08:00" level=info msg="Starting kafka-proxy version unknown"
time="2021-10-13T16:24:02+08:00" level=info msg="Bootstrap server 10.101.104.8:9092 advertised as 0.0.0.0:29092"
time="2021-10-13T16:24:02+08:00" level=info msg="Listening on 0.0.0.0:29092 ([::]:29092) for remote 10.101.104.8:9092"
time="2021-10-13T16:24:02+08:00" level=info msg="Ready for new connections"
time="2021-10-13T16:24:36+08:00" level=info msg="New connection for 10.101.104.8:9092"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka request key 18, version 3, length 62"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka response key 18, version 3, length 10"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka request key 18, version 0, length 17"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka response key 18, version 0, length 268"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka request key 3, version 4, length 22"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka request key 10, version 1, length 26"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka response key 3, version 4, length 68"
time="2021-10-13T16:24:36+08:00" level=debug msg="Address mappings broker=10.101.104.8:9092, listener=0.0.0.0:29092, advertised=0.0.0.0:29092"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka response key 10, version 1, length 34"
time="2021-10-13T16:24:36+08:00" level=debug msg="Address mappings broker=10.101.104.8:9092, listener=0.0.0.0:29092, advertised=0.0.0.0:29092"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka request key 10, version 1, length 26"
time="2021-10-13T16:24:36+08:00" level=debug msg="Kafka response key 10, version 1, length 34"
time="2021-10-13T16:24:36+08:00" level=debug msg="Address mappings broker=10.101.104.8:9092, listener=0.0.0.0:29092, advertised=0.0.0.0:29092"
time="2021-10-13T16:24:38+08:00" level=debug msg="Kafka request key 10, version 1, length 26"
time="2021-10-13T16:24:38+08:00" level=debug msg="Kafka response key 10, version 1, length 34"
time="2021-10-13T16:24:38+08:00" level=debug msg="Address mappings broker=10.101.104.8:9092, listener=0.0.0.0:29092, advertised=0.0.0.0:29092"
time="2021-10-13T16:24:39+08:00" level=debug msg="Kafka request key 10, version 1, length 26"
time="2021-10-13T16:24:39+08:00" level=debug msg="Kafka response key 10, version 1, length 34"
time="2021-10-13T16:24:39+08:00" level=debug msg="Address mappings broker=10.101.104.8:9092, listener=0.0.0.0:29092, advertised=0.0.0.0:29092"
time="2021-10-13T16:24:40+08:00" level=debug msg="Kafka request key 10, version 1, length 26"
time="2021-10-13T16:24:40+08:00" level=debug msg="Kafka response key 10, version 1, length 34"
time="2021-10-13T16:24:40+08:00" level=debug msg="Address mappings broker=10.101.104.8:9092, listener=0.0.0.0:29092, advertised=0.0.0.0:29092"
...

debug info in confluent-kafka-python (using kafka-proxy):

[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Connected (#1)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent ApiVersionRequest (v3, 66 bytes @ 0, CorrId 1)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received ApiVersionResponse (v3, 6 bytes, CorrId 1, rtt 1.81ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Protocol parse failure for ApiVersion v3(flex) at 3/6 (rd_kafka_handle_ApiVersion:2126) (incorrect broker.version.fallback?)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: ApiArrayCnt -1 out of range
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent ApiVersionRequest (v0, 21 bytes @ 0, CorrId 2)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received ApiVersionResponse (v0, 264 bytes, CorrId 2, rtt 6.50ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 3)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 4)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received MetadataResponse (v4, 59 bytes, CorrId 3, rtt 1.60ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 4, rtt 2.33ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 5)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 5, rtt 4.16ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 6)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 6, rtt 1.73ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 7)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 7, rtt 1.92ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 8)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 8, rtt 7.36ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 9)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 9, rtt 1.82ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 10)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 10, rtt 7.28ms)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 11)
[thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Received FindCoordinatorResponse (v1, 25 bytes, CorrId 11, rtt 1.84ms)
.....

Keep send/receive FindCoordinatorRequest/Response repeatedly.

debug info in confluent-kafka-python (not using kafka-proxy):

[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Connected (#1)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Sent ApiVersionRequest (v3, 66 bytes @ 0, CorrId 1)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Received ApiVersionResponse (v3, 6 bytes, CorrId 1, rtt 2.23ms)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Protocol parse failure for ApiVersion v3(flex) at 3/6 (rd_kafka_handle_ApiVersion:2126) (incorrect broker.version.fallback?)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: ApiArrayCnt -1 out of range
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Sent ApiVersionRequest (v0, 21 bytes @ 0, CorrId 2)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Received ApiVersionResponse (v0, 264 bytes, CorrId 2, rtt 1.15ms)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 3)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 4)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/bootstrap: Received MetadataResponse (v4, 64 bytes, CorrId 3, rtt 1.05ms)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/0: Received FindCoordinatorResponse (v1, 30 bytes, CorrId 4, rtt 1.27ms)
[thrd:10.101.104.8:9092/bootstrap]: 10.101.104.8:9092/0: Sent FindCoordinatorRequest (v1, 30 bytes @ 0, CorrId 5)
[thrd:GroupCoordinator]: GroupCoordinator/0: Connected (#1)

It can successful connected. And I found when using kafka-proxy, the FindCoordinatorResponse is 5 bytes short.

Thanks fo any help!

everesio commented 3 years ago

Could you provide configuration and versions of the broker, proxy and python client lib ? I've just tried a python consumer using confluent-kafka 1.7.0 connecting to the brokers 2.8.0 via kafa-proxy using PLAIN transport and had no problems

ichxxx commented 3 years ago

I start kafka by docker, the version is 1.1.1 and my confluent-kafka version is 1.7.0 :

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: bitnami/kafka:1.1.1
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.101.104.8:9092
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_CREATE_TOPICS="test:1:1"
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
ichxxx commented 3 years ago

I mapping ip 0.0.0.0 to a domain kafka.test.com and change the configuration of kafka-proxy --bootstrap-server-mapping "10.101.104.8:9092,kafka.test.com:29092", and it works fine now.