confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Specifying a Python list for config `bootstrap.servers` fails silently with no errors or indications #711

Open ceosion opened 4 years ago

ceosion commented 4 years ago

Description

The configuration key bootstrap.servers implies that multiple servers can be specified, but it is not explicitly defined what this key's value should be. It appears that only a string using comma separation is allowed for this config key. If a Python list is given, no explicit error or feedback is given to indicate this is not valid. Instead, calls to consumer.poll() or other broker-bound functions simply block and never succeed.

Ideally, a Python list (e.g. ["broker1", "broker2"]) should be valid; or at the very least, constructing the Consumer should throw a runtime/configuration exception.
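Until the client rejects such values itself, a caller-side check can surface the mistake before any broker call blocks. The following is a minimal sketch, not part of confluent-kafka-python: `check_bootstrap_servers` is a hypothetical helper name, and it assumes the only accepted form is a comma-separated string, as described above.

```python
def check_bootstrap_servers(conf):
    """Raise TypeError early if 'bootstrap.servers' is not a string.

    Hypothetical helper for illustration; it is not provided by the client.
    """
    value = conf.get('bootstrap.servers')
    if value is not None and not isinstance(value, str):
        raise TypeError(
            "'bootstrap.servers' must be a comma-separated string, "
            f"got {type(value).__name__}"
        )
    # Return the config unchanged so the call can be used inline.
    return conf
```

The checked dict can then be passed on as usual, e.g. `Consumer(check_bootstrap_servers(conf))`.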

How to reproduce

Simply specify a Python list as the value for bootstrap.servers in the config given to the Consumer.

conf = {
    'bootstrap.servers': ['kafkabroker1:9092', 'kafkabroker2:9092'],
    ...
}
c = Consumer(conf)
while True:
    c.poll(1) # this will never produce a message due to the Python list specified above as the bootstrap.servers value

edenhill commented 4 years ago

This is a good idea, we should allow lists for config variables and automatically translate to CSV.
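As a stopgap on the caller side, the list-to-CSV translation could look roughly like the sketch below. `lists_to_csv` is a hypothetical helper, not an API of the client, and the group id is a placeholder; it assumes every list- or tuple-valued setting should simply be joined with commas.

```python
def lists_to_csv(conf):
    """Return a copy of conf with list/tuple values joined into CSV strings.

    Hypothetical helper for illustration; other values are left untouched.
    """
    return {
        key: ','.join(str(item) for item in value)
        if isinstance(value, (list, tuple)) else value
        for key, value in conf.items()
    }


conf = lists_to_csv({
    'bootstrap.servers': ['kafkabroker1:9092', 'kafkabroker2:9092'],
    'group.id': 'example-group',  # placeholder group id
})
print(conf['bootstrap.servers'])  # kafkabroker1:9092,kafkabroker2:9092
```

The resulting dict contains only the string form, so it can be handed to `Consumer(conf)` directly.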

kyrsideris commented 4 years ago

I ran into the same bug today. The problem is that the raised exception is misleading. In my case it was:

KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

Test file: ./test_consumer_config.py

#!/usr/bin/env python3

from confluent_kafka import Consumer

def print_list_of_topics(hosts):
    print(f'\nTrying with hosts: {hosts}')
    consumer = Consumer({
        'bootstrap.servers': hosts,
        'group.id': 'kafka-topic-configurator',
        'auto.offset.reset': 'earliest'
    })
    metadata = consumer.list_topics(timeout=11)
    print(f'Topics: {list(metadata.topics.keys())}')

print_list_of_topics('localhost:9092')
print_list_of_topics('localhost:9092,kafka:9092')
print_list_of_topics(['localhost:9092'])

Run:

➜ python test_consumer_config.py

Trying with hosts: localhost:9092
Topics: ['test-stream-1', '__consumer_offsets', 'test-stream-2']

Trying with hosts: localhost:9092,kafka:9092
Topics: ['test-stream-1', '__consumer_offsets', 'test-stream-2']

Trying with hosts: ['localhost:9092']
Traceback (most recent call last):
  File "test_consumer_config.py", line 19, in <module>
    print_list_of_topics(['localhost:9092'])
  File "test_consumer_config.py", line 13, in print_list_of_topics
    metadata = consumer.list_topics(timeout=11)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

DachuanZhao commented 3 years ago

This is a good idea, we should allow lists for config variables and automatically translate to CSV.

Any progress on this?

llirrikk commented 11 months ago

According to the official documentation: https://docs.confluent.io/kafka-clients/python/current/overview.html#ak-consumer multiple servers can be specified in the following format: host1:9092,host2:9092

sergeykad commented 5 months ago

Thanks @llirrikk. I searched here for how to do it, but there is no mention of it.