dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0
5.62k stars, 1.41k forks

different consumers in one group sometimes receive duplicated message #1589

Closed gzliudan closed 5 years ago

gzliudan commented 6 years ago

One topic has 5 consumers with the same group_id and different client_id values, but sometimes 2 consumers receive the same message, which is not expected. How can I resolve this problem? kafka-python is 1.4.3, Kafka is 1.0.0. My code is below; the client_id is not written out here:

    import time
    from datetime import datetime

    from kafka import KafkaConsumer


    def handle_kafka_message(msg):
        # process msg
        # save msg to database
        pass


    def handle_kafka_messages():
        kafka_consumer = None
        # Retry once a minute until the consumer can be created.
        while kafka_consumer is None:
            try:
                kafka_consumer = KafkaConsumer(
                    "topic1", client_id="client-***", group_id="group1",
                    bootstrap_servers="kafka1a:9092,kafka1b:9092,kafka1c:9092")
            except Exception as e:
                print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), e, flush=True)
                time.sleep(60)

        for kafka_msg in kafka_consumer:
            handle_kafka_message(kafka_msg)


    if __name__ == '__main__':
        handle_kafka_messages()

bain2236 commented 6 years ago

Also had this issue. We fixed it by using Redis to store a GUID that we attach to each message before sending. On the consumer end we check whether we've already seen that GUID, and if we have, we discard the message.
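A minimal sketch of that GUID check, not the code we actually run. `InMemorySeenStore`, `make_message`, `process_once`, the `seen:` key prefix, and the one-hour TTL are all hypothetical names and values chosen for illustration. The store only mimics the `set(name, value, nx=True, ex=...)` call, which I believe a redis-py client also accepts, so a real client could probably be passed in its place.

```python
import uuid


class InMemorySeenStore:
    """Stand-in for Redis; it only mimics the set(..., nx=True) call used below."""

    def __init__(self):
        self._keys = set()

    def set(self, name, value, nx=False, ex=None):
        # Like Redis SET NX: True if the key was newly set, None if it existed.
        # The ex (expiry) argument is accepted but ignored in this stand-in.
        if nx and name in self._keys:
            return None
        self._keys.add(name)
        return True


def make_message(payload):
    # Producer side: attach a unique ID before sending.
    return {"guid": str(uuid.uuid4()), "payload": payload}


def process_once(store, message, handler, ttl_seconds=3600):
    """Call handler only the first time a GUID is seen; return True if handled."""
    first_time = store.set("seen:" + message["guid"], 1, nx=True, ex=ttl_seconds)
    if not first_time:
        return False  # duplicate delivery, discard
    handler(message["payload"])
    return True
```

One trade-off: the GUID is marked as seen before the handler runs, so a handler that crashes halfway will not get the message again. Marking it afterwards flips the risk back towards duplicates.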

tvoinarovskyi commented 6 years ago

You need to properly set up a ConsumerRebalanceListener for this to be less of an issue. Probably two of your consumers are switching over a partition without a proper commit before the switch.
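A rough sketch of what such a listener could look like, assuming offsets are committed manually (`enable_auto_commit=False`). `CommitOnRevoke` is a hypothetical name, and the `ImportError` fallback is only there so the snippet can be read and run without kafka-python installed.

```python
try:
    from kafka import ConsumerRebalanceListener
except ImportError:
    # Fallback so the sketch still imports without kafka-python installed.
    ConsumerRebalanceListener = object


class CommitOnRevoke(ConsumerRebalanceListener):
    """Commit consumed offsets before partitions are handed to another consumer."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        # Runs before the rebalance takes partitions away: a blocking commit
        # here lets the next owner resume after the last message we consumed.
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        # Nothing to do in this sketch; committed offsets are used by default.
        pass


# Intended usage (needs a running broker, so left as comments):
# consumer = KafkaConsumer(group_id="group1", enable_auto_commit=False,
#                          bootstrap_servers="kafka1a:9092")
# consumer.subscribe(["topic1"], listener=CommitOnRevoke(consumer))
```

The listener is passed to `subscribe()` rather than to the `KafkaConsumer` constructor, so the topic goes to `subscribe()` as well. This narrows the window for duplicates but does not close it: a consumer that dies before committing will still have its messages redelivered.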

bain2236 commented 6 years ago

How is that done? There are about 3 mentions of this listener in the docs with no definition behind any of them. I've rolled back to 1.3.5 and everything is working as expected.

dpkp commented 5 years ago

You should always write your application to handle duplicate messages (be idempotent). That said, it is possible that some bugs in the 1.4 release caused duplicate messages to be more common than in the 1.3 release. Try the latest 1.4.4 and see if this problem remains for you.
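As one illustration of idempotent handling, here is a sketch of a database write keyed on the record's topic, partition, and offset, so a redelivered message is simply ignored. It uses SQLite only to stay self-contained; the table layout and the `open_store` / `save_message` helpers are hypothetical, not part of kafka-python.

```python
import sqlite3


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    # The primary key makes (topic, partition, offset) unique per stored row.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS messages ("
        " topic TEXT, partition_id INTEGER, msg_offset INTEGER, value BLOB,"
        " PRIMARY KEY (topic, partition_id, msg_offset))"
    )
    return conn


def save_message(conn, topic, partition, offset, value):
    """Insert the record once; return True if a new row was written."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO messages VALUES (?, ?, ?, ?)",
            (topic, partition, offset, value),
        )
    return cur.rowcount == 1


# In the consumer loop this could be called as:
# save_message(conn, msg.topic, msg.partition, msg.offset, msg.value)
```

This only covers the same record being delivered twice. If the producer itself sends the same payload twice, the two copies get different offsets and need an application-level key instead.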