confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

consumer group offset is advanced even if message is uncommitted #542

Closed: ldybal closed this issue 3 years ago

ldybal commented 5 years ago

Description

I'm not sure whether this is a driver, librdkafka, or Kafka issue...

We use manual commits, but there are situations in which the consumer group offset for the message's partition is advanced even though the app quits without committing the message. If the app never commits, after a restart it starts back at the earliest offset, as configured (OK). If the app reads a single message and quits without committing, it restarts at that message's offset (OK).

BUT, if the app reads and commits the first message, then reads the second and quits without committing, after a restart it gets the third message instead of the second.
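The expectation in these scenarios rests on Kafka's offset convention: the offset a group commits is the offset of the next message to read (the last processed offset plus one), and when the group has no committed offset the consumer falls back to `auto.offset.reset`. A minimal model of that rule in plain Python, with no broker involved; `expected_restart_offset` is a hypothetical name used only for illustration:

```python
from typing import Optional


def expected_restart_offset(last_committed_msg: Optional[int], earliest: int = 0) -> int:
    """Return the offset a restarted consumer in the same group should resume from.

    last_committed_msg is the offset of the last message the app committed, or
    None if it never committed. Committing message N stores N + 1 as the group's
    offset; with nothing stored, auto.offset.reset=earliest starts at `earliest`.
    """
    if last_committed_msg is None:
        return earliest
    return last_committed_msg + 1


# Never commits (or reads message 0 and quits without committing): resume at 0.
print(expected_restart_offset(None))  # 0
# Commits message 0, reads message 1, quits: resume at 1, i.e. the second message.
print(expected_restart_offset(0))  # 1
```

In the report, that last case resumes at offset 2 (the third message) instead of 1.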

How to reproduce

Single-broker "cluster": create a topic with a single partition and populate it with 10 messages, then run an app (with enable.auto.commit=false and a group.id) that reads and commits a message, then reads a second one and quits without committing. If you then run the app again, it gets the third message when it should get the second.
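A minimal reproducer along these lines might look like the sketch below. It is hypothetical: the reporter's app code is not shown, so the sketch assumes the app calls `commit()` with no arguments (the library default, `asynchronous=True`), and it assumes confluent-kafka 1.0.0 or later and a broker at `localhost:9092`. The topic and group names come from the logs; `build_config`, `run_once`, `main`, and `max_polls` are illustrative names.

```python
import sys

TOPIC = "test_lab_LMAP_reports_valid"  # topic name from the logs in this report
GROUP = "test-commit-t0"               # consumer group from the logs in this report


def build_config(bootstrap_servers, group_id):
    """Consumer settings described in the report: manual commits, start at earliest."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
    }


def run_once(consumer, topic, max_polls=60):
    """Read and commit one message, read a second one, then stop without committing it."""
    consumer.subscribe([topic])
    read = 0
    try:
        for _ in range(max_polls):  # bounded so the sketch cannot spin forever
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(msg.error(), file=sys.stderr)
                continue
            read += 1
            print("read partition/offset %d/%d" % (msg.partition(), msg.offset()))
            if read == 1:
                # Assumption: commit with no arguments, as the original app may do.
                consumer.commit()
            elif read == 2:
                break  # "quit" without committing the second message
    finally:
        # Leave the group cleanly, as the app in the report does ("Closing Kafka consumer").
        consumer.close()


def main(bootstrap_servers="localhost:9092"):
    # Imported here so build_config() can be used without the package installed.
    from confluent_kafka import Consumer
    run_once(Consumer(build_config(bootstrap_servers, GROUP)), TOPIC)
```

Calling `main()` once, then again, and comparing the first offset printed by the second run with the offset committed by the first run shows whether the skip occurs; on a freshly populated topic the second run is expected to start at offset 1.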

Checklist

confluent-kafka-python: 0.11.6
librdkafka: 1.0.0
Kafka broker version: 2.1.0, running on Docker (wurstmeister/kafka:2.12-2.1.0)
OS: Oracle Linux Server 7.5
No exceptions in the Kafka log.

Client logs (this app dumps the messages into files; it logs the partition/offset of each received message and logs a line before committing):

FIRST RUN:
dybal@dybal-desktop ~/dev/src/code.ceptro.br/pipe/teste-commit $ kubeprod logs -n simet test-commit -f
time="2019-02-13T19:28:33.244Z" program=ktd level=INFO ID=None msg="Setting Log Level to INFO"
time="2019-02-13T19:28:33.245Z" program=ktd level=INFO ID=None msg="Init ktd, Version 1.1.2 GitCommit b11a130"
time="2019-02-13T19:28:33.245Z" program=ktd level=INFO ID=None msg="Consuming from topic test_lab_LMAP_reports_valid (kafka kafka.default:9092) using consumer group test-commit-t0"
time="2019-02-13T19:28:33.245Z" program=ktd level=INFO ID=None msg="Serving Prometheus at port 81"
time="2019-02-13T19:28:33.345Z" program=ktd level=INFO ID=None msg="Accepting healthcheck requests on *:8080/health-check"
time="2019-02-13T19:28:33.346Z" program=ktd level=INFO ID=None msg="Filename Policy OFFSET, Collision Policy BACKUP"
time="2019-02-13T19:28:33.346Z" program=ktd level=INFO ID=None msg="Sleeping 20s before starting main loop"
time="2019-02-13T19:29:33.388Z" program=ktd level=INFO ID=None msg="Received report id:None (1545 bytes) from partition/offset 0/0 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:29:33.393Z" program=ktd level=INFO ID=f94f7afe-90e5-11e8-8225-02700f180b55 msg="Wrote file 00-000000000000.json"
time="2019-02-13T19:29:33.393Z" program=ktd level=INFO ID=f94f7afe-90e5-11e8-8225-02700f180b55 msg="Commiting input topic"
time="2019-02-13T19:29:33.393Z" program=ktd level=INFO ID=None msg="Received report id:None (1348 bytes) from partition/offset 0/1 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:29:33.394Z" program=ktd level=INFO ID=None msg="Read 2 messages, generating exception"
time="2019-02-13T19:29:33.394Z" program=ktd level=ERROR ID=None msg="Unhandled exception: Traceback (most recent call last): File "/scripts/common/loop.py", line 42, in mainLoop float(None) TypeError: float() argument must be a string or a number, not 'NoneType' "
time="2019-02-13T19:29:33.394Z" program=ktd level=INFO ID=None msg="Closing Kafka consumer"
time="2019-02-13T19:29:33.499Z" program=ktd level=WARNING ID=None msg="Leaving!"

SECOND RUN:
dybal@dybal-desktop ~/dev/src/code.ceptro.br/pipe/teste-commit $ kubeprod logs -n simet test-commit -f
time="2019-02-13T19:29:59.633Z" program=ktd level=INFO ID=None msg="Setting Log Level to INFO"
time="2019-02-13T19:29:59.634Z" program=ktd level=INFO ID=None msg="Init ktd, Version 1.1.2 GitCommit b11a130"
time="2019-02-13T19:29:59.634Z" program=ktd level=INFO ID=None msg="Consuming from topic test_lab_LMAP_reports_valid (kafka kafka.default:9092) using consumer group test-commit-t0"
time="2019-02-13T19:29:59.634Z" program=ktd level=INFO ID=None msg="Serving Prometheus at port 81"
time="2019-02-13T19:29:59.721Z" program=ktd level=INFO ID=None msg="Accepting healthcheck requests on *:8080/health-check"
time="2019-02-13T19:29:59.722Z" program=ktd level=INFO ID=None msg="Filename Policy OFFSET, Collision Policy BACKUP"
time="2019-02-13T19:29:59.722Z" program=ktd level=INFO ID=None msg="Sleeping 20s before starting main loop"
time="2019-02-13T19:30:59.774Z" program=ktd level=INFO ID=None msg="Received report id:None (1692 bytes) from partition/offset 0/2 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:30:59.779Z" program=ktd level=INFO ID=228385b2-90e5-11e8-afa5-620fb472fcf2 msg="Wrote file 00-000000000002.json"
time="2019-02-13T19:30:59.779Z" program=ktd level=INFO ID=228385b2-90e5-11e8-afa5-620fb472fcf2 msg="Commiting input topic"
time="2019-02-13T19:30:59.780Z" program=ktd level=INFO ID=None msg="Received report id:None (26052 bytes) from partition/offset 0/3 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:30:59.780Z" program=ktd level=INFO ID=None msg="Read 2 messages, generating exception"
time="2019-02-13T19:30:59.781Z" program=ktd level=ERROR ID=None msg="Unhandled exception: Traceback (most recent call last): File "/scripts/common/loop.py", line 42, in mainLoop float(None) TypeError: float() argument must be a string or a number, not 'NoneType' "
time="2019-02-13T19:30:59.781Z" program=ktd level=INFO ID=None msg="Closing Kafka consumer"
time="2019-02-13T19:30:59.887Z" program=ktd level=WARNING ID=None msg="Leaving!"

THIRD RUN:
dybal@dybal-desktop ~/dev/src/code.ceptro.br/pipe/teste-commit $ kubeprod logs -n simet test-commit -f
time="2019-02-13T19:31:33.652Z" program=ktd level=INFO ID=None msg="Setting Log Level to INFO"
time="2019-02-13T19:31:33.653Z" program=ktd level=INFO ID=None msg="Init ktd, Version 1.1.2 GitCommit b11a130"
time="2019-02-13T19:31:33.653Z" program=ktd level=INFO ID=None msg="Consuming from topic test_lab_LMAP_reports_valid (kafka kafka.default:9092) using consumer group test-commit-t0"
time="2019-02-13T19:31:33.653Z" program=ktd level=INFO ID=None msg="Serving Prometheus at port 81"
time="2019-02-13T19:31:33.747Z" program=ktd level=INFO ID=None msg="Accepting healthcheck requests on *:8080/health-check"
time="2019-02-13T19:31:33.747Z" program=ktd level=INFO ID=None msg="Filename Policy OFFSET, Collision Policy BACKUP"
time="2019-02-13T19:31:33.747Z" program=ktd level=INFO ID=None msg="Sleeping 20s before starting main loop"
time="2019-02-13T19:32:33.806Z" program=ktd level=INFO ID=None msg="Received report id:None (26052 bytes) from partition/offset 0/3 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:32:33.812Z" program=ktd level=INFO ID=c0d4a341-bb4a-11e8-8c89-66dfe9221d42 msg="Wrote file 00-000000000003.json"
time="2019-02-13T19:32:33.813Z" program=ktd level=INFO ID=c0d4a341-bb4a-11e8-8c89-66dfe9221d42 msg="Commiting input topic"
time="2019-02-13T19:32:33.813Z" program=ktd level=INFO ID=None msg="Received report id:None (23397 bytes) from partition/offset 0/4 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:32:33.813Z" program=ktd level=INFO ID=None msg="Read 2 messages, generating exception"
time="2019-02-13T19:32:33.814Z" program=ktd level=ERROR ID=None msg="Unhandled exception: Traceback (most recent call last): File "/scripts/common/loop.py", line 42, in mainLoop float(None) TypeError: float() argument must be a string or a number, not 'NoneType' "
time="2019-02-13T19:32:33.814Z" program=ktd level=INFO ID=None msg="Closing Kafka consumer"
time="2019-02-13T19:32:33.920Z" program=ktd level=WARNING ID=None msg="Leaving!"

FOURTH RUN:
dybal@dybal-desktop ~/dev/src/code.ceptro.br/pipe/teste-commit $ kubeprod logs -n simet test-commit -f
time="2019-02-13T19:32:55.485Z" program=ktd level=INFO ID=None msg="Setting Log Level to INFO"
time="2019-02-13T19:32:55.486Z" program=ktd level=INFO ID=None msg="Init ktd, Version 1.1.2 GitCommit b11a130"
time="2019-02-13T19:32:55.486Z" program=ktd level=INFO ID=None msg="Consuming from topic test_lab_LMAP_reports_valid (kafka kafka.default:9092) using consumer group test-commit-t0"
time="2019-02-13T19:32:55.486Z" program=ktd level=INFO ID=None msg="Serving Prometheus at port 81"
time="2019-02-13T19:32:55.581Z" program=ktd level=INFO ID=None msg="Accepting healthcheck requests on *:8080/health-check"
time="2019-02-13T19:32:55.582Z" program=ktd level=INFO ID=None msg="Filename Policy OFFSET, Collision Policy BACKUP"
time="2019-02-13T19:32:55.582Z" program=ktd level=INFO ID=None msg="Sleeping 20s before starting main loop"
time="2019-02-13T19:33:55.588Z" program=ktd level=INFO ID=None msg="Received report id:None (22585 bytes) from partition/offset 0/5 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:33:55.594Z" program=ktd level=INFO ID=abe4ed25-bb4a-11e8-8c52-66dfe9221d42 msg="Wrote file 00-000000000005.json"
time="2019-02-13T19:33:55.594Z" program=ktd level=INFO ID=abe4ed25-bb4a-11e8-8c52-66dfe9221d42 msg="Commiting input topic"
time="2019-02-13T19:33:55.595Z" program=ktd level=INFO ID=None msg="Received report id:None (32546 bytes) from partition/offset 0/6 of topic test_lab_LMAP_reports_valid"
time="2019-02-13T19:33:55.595Z" program=ktd level=INFO ID=None msg="Read 2 messages, generating exception"
time="2019-02-13T19:33:55.596Z" program=ktd level=ERROR ID=None msg="Unhandled exception: Traceback (most recent call last): File "/scripts/common/loop.py", line 42, in mainLoop float(None) TypeError: float() argument must be a string or a number, not 'NoneType' "
time="2019-02-13T19:33:55.596Z" program=ktd level=INFO ID=None msg="Closing Kafka consumer"
time="2019-02-13T19:33:55.700Z" program=ktd level=WARNING ID=None msg="Leaving!"
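Tabulating the offsets from these four runs makes the pattern explicit: each restart should resume at the offset committed in the previous run plus one. The run data below is transcribed from the logs above; `RUNS` and `compare_restarts` are hypothetical names for a plain-Python check:

```python
# Offsets on partition 0, transcribed from the four runs above:
# the first message each run received, and the message it committed.
RUNS = [
    {"first_read": 0, "committed": 0},
    {"first_read": 2, "committed": 2},
    {"first_read": 3, "committed": 3},
    {"first_read": 5, "committed": 5},
]


def compare_restarts(runs):
    """Return (expected, observed, skipped) for each restart after the first run."""
    results = []
    for prev, nxt in zip(runs, runs[1:]):
        expected = prev["committed"] + 1  # committing message N stores offset N + 1
        observed = nxt["first_read"]
        results.append((expected, observed, observed - expected))
    return results


for expected, observed, skipped in compare_restarts(RUNS):
    print("expected %d, observed %d, skipped %d" % (expected, observed, skipped))
```

This shows one message skipped before the second and fourth runs (offsets 1 and 4 never reached the app), while the third run resumed correctly at offset 3, so in these logs the skip is intermittent.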

edenhill commented 5 years ago

Can you reproduce this with "debug": "cgrp,topic" set and provide the logs?
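In confluent-kafka-python, librdkafka properties such as `debug` go into the consumer configuration dict. A sketch of a configuration for such a debug run, plus a helper that asks the broker which offset the group actually has stored, assuming confluent-kafka 1.0.0 or later; `debug_config` and `show_committed` are illustrative names, not part of the library:

```python
def debug_config(bootstrap_servers, group_id):
    """Report's consumer settings plus the librdkafka debug contexts requested above."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
        "debug": "cgrp,topic",  # consumer-group and topic debug logging
    }


def show_committed(consumer, topic, partition=0, timeout=10.0):
    """Print the offset stored for the consumer's group on one partition."""
    from confluent_kafka import TopicPartition  # imported lazily, only when called
    for tp in consumer.committed([TopicPartition(topic, partition)], timeout=timeout):
        # A negative offset means the group has no committed offset for this partition.
        print("committed offset for %s [%d]: %d" % (tp.topic, tp.partition, tp.offset))
```

Calling `show_committed()` from a consumer created with the same `group.id`, before the next run starts consuming, would show whether the previous run stored offset 1 (expected) or already 2.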

gylu commented 5 years ago

I am also seeing this on Python 3.7 with confluent-kafka==1.0.0, using consumer.consume(). It essentially results in a dropped message without any error, exception, or warning.
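The excerpt does not state a resolution. As a hedged alternative, not confirmed as the fix in this thread: commit explicit offsets for exactly the messages that were handled, synchronously. Per the `Consumer.commit()` documentation, with no `message` or `offsets` argument it commits the current assignment's positions, which move forward as more messages are consumed, whereas an explicit offset (last handled offset + 1) cannot run ahead of processing. A sketch using `consume()`, assuming confluent-kafka 1.0.0 or later; `next_offsets`, `consume_batch`, and the `handle` callback are hypothetical:

```python
from typing import Dict, Iterable, Tuple


def next_offsets(handled: Iterable[Tuple[str, int, int]]) -> Dict[Tuple[str, int], int]:
    """Map (topic, partition) to the offset to commit: last handled offset + 1."""
    result: Dict[Tuple[str, int], int] = {}
    for topic, partition, offset in handled:
        key = (topic, partition)
        result[key] = max(result.get(key, 0), offset + 1)
    return result


def consume_batch(consumer, handle, batch_size=10, timeout=1.0):
    """Consume one batch, pass each message to `handle`, then commit what was handled.

    If `handle` raises, the messages handled before it are still committed,
    the failing message is not, and the exception propagates.
    """
    from confluent_kafka import TopicPartition  # imported lazily, only when called
    handled = []
    try:
        for msg in consumer.consume(num_messages=batch_size, timeout=timeout):
            if msg.error():
                continue  # skip error events; a real app should log or handle them
            handle(msg)
            handled.append((msg.topic(), msg.partition(), msg.offset()))
    finally:
        offsets = [TopicPartition(t, p, o) for (t, p), o in next_offsets(handled).items()]
        if offsets:
            # Synchronous: returns only after the broker has acknowledged the commit.
            consumer.commit(offsets=offsets, asynchronous=False)
    return len(handled)
```

Committing once per batch keeps synchronous round trips low; committing each message with `consumer.commit(message=msg, asynchronous=False)` is the simpler, slower variant.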