dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Trying to use a Bitnami Docker image with kafka-python: the messages do not reach the Consumer... #2275

Open Felix-neko opened 2 years ago

Felix-neko commented 2 years ago

Hi folks! And thank you for your great work.

I'm trying to set up a developer playground to start using kafka-python. To do this, I've tried to use Bitnami's Kafka Docker repo (modified for external connections according to their manuals).

I've modified docker-compose.yml for external connections, started it, and am now trying to create a producer and a consumer in Python.

# start_consumer.py
from kafka import KafkaConsumer

if __name__ == "__main__":
    consumer = KafkaConsumer('sample', bootstrap_servers=['localhost:9093'])
    for message in consumer:
        print(message)

# start_producer.py
from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    producer.send('sample', b'Hello, World!')
    producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

The topic is created on the Kafka server, but my consumer receives no messages and prints nothing. How can I fix this?

Here's my sandbox to reproduce this strange behaviour: https://github.com/Felix-neko/kafka_sandbox. Maybe my Kafka setup is wrong?

Beaglefoot commented 2 years ago

Hey @Felix-neko, have you found a way to overcome this?

Beaglefoot commented 2 years ago

OK, I think I've figured out how this works.

You have 3 options here:

  1. Produce only after the consumer has started.
  2. Set group_id and explicitly reset the group's offsets with the following command (works with Kafka 3.0.0; not sure about other versions):
    kafka-consumer-groups.sh \
      --bootstrap-server localhost:9092 \
      --group MY_GROUP \
      --topic MY_TOPIC \
      --reset-offsets \
      --to-earliest \
      --execute
  3. Set group_id and use auto_offset_reset='earliest'
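A minimal sketch of option 3 with kafka-python, assuming the broker from the question is reachable at `localhost:9093` and the topic is `sample`. The helper name `read_all` and the group name `sandbox-group` are hypothetical. `consumer_timeout_ms` makes the loop end after a quiet period instead of blocking forever, and the consumer class can be passed in so the logic can be exercised without a running broker.

```python
from typing import Callable, Optional


def read_all(
    topic: str,
    bootstrap_servers: list,
    group_id: str,
    consumer_factory: Optional[Callable] = None,
    timeout_ms: int = 5000,
) -> list:
    """Return (key, value) pairs for every record this group has not consumed yet."""
    if consumer_factory is None:
        # Deferred import so a stand-in consumer class can be used without kafka-python.
        from kafka import KafkaConsumer
        consumer_factory = KafkaConsumer

    consumer = consumer_factory(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,               # offsets are tracked per consumer group
        auto_offset_reset="earliest",    # no committed offset yet: start at the oldest record
        consumer_timeout_ms=timeout_ms,  # stop iterating after this long without new records
    )
    try:
        return [(record.key, record.value) for record in consumer]
    finally:
        # With the default enable_auto_commit=True, close() also commits consumed offsets.
        consumer.close()
```

Called as `read_all("sample", ["localhost:9093"], "sandbox-group")`, a first run should return both records even if they were produced before the consumer started. A rerun with the same `group_id` should return only records produced since then, because the group's committed offsets now take precedence over `auto_offset_reset`.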

Although this makes sense in retrospect, I really wish it were covered in the documentation.
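The three options are easier to see with a simplified model of how a consumer picks the offset it starts reading a partition from. This is an illustrative sketch, not kafka-python's code, and `starting_offset` is a hypothetical helper. The assumed rule: a committed offset for the group wins if it is still inside the log; otherwise `auto_offset_reset` decides, and kafka-python defaults it to `'latest'`. With `group_id=None`, kafka-python does not commit offsets, so every run falls through to the reset policy.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Return the offset a consumer would start fetching from (simplified model).

    committed -- offset committed by the consumer group, or None if there is none
    log_start -- offset of the oldest record still stored in the partition
    log_end   -- offset the next produced record will receive
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed      # resume where the group left off
    if auto_offset_reset == "earliest":
        return log_start      # replay everything still stored
    if auto_offset_reset == "latest":
        return log_end        # only records produced from now on
    raise ValueError(f"no valid offset and no reset policy: {auto_offset_reset!r}")


# Two records (offsets 0 and 1) were produced before the consumer started.
print(starting_offset(None, 0, 2))              # 2: default 'latest', old records are skipped
print(starting_offset(None, 0, 2, "earliest"))  # 0: option 3
print(starting_offset(0, 0, 2))                 # 0: option 2, group offset reset to earliest
```

Option 1 fits the same model: if the consumer is already running when the records arrive, `log_end` was still 0 at startup, so the default `'latest'` policy does not skip them.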

Felix-neko commented 2 years ago

3. Set group_id and use auto_offset_reset='earliest'

Well, I did it this way:

from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    future = producer.send('sample', b'Hello, World!')
    result = future.get(timeout=60)
    print("Sent 1")

    future = producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
    result = future.get(timeout=60)
    print("Sent 2")
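Because `future.get()` blocks until the broker acknowledges the record, its return value (a `RecordMetadata`) can also be printed to confirm where each message landed. A minimal sketch, assuming the same broker and topic; `send_and_confirm` is a hypothetical helper, and the producer is passed in so a stand-in can replace a real `KafkaProducer`.

```python
from typing import Optional, Tuple


def send_and_confirm(
    producer,
    topic: str,
    value: bytes,
    key: Optional[bytes] = None,
    timeout: float = 60,
) -> Tuple[int, int]:
    """Send one record, wait for the broker's acknowledgement, return (partition, offset)."""
    future = producer.send(topic, key=key, value=value)
    # get() raises a KafkaError if the send failed or did not finish within `timeout` seconds.
    metadata = future.get(timeout=timeout)
    return metadata.partition, metadata.offset
```

With `producer = KafkaProducer(bootstrap_servers=['localhost:9093'])`, calling `send_and_confirm(producer, 'sample', b'Hello, World!')` returns the partition and offset the record was written to. Seeing those offsets grow while the consumer prints nothing points to the consumer's starting offset rather than to the producer.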