confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

consumer.poll() returns None when using consumer.subscribe(), but consumer.assign() works fine. #1677

Closed ThongVM003 closed 6 months ago

ThongVM003 commented 7 months ago

Description

I am using the Docker images of Confluent Kafka and ZooKeeper (v7.4.3). When I produce to and consume from a topic, consumer.poll() keeps returning None after consumer.subscribe(), but when I assign the consumer to a specific partition with consumer.assign(), I receive messages normally.

pranavrth commented 7 months ago

Can you please provide the consumer config and debug logs?
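For reference, client-side debug logs can be turned on through librdkafka's `debug` configuration property. The sketch below is only an illustration: the helper name `with_debug`, the chosen debug contexts, and the placeholder config values are assumptions, not something taken from this issue.

```python
def with_debug(consumer_config, contexts=("cgrp", "topic", "fetch")):
    """Return a copy of the config with librdkafka debug logging enabled.

    `contexts` are librdkafka debug contexts; "cgrp" covers consumer group
    activity, which is the part that differs between subscribe() and assign().
    """
    debug_config = dict(consumer_config)  # leave the caller's dict untouched
    debug_config["debug"] = ",".join(contexts)
    return debug_config


# Placeholder values; substitute the real consumer settings.
base_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
}

debug_config = with_debug(base_config)
print(debug_config["debug"])  # prints "cgrp,topic,fetch"

# With a running broker, pass debug_config to confluent_kafka.Consumer(...)
# in place of the original config; the debug output is written to stderr.
```

The helper copies the dictionary so the original config can still be used for a run without debug output.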

ThongVM003 commented 7 months ago

My consumer config:

consumer_config = {
    "bootstrap.servers": "localhost:9092",  # Replace with your Kafka broker(s)
    "group.id": "my-consumer-group",  # Consumer group ID
    "auto.offset.reset": "latest",  # Start at the end of the topic when the group has no committed offset
}

Docker Compose file:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.3
    container_name: zookeeper
    ports:
      - "2181:2181"
    restart: always
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    image: confluentinc/cp-kafka:7.4.3
    hostname: thong
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000
      KAFKA_LOG_RETENTION_MINUTES: 10
    restart: always

Consumer code:

import base64
from confluent_kafka import Consumer, KafkaError, OFFSET_END, TopicPartition
import cv2
import numpy as np
import json
from rich import print

# Define Kafka consumer configuration
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # Replace with your Kafka broker(s)
    "group.id": "my-consumer-group",  # Consumer group ID
    "auto.offset.reset": "latest",  # Start at the end of the topic when the group has no committed offset
}

# Create a Kafka consumer instance
consumer = Consumer(consumer_config)
# Subscribe to the Kafka topic
topic = "test1"
consumer.subscribe([topic])  # Replace with your topic name

# consumer.poll() returns None if the line below is commented out
# consumer.assign([TopicPartition(topic, 0, OFFSET_END)])

try:
    while True:
        msg = consumer.poll(1.0)  # Poll for new messages with a timeout of 1 second
        if msg is None:
            print("Waiting...", end="\r")
            continue

        if msg.error():
            # Handle Kafka errors
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("Reached end of partition")
            else:
                print("Error: {}".format(msg.error()))
        else:
            print("received message")
except KeyboardInterrupt:
    pass

finally:
    # Close the Kafka consumer gracefully
    print("Closing Kafka consumer...", end="\r")
    # cv2.destroyAllWindows()
    # consumer.close()
    print("Kafka consumer closed.")

The problem is that if I comment out the consumer.assign() line, consumer.poll() keeps returning None.

pranavrth commented 7 months ago

If you want to read from the start of the topic, use "auto.offset.reset": "earliest".

ThongVM003 commented 7 months ago

I understand. However, as you can see in the consumer code, the consumer.assign() line is commented out. When I run the code like that, no messages are received. But if I uncomment it and assign the consumer to partition 0 of topic test1, messages are received normally.

I don't understand why I cannot receive any messages when I use only consumer.subscribe() without assigning a specific partition.
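One way to check whether subscribe() ever leads to a partition assignment is to register an `on_assign` callback, which confluent-kafka-python calls as `on_assign(consumer, partitions)` after a rebalance. This is a minimal diagnostic sketch: the helper `format_assignment` and the stand-in `FakePartition` type are hypothetical and exist only so the callback can be tried without a broker.

```python
from collections import namedtuple


def format_assignment(partitions):
    """Render each assigned partition as 'topic[partition] @ offset N'."""
    return [f"{p.topic}[{p.partition}] @ offset {p.offset}" for p in partitions]


def on_assign(consumer, partitions):
    """Rebalance callback: report which partitions the group handed out."""
    lines = format_assignment(partitions)
    print("Assigned:", ", ".join(lines) if lines else "no partitions")


# Dry run without a broker, using a stand-in for confluent_kafka.TopicPartition.
# The offset value here is an arbitrary placeholder.
FakePartition = namedtuple("FakePartition", ["topic", "partition", "offset"])
on_assign(None, [FakePartition("test1", 0, -1001)])

# With a real consumer (needs a running broker), register the callback as:
#     consumer.subscribe([topic], on_assign=on_assign)
```

If the callback never fires while poll() keeps returning None, that would suggest the consumer has not received an assignment from the group, rather than a problem with the starting offset.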

pranavrth commented 6 months ago

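subscribe() works through the consumer group management protocol: the group coordinator assigns partitions to the consumer, and consumption starts from the group's committed offset, falling back to auto.offset.reset when there is none. assign() bypasses group management and assigns the given partitions directly. When the TopicPartition carries an explicit offset, such as OFFSET_END in your code, consumption starts from that offset and auto.offset.reset is not consulted.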
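The difference in starting position can be pictured with a small model. This is a simplified sketch, not library code: the function `resolve_start` and its parameters are hypothetical, and it only covers the two cases discussed here (an explicit offset passed to assign(), and a group-managed consumer relying on committed offsets and auto.offset.reset).

```python
def resolve_start(explicit_offset=None, committed_offset=None,
                  auto_offset_reset="latest"):
    """Model where consumption starts for one partition.

    explicit_offset   -- offset given in TopicPartition for assign(), if any
    committed_offset  -- offset previously committed by the consumer group
    auto_offset_reset -- fallback policy, "earliest" or "latest"
    """
    # An explicit offset (for example OFFSET_END) wins outright.
    if explicit_offset is not None:
        return explicit_offset
    # A group-managed consumer resumes from the group's committed offset.
    if committed_offset is not None:
        return committed_offset
    # Only without a committed offset does the reset policy apply.
    if auto_offset_reset == "earliest":
        return "beginning"
    if auto_offset_reset == "latest":
        return "end"
    raise ValueError(f"unsupported policy in this sketch: {auto_offset_reset}")


# assign() with an explicit end offset ignores the reset policy.
print(resolve_start(explicit_offset="end", auto_offset_reset="earliest"))
# subscribe() in a new group with "latest" starts at the end of the log.
print(resolve_start(auto_offset_reset="latest"))
# subscribe() in a group that already committed offset 42 resumes there.
print(resolve_start(committed_offset=42, auto_offset_reset="earliest"))
```

In this model, a consumer that starts at the end of the log only sees messages produced after it has its partitions, which is worth keeping in mind when producing test messages before the consumer is ready.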