confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Consumer code is not creating client ID or topic information in Confluent Control Center #688

Closed NiravLangaliya closed 4 years ago

NiravLangaliya commented 5 years ago

Description

I created the topic 'EDW_TEST_DATA' and am producing data to it. In Confluent Control Center I can see that my consumer group was created with the group.id set in the Python code, but the client ID does not appear, and no topic is listed under consumption.


Code:

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import threading, time, pprint
import json

settings = {
    'bootstrap.servers': 'XXXXXX:9092,',
    'group.id': 'edwdevgroupid12',
    'client.id': '200',
    'enable.auto.commit': False,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'schema.registry.url': 'http://XXXXXXX:8081',
}

def run(name):
    c = AvroConsumer(settings)

    c.subscribe(['EDW_TEST_DATA'])

    while True:
        try:
            msg = c.poll(10)

        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break

        if msg is None:
            continue

        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue

        print("%s Data processed %s [%d] at offset %d with key %s:\n" % (name, msg.topic(), msg.partition(), msg.offset(), str(msg.key())))
        print(msg.value()['Name'])
        print(msg.value()['Contact']['Email'])
        print(msg.value()['Contact']['PhoneNumber'])
        print(msg.value()['LocationDetails']['CountryCode'])
        print(msg.value()['LocationDetails']['City'])
        print(msg.value()['LocationDetails']['Country'])
        print(msg.value()['LocationDetails']['PostalCode'])
        print(msg.value()['LocationDetails']['Address'])

    print("Shutting down consumer..")

    c.close()

workers = []
for index in range(4):
    w = threading.Thread(target=run, args=(index,))
    w.start()
    workers.append(w)
    time.sleep(2)

for w in workers:
    w.join()

rnpridgeon commented 4 years ago

You need to configure monitoring interceptors in order for C3 (Confluent Control Center) to know about your clients.

https://docs.confluent.io/current/control-center/installation/clients.html#librdkafka-based-clients
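
For the consumer in this issue, that would probably look something like the sketch below. librdkafka loads plugins through the `plugin.library.paths` property (a semicolon-separated list), and the linked page describes setting it to `monitoring-interceptor`. The helper name `with_monitoring_interceptor` is made up for illustration, and the sketch assumes the `confluent-librdkafka-plugins` package is already installed on the host running the consumer, so treat it as a starting point rather than a verified fix.

```python
# Sketch only. Assumes the confluent-librdkafka-plugins package is installed
# so that librdkafka can find the "monitoring-interceptor" plugin library.

def with_monitoring_interceptor(settings):
    """Return a copy of `settings` that also loads the monitoring interceptor.

    Hypothetical helper, not part of confluent-kafka-python.
    """
    enriched = dict(settings)
    # plugin.library.paths is a librdkafka property: a ';'-separated list
    # of plugin libraries to load when the client is created.
    existing = enriched.get('plugin.library.paths', '')
    plugins = [p for p in existing.split(';') if p]
    if 'monitoring-interceptor' not in plugins:
        plugins.append('monitoring-interceptor')
    enriched['plugin.library.paths'] = ';'.join(plugins)
    return enriched


# Placeholder hosts, as in the original report.
settings = {
    'bootstrap.servers': 'XXXXXX:9092',
    'group.id': 'edwdevgroupid12',
    'client.id': '200',
    'schema.registry.url': 'http://XXXXXXX:8081',
}

consumer_settings = with_monitoring_interceptor(settings)

# The enriched dict is then passed to the consumer as before:
#   from confluent_kafka.avro import AvroConsumer
#   c = AvroConsumer(consumer_settings)
```

The helper copies the dict rather than mutating it, so the same base settings can still be reused for clients that should not report to Control Center.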