confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Consumer can't fetch data because assign doesn't work #338

Closed: joukosusi closed this issue 6 years ago

joukosusi commented 6 years ago

Description

The consumer cannot fetch data from Kafka. The assign function seems to receive a blank (empty) partition list and does nothing. I don't know how to check where the problem is.

Logs attached: kafka.log, server.log


edenhill commented 6 years ago

Can you share your code (relevant parts)? How many consumers are there in the group and how many partitions does the topic have?

joukosusi commented 6 years ago

OK, the topic has only one partition and only one consumer is running:

```python
import json
import logging

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    'group.id': 'DATA_CENTER_COLLECTION_SYSTEM',
    'bootstrap.servers': '192.168.2.6:8092,192.168.2.6:8093,192.168.2.6:8094',
    'security.protocol': 'ssl',
    'ssl.key.location': './kafka_test_cert/client.key.unsecure',
    'ssl.ca.location': './kafka_test_cert/ca-cert',
    'ssl.certificate.location': './kafka_test_cert/ca-cert/client.crt',
    'ssl.key.password': '123456',
    'api.version.request': True,
    'broker.version.fallback': '0.10.2.1',
    'log.connection.close': False,
    'log_level': 6,
    'message.max.bytes': 314572800,
    'statistics.interval.ms': 0,
    'session.timeout.ms': 30000,
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 1000,
    'fetch.min.bytes': 1024 * 1024,
    'fetch.wait.max.ms': 1000,
    'fetch.message.max.bytes': 1048576,
    'default.topic.config': {
        'auto.offset.reset': 'smallest',
    },
})

consumer.subscribe(['c71f97ea0435ad4d6a0b168f0062404827560e5aa00a043bf427ff339fd17de74474e6df26bdba6ea0cc54b4d6228cc174d600e966de35224eba41d10d3f09ab_monitor'])

while True:
    # Wait up to 1 s for a message instead of busy-spinning on a near-zero timeout.
    result = consumer.poll(timeout=1.0)

    # poll() returns None when nothing arrived within the timeout.
    if result is None:
        continue

    if result.error():
        if result.error().code() == KafkaError._PARTITION_EOF:
            # Not a real error: the consumer caught up with the end of the partition.
            logging.warning('%s [%d] reached end at offset %d',
                            result.topic(), result.partition(), result.offset())
        else:
            logging.error('encountered error:\n%s', json.dumps(
                {
                    'name': result.error().name(),
                    'code': result.error().code(),
                    'description': result.error().str()
                }, indent=1))
    else:
        logging.info(result.offset())
```
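To see what the group coordinator actually hands this consumer after each rebalance, `Consumer.subscribe()` accepts an `on_assign` callback that is called with the consumer and the list of assigned partitions. A minimal sketch, assuming only that each partition object exposes `.topic` and `.partition` (as `confluent_kafka.TopicPartition` does); the helper name `log_assignment` is hypothetical:

```python
import logging


def log_assignment(consumer, partitions):
    """on_assign callback: report which partitions this group member received.

    `consumer` is unused here. This sketch only logs; it relies on the client
    applying the assignment itself when the callback does not call
    consumer.assign().
    """
    assigned = ["{} [{}]".format(p.topic, p.partition) for p in partitions]
    if not assigned:
        # Empty assignment: every partition went to some other group member.
        logging.warning("Received an empty partition assignment")
    for name in assigned:
        logging.info("Assigned %s", name)
    # Returned only so the helper can be checked in isolation.
    return assigned
```

Wired in with `consumer.subscribe([topic], on_assign=log_assignment)`, an empty-assignment warning right after a rebalance is a hint that another member of the same `group.id` holds the partition.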

edenhill commented 6 years ago

It looks like there are two consumers in the group. Notice how LeaderId and MemberId differ in this line:

%7|1521689833.375|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId 148, Protocol range, LeaderId rdkafka-1674c56f-1562-4653-8735-61f9778886d0, my MemberId rdkafka-de4fb92c-67bd-4533-9b95-f5cd3350fb7b, 0 members in group: (no error)

(Ignore "0 members"; that is expected on a non-leader member.)
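The LeaderId/MemberId comparison can be done mechanically when scanning debug logs. A small sketch, assuming the JoinGroup line keeps the `LeaderId ..., my MemberId ...,` wording shown above (this is librdkafka debug output, so the format may change between versions); `join_group_role` is a hypothetical helper:

```python
import re

# Captures the two member IDs from a librdkafka JoinGroup debug line.
JOIN_RE = re.compile(r"LeaderId (?P<leader>[^,\s]+), my MemberId (?P<member>[^,\s]+),")


def join_group_role(log_line):
    """Return 'leader', 'non-leader', or None if the line has no JoinGroup IDs."""
    match = JOIN_RE.search(log_line)
    if match is None:
        return None
    if match.group("leader") == match.group("member"):
        return "leader"
    # A different LeaderId means at least one other member is in the group.
    return "non-leader"
```

For the line quoted above this returns `"non-leader"`, which is the signal that a second member exists.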

The following assignment is empty because, with only one partition available to the entire group, that partition was assigned to the other member:

"DATA_CENTER_COLLECTION_SYSTEM": delegating assign of 0 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
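Why one member ends up with nothing can be seen from a simplified model of the `range` protocol named in the JoinGroup line. This sketch assumes the assignor sorts members by member ID and splits a single topic's partitions into contiguous, near-equal blocks (the general shape of Kafka's range assignor); the real implementation handles multiple topics and more detail, and `range_assign` is a hypothetical name:

```python
def range_assign(num_partitions, member_ids):
    """Simplified single-topic range assignment.

    Members are sorted by ID; each gets num_partitions // len(members)
    partitions, and the first num_partitions % len(members) members get
    one extra.
    """
    if not member_ids:
        return {}
    members = sorted(member_ids)
    per_member, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment
```

With one partition and the two member IDs from the log, `rdkafka-1674c56f-...` sorts first and gets partition 0, while `rdkafka-de4fb92c-...` gets an empty list, consistent with the "0 partition(s)" assignment above.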

joukosusi commented 6 years ago

Thanks very much! But there's only one program running. Is there a way to check or clean up all consumer IDs on the client side? I checked the path (/consumers/DATA_CENTER_COLLECTION_SYSTEM/ids) in ZooKeeper and found it was an empty list.

joukosusi commented 6 years ago

I tried the command `./kafka-consumer-groups.sh --group DATA_CENTER_COLLECTION_SYSTEM --describe --bootstrap-server 192.168.2.6:9092,192.168.2.6:9093,192.168.2.6:9094` and got the following result:

```
TOPIC                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                             HOST                           CLIENT-ID
-                              -          -               -               -           rdkafka-de4fb92c-67bd-4533-9b95-f5cd3350fb7b      x.x.x.x/x.x.x.x                 rdkafka
TOPIC_MONITOR_TASK_MONITOR_PIPE 0          42246           43606           1360       rdkafka-1674c56f-1562-4653-8735-61f9778886d0      x.x.x.x/x.x.x.x                 rdkafka
TOPIC_MONITOR_JOB_RESULT_PIPE   0          308668          310179          1511       rdkafka-1674c56f-1562-4653-8735-61f9778886d0      x.x.x.x/x.x.x.x                 rdkafka
c71f97ea0435ad4d6a0b168f0062404827560e5aa00a043bf427ff339fd17de74474e6df26bdba6ea0cc54b4d6228cc174d600e966de35224eba41d10d3f09ab_monitor                     0          157780          160662          2882       rdkafka-1674c56f-1562-4653-8735-61f9778886d0      x.x.x.x/x.x.x.x                 rdkafka
c71f97ea0435ad4d6a0b168f0062404827560e5aa00a043bf427ff339fd17de74474e6df26bdba6ea0cc54b4d6228cc174d600e966de35224eba41d10d3f09ab                             0          5191            5192            1          rdkafka-1674c56f-1562-4653-8735-61f9778886d0      x.x.x.x/x.x.x.x                 rdkafka
c71f97ea0435ad4d6a0b168f0062404827560e5aa00a043bf427ff339fd17de74474e6df26bdba6ea0cc54b4d6228cc174d600e966de35224eba41d10d3f09ab_job_detail                  0          1535987         1583224         47237      rdkafka-1674c56f-1562-4653-8735-61f9778886d0      x.x.x.x/x.x.x.x                 rdkafka
```

There really are two consumers at the same time: when I stop my code, rdkafka-de4fb92c-67bd-4533-9b95-f5cd3350fb7b disappears from the list above, and that is the only consumer I am running.

I have run into this problem several times, mostly after the consumer disconnected from and reconnected to the broker.
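The two-member situation in the `--describe` output above can also be checked from a script. A sketch, assuming the whitespace-separated column order shown in this thread (TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, CLIENT-ID) and that a member without partitions is printed with `-` placeholders; newer Kafka versions print different columns (for example a leading GROUP column), and `partitions_per_consumer` is a hypothetical helper:

```python
def partitions_per_consumer(describe_output):
    """Count assigned partitions per CONSUMER-ID in `--describe` output."""
    counts = {}
    for line in describe_output.splitlines():
        fields = line.split()
        # Skip blank or short lines and the header row.
        if len(fields) < 8 or fields[0] == "TOPIC":
            continue
        consumer_id = fields[5]
        counts.setdefault(consumer_id, 0)
        # A '-' in the PARTITION column marks a member with no assignment.
        if fields[1] != "-":
            counts[consumer_id] += 1
    return counts
```

Run on the output above, this reports 0 partitions for `rdkafka-de4fb92c-...` (the running program) and 5 for `rdkafka-1674c56f-...`.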

edenhill commented 6 years ago

You need to make sure the previous instance of the consumer program has terminated before you start the new one if you want the single partition to be assigned to the new consumer. This is outside the scope of the Python client itself.
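On a single host, one way to enforce "only one instance at a time" is an exclusive lock file that each process takes before creating its `Consumer`. A sketch under stated assumptions: POSIX `fcntl.flock` (not available on Windows), all instances run on the same machine, and the lock path and helper name `acquire_single_instance_lock` are hypothetical. The OS drops the lock when the process exits, but a consumer that dies without closing still stays in the group on the broker until `session.timeout.ms` (30000 ms in the configuration above) expires.

```python
import fcntl
import os


def acquire_single_instance_lock(path):
    """Take an exclusive, non-blocking lock on `path` (POSIX only).

    Returns the open file object, which must stay open for as long as the
    lock should be held. Raises RuntimeError if another holder exists.
    """
    # "a+" creates the file if needed without truncating another holder's PID.
    lock_file = open(path, "a+")
    try:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        lock_file.close()
        raise RuntimeError("another instance already holds " + path)
    # Record our PID for debugging; only the lock holder rewrites the file.
    lock_file.seek(0)
    lock_file.truncate()
    lock_file.write(str(os.getpid()))
    lock_file.flush()
    return lock_file
```

Call it once at startup, keep the returned file object referenced, and call `consumer.close()` on shutdown so the member leaves the group right away instead of waiting for its session to time out.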