Thanks for testing this. This issue is already fixed in 1.0.8. Did you see the same issue in 1.0.8 as well?
Hi, I'm a coworker of @ChorPangChan.

> This issue is already fixed in 1.0.8. Did you see the same issue in 1.0.8 as well?
No, this is not fixed. We tested in 1.0.8 and found the issue still exists. Here are the logs:
A succeeded pattern:
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local : 0 there in internal buffers
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is 76
17/01/20 14:18:34 DEBUG kafka.PartitionManager: LastComitted Offset 75
17/01/20 14:18:34 DEBUG kafka.PartitionManager: New Emitted Offset 77
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Enqueued Offset 76
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Wrote committed offset to ZK: 77
17/01/20 14:18:34 DEBUG kafka.PartitionManager: Committed offset 77 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user
... followed by a failed pattern:
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local : 0 there in internal buffers
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: LastComitted Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: New Emitted Offset 78
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Enqueued Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Last Enqueued offset 77 not incremented since previous Comitted Offset 77 for partition Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user.
17/01/20 14:19:21 DEBUG kafka.PartitionManager: LastComitted Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: New Emitted Offset 78
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Enqueued Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Last Enqueued offset 77 not incremented since previous Comitted Offset 77 for partition Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user.
17/01/20 14:19:21 DEBUG kafka.PartitionManager: LastComitted Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: New Emitted Offset 78
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Enqueued Offset 77
17/01/20 14:19:21 DEBUG kafka.PartitionManager: Last Enqueued offset 77 not incremented since previous Comitted Offset 77 for partition Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user.
...
# Infinite loop
The result is even worse than in 1.0.6: committing to ZooKeeper kept failing, and the consumer fell into an infinite loop.
@dibbhatt
It seems that you want to put `_emittedToOffset` into ZooKeeper:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/PartitionManager.java#L290
If this succeeds, `_lastEnquedOffset` will equal `_lastComittedOffset` in the next loop, so the condition here will be false:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/PartitionManager.java#L286
... and the next offset will never be pushed to ZooKeeper.
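To illustrate (this is only a stripped-down, hypothetical sketch using the offsets from the log above, not the actual PartitionManager code):

```java
// Hypothetical stand-in for the guard around commit(); the field names mirror
// the thread, and the values come from the failed pattern in the log above.
public class CommitGuardSketch {

    private long _lastEnquedOffset = 77;   // last offset handed to the application
    private long _lastComittedOffset = 77; // 77 was written to ZK in the previous round

    // The check at PartitionManager.java#L286 compares with ">", so once the
    // committed offset has caught up with the enqueued one this stays false and
    // the commit path is never taken again for this partition.
    boolean shouldCommit() {
        return _lastEnquedOffset > _lastComittedOffset;
    }

    public static void main(String[] args) {
        // prints "false" -> offset 78 is never pushed to ZooKeeper
        System.out.println(new CommitGuardSketch().shouldCommit());
    }
}
```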
What if I do `if (_lastEnquedOffset >= _lastComittedOffset) { ... }`? That should work, right? Can you please raise a PR and I will merge it.
Dib
It works well for a running program that keeps consuming messages from Kafka. But when the program starts or restarts, there is a new infinite loop that keeps committing the same offset to ZooKeeper:
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Read partition information from : /consumer-path/test_test_kpi_reg_user/topic_tid_9999_dev/partition_0 --> {"partition":0,"offset":85,"topic":"topic_tid_9999_dev","broker":{"port":9092,"host":"cdh005.testdev.local"},"consumer":{"id":"test_test_kpi_reg_user"}}
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Read last commit offset from zookeeper:85 ; old topology_id:test_test_kpi_reg_user - new consumer_id: test_test_kpi_reg_user
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Starting Consumer cdh005.testdev.local :0 from offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: LastComitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: New Emitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Enqueued Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Wrote committed offset to ZK: 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committed offset 85 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user
17/01/20 17:21:18 DEBUG kafka.PartitionManager: LastComitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: New Emitted Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Enqueued Offset 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Wrote committed offset to ZK: 85
17/01/20 17:21:18 DEBUG kafka.PartitionManager: Committed offset 85 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user
...
# Infinite loop
The infinite loop is caused by this line:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/PartitionManager.java#L211
`commit()` should be executed only when the buffer is not empty; that is, it should be inside the `if` block at L207.
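Roughly, the restructuring looks like this (the buffer, `fill()` and `commit()` below are simplified placeholders made up for illustration, not the real PartitionManager members):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of moving commit() inside the "buffer is not empty" branch.
public class FillAndCommitSketch {

    private final Deque<String> waitingToEmit = new ArrayDeque<>();

    private void fill() {
        // pretend the fetch returned nothing this round (e.g. right after a restart)
    }

    private void commit() {
        System.out.println("committing offset to ZooKeeper");
    }

    // Shape of the current loop: commit() sits outside the emptiness check, so an
    // empty fetch still re-commits the same offset on every iteration.
    public void nextBefore() {
        if (waitingToEmit.isEmpty()) {
            fill();
        }
        commit();
    }

    // Shape after the fix: commit() only runs when something was actually fetched
    // and emitted, so there is real progress to record.
    public void nextAfter() {
        if (waitingToEmit.isEmpty()) {
            fill();
        }
        if (!waitingToEmit.isEmpty()) {
            waitingToEmit.poll(); // emit one message downstream
            commit();
        }
    }

    public static void main(String[] args) {
        FillAndCommitSketch s = new FillAndCommitSketch();
        s.nextBefore(); // commits even though nothing was fetched
        s.nextAfter();  // empty buffer, nothing emitted, nothing committed
    }
}
```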
Actually, we can simply fix this issue by changing `>` to `>=`. But in fact, the code saves `_emittedToOffset` into ZooKeeper instead of the last processed offset, which is what README.md describes. I think you should unify it.
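A minimal sketch of that unification, with made-up names (`writeOffsetToZk()` and the fields are placeholders for illustration, not the project's API):

```java
// Store the last processed offset rather than the fetch position.
public class CommitProcessedOffsetSketch {

    private long _lastEnquedOffset;  // last offset actually handed to the application
    private long _emittedToOffset;   // next offset the fetcher will request from Kafka

    private void writeOffsetToZk(long offset) {
        System.out.println("ZK <- " + offset);
    }

    public void commit() {
        // Behaviour described in the thread: the fetch position gets stored.
        // writeOffsetToZk(_emittedToOffset);

        // README-style behaviour: store the last processed offset instead, so the
        // value in ZooKeeper matches what the application has really consumed.
        writeOffsetToZk(_lastEnquedOffset);
    }

    public static void main(String[] args) {
        CommitProcessedOffsetSketch s = new CommitProcessedOffsetSketch();
        s._lastEnquedOffset = 77;
        s._emittedToOffset = 78;
        s.commit(); // prints "ZK <- 77"
    }
}
```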
OK, sure. If you can make the PR with the fix and the README update, that would be great. Otherwise I will take a look at it over the weekend and fix it.
OK. I'll make a PR next Monday.
PartitionManager does not write the offset to ZooKeeper under some conditions. Duplicated data will be fetched if the program restarts without the offset having been written to ZooKeeper.
How to reproduce: fetch one message at a time from Kafka.
Versions: kafka-spark-consumer 1.0.6, spark-streaming_2.10 1.6.0
Here is the log immediately after deploying the Spark program to YARN. Notice that offset 6 is already duplicated and PartitionManager cannot write offset 6/7 to ZooKeeper due to https://github.com/dibbhatt/kafka-spark-consumer/blob/928fe9eb7d2fefbdd08a9127baa6d3880beee1ff/src/main/java/consumer/kafka/PartitionManager.java#L388
Produce another record -> write succeeds.
Produce another record -> write fails again.
It seems that this issue can be fixed by saving the processed offset (`_lastEnquedOffset`) to ZooKeeper and adding 1 to `_emittedToOffset` in the constructor, since README.md states that "the last processed offset is write in Zookeeper." I don't know whether these changes will affect other classes or not.
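A hypothetical sketch of that constructor change (the constructor argument stands in for the offset read back from ZooKeeper; the fields and the `+1` are illustrative assumptions, not the project's actual code):

```java
// Resume fetching one past the stored (processed) offset after a restart.
public class RestartOffsetSketch {

    private final long _lastComittedOffset; // value read back from ZooKeeper
    private long _emittedToOffset;          // where fetching resumes after the restart

    public RestartOffsetSketch(long offsetStoredInZk) {
        _lastComittedOffset = offsetStoredInZk;
        // +1 so the first fetch after a restart starts at the message *after* the
        // one that was already processed, avoiding the duplicate seen at startup.
        _emittedToOffset = offsetStoredInZk + 1;
    }

    public static void main(String[] args) {
        RestartOffsetSketch pm = new RestartOffsetSketch(85);
        System.out.println("last committed: " + pm._lastComittedOffset); // 85
        System.out.println("resume fetch at: " + pm._emittedToOffset);   // 86
    }
}
```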
Only tested on 1.0.6, but based on the logic this issue should reproduce in 1.0.8 as well.