aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] Understanding why code using AIOKafkaConsumer is hanging and not finishing execution. #888

Closed nikolamilojica closed 1 year ago

nikolamilojica commented 1 year ago

Hi there!

Could you help me understand why the following code is hanging and not finishing execution? I am using this docker-compose configuration to spin up local Kafka:

version: "3.8"
services:
  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      - kafka:/var/lib/kafka/data
    depends_on:
      - zookeeper
  pg:
    container_name: pg
    image: postgres:latest
    restart: always
    environment:
      - POSTGRES_USER=illuminate
      - POSTGRES_PASSWORD=$ILLUMINATE_MAIN_DB_PASSWORD
      - POSTGRES_DB=tutorial
    ports:
      - "5432:5432"
    volumes:
      - postgres:/var/lib/postgresql/data
  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    restart: always
    environment:
      - PGADMIN_DEFAULT_EMAIL=root@example.com
      - PGADMIN_DEFAULT_PASSWORD=$ILLUMINATE_PGADMIN_PASSWORD
    ports:
      - "8080:80"
  splash:
    container_name: splash
    image: scrapinghub/splash
    restart: always
    ports:
      - "8050:8050"
  zookeeper:
    container_name: zookeeper
    image: zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
    volumes:
      - zookeeper:/data
volumes:
  kafka:
    driver: local
  postgres:
    driver: local
  zookeeper:
    driver: local

Once Kafka is up, I create a topic and fill it with a few short messages using the following Docker commands:

docker exec kafka kafka-topics --bootstrap-server localhost:9092 --create --topic illuminate
docker exec -it kafka kafka-console-producer --bootstrap-server localhost:9092 --topic illuminate

After I fill the illuminate topic with three short messages (first, second, and third), the following code hangs. It is essentially the most basic use of the AIOKafkaConsumer class:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'illuminate',
        bootstrap_servers='localhost:9092',
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        group_id=None)
    # Get cluster layout; with group_id=None no consumer group is joined
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Stop the consumer; nothing is committed because auto commit is disabled
        await consumer.stop()

asyncio.run(consume())

Output with KeyboardInterrupt after 3 hours of "hanging":

poetry run python test.py 
consumed:  illuminate 0 0 None b'first' 1683479435151
consumed:  illuminate 0 1 None b'second' 1683479436229
consumed:  illuminate 0 2 None b'third' 1683479437313
^CTraceback (most recent call last):
  File "test.py", line 22, in <module>
    asyncio.run(consume())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

Is my setup flawed? Any help is more than appreciated. Thanks in advance.

ods commented 1 year ago

What behaviour do you expect? async for msg in consumer just waits for new messages to arrive.

nikolamilojica commented 1 year ago

> What behaviour do you expect? async for msg in consumer just waits for new messages to arrive.

Hello, I assumed that once the consumer object had consumed all the messages, the interpreter would move on to the finally block.

vmaurin commented 1 year ago

Usually, Kafka is used in a stream-processing approach, so the iterator you get from async for is infinite: it keeps waiting until the consumer or the event loop is stopped.

If you want to do some kind of batch processing, you can use getmany with a timeout, which will return after X milliseconds if no messages are consumed.
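
A minimal sketch of that batch-style approach, assuming the same topic and broker address as in the question. The helper name drain and its idle_timeout_ms parameter are made up for illustration and are not part of aiokafka; only getmany(timeout_ms=...) is the library call. The loop treats the first empty result as "no more messages", which is a heuristic rather than a guarantee.

```python
import asyncio


async def drain(consumer, idle_timeout_ms=1000):
    """Collect records until one getmany() call comes back empty.

    `drain` and `idle_timeout_ms` are illustrative names, not aiokafka API.
    """
    messages = []
    while True:
        # getmany() waits up to timeout_ms and returns a dict that maps
        # each partition to the list of records fetched for it.
        batches = await consumer.getmany(timeout_ms=idle_timeout_ms)
        fetched = [record for records in batches.values() for record in records]
        if not fetched:
            # Nothing arrived within the timeout: assume the topic is drained.
            break
        messages.extend(fetched)
    return messages


async def main():
    # Imported here so that drain() itself does not depend on aiokafka.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "illuminate",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        group_id=None,
    )
    await consumer.start()
    try:
        for msg in await drain(consumer):
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        await consumer.stop()


# Needs aiokafka installed and a reachable broker:
# asyncio.run(main())
```

Because the stop condition is purely time-based, a slow first fetch could end the loop early; a larger idle_timeout_ms trades a longer wait at the end for less risk of that.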

nikolamilojica commented 1 year ago

> Usually, Kafka is used in a stream-processing approach, so the iterator you get from async for is infinite: it keeps waiting until the consumer or the event loop is stopped.
>
> If you want to do some kind of batch processing, you can use getmany with a timeout, which will return after X milliseconds if no messages are consumed.

Thank you for this clarification. I do not want to listen to the topic forever; I just want to go through the messages. Once I test this, I will close the question.
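
For going through only the messages that already exist, one possible alternative to a pure timeout is to snapshot the end offsets first and stop once every partition has reached them. The sketch below is an assumption about how that could look, not something confirmed in this thread: read_to_snapshot_end and poll_timeout_ms are hypothetical names, while assignment(), end_offsets(), position() and getmany() are the consumer methods it relies on. It expects a consumer that has already been started.

```python
import asyncio


async def read_to_snapshot_end(consumer, poll_timeout_ms=1000):
    """Return the records that existed when this call started.

    `read_to_snapshot_end` and `poll_timeout_ms` are illustrative names.
    """
    partitions = consumer.assignment()
    # For each partition, the end offset is the offset the next
    # produced message would receive.
    end_offsets = await consumer.end_offsets(list(partitions))

    # Keep only partitions whose position is still behind the snapshot.
    pending = set()
    for tp in partitions:
        if await consumer.position(tp) < end_offsets[tp]:
            pending.add(tp)

    collected = []
    while pending:
        batches = await consumer.getmany(*pending, timeout_ms=poll_timeout_ms)
        for tp, records in batches.items():
            for record in records:
                if record.offset < end_offsets[tp]:
                    collected.append(record)
                if record.offset + 1 >= end_offsets[tp]:
                    # Reached (or passed) the snapshot for this partition.
                    pending.discard(tp)
    return collected
```

In the consumer from the question, this would be awaited between consumer.start() and consumer.stop(). One caveat of the sketch: if the record just before the snapshot offset is missing (for example on a compacted topic), the loop keeps polling until a newer record arrives.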