teragrep / cfe_39

HDFS Data Ingestion for PTH_06 use
GNU Affero General Public License v3.0

Erroneously fetching same records twice from Kafka. #30

Open Tiihott opened 4 months ago

Tiihott commented 4 months ago

Describe the bug When ingesting records from Kafka (not the mock Kafka consumer), the same set of records is consumed twice, causing an exception when the same file is stored to HDFS a second time.

Expected behavior Records should be consumed from Kafka only once, so each AVRO file and its set of records is stored to HDFS only once.

How to reproduce Default configuration with Kerberized HDFS access and the mock Kafka consumer disabled.

Software version beta 0.2.0

Additional context

Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]: Exception in thread "jla_022" java.lang.RuntimeException: File 18.717496 already exists
Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.HDFSWrite.commit(HDFSWrite.java:168)
Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.DatabaseOutput.accept(DatabaseOutput.java:333)
Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.DatabaseOutput.accept(DatabaseOutput.java:71)
Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.KafkaReader.read(KafkaReader.java:95)
Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.ReadCoordinator.run(ReadCoordinator.java:133)
Jul 04 16:37:47 iris-hdfsdi01.qa.xnet.fi java[7469]:         at java.lang.Thread.run(Thread.java:750)

Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]: Exception in thread "__consumer_offsets1" java.lang.NullPointerException
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.DatabaseOutput.accept(DatabaseOutput.java:257)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.DatabaseOutput.accept(DatabaseOutput.java:71)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.KafkaReader.read(KafkaReader.java:95)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.ReadCoordinator.run(ReadCoordinator.java:133)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at java.lang.Thread.run(Thread.java:750)

Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]: Exception in thread "__consumer_offsets2" com.teragrep.rlo_06.PriorityParseException: PRIORITY < missing
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.rlo_06.PriorityFunction.apply(PriorityFunction.java:65)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.rlo_06.PriorityFunction.apply(PriorityFunction.java:51)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.rlo_06.Fragment.accept(Fragment.java:85)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.rlo_06.Fragment.accept(Fragment.java:54)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at java.util.function.Consumer.lambda$andThen$0(Consumer.java:65)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.rlo_06.RFC5424Frame.next(RFC5424Frame.java:122)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.DatabaseOutput.accept(DatabaseOutput.java:261)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.DatabaseOutput.accept(DatabaseOutput.java:71)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.KafkaReader.read(KafkaReader.java:95)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at com.teragrep.cfe_39.consumers.kafka.ReadCoordinator.run(ReadCoordinator.java:133)
Jul 04 16:37:46 iris-hdfsdi01.qa.xnet.fi java[7469]:         at java.lang.Thread.run(Thread.java:750)
Tiihott commented 3 months ago

Replicated the PriorityParseException and NullPointerException in local tests and traced their underlying causes. They seem to be separate issues from the RuntimeException. Fixes for the PriorityParseException and NullPointerException are in PR #35.

Tiihott commented 2 months ago

Moved the NullPointerException handling into record processing, so Kafka consumer offset control is not affected by null record handling. In other words, null records are properly consumed and their offsets committed, but they are not processed or stored to HDFS.
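A minimal sketch of the idea described above, independent of the actual cfe_39 classes (the method and class names here are hypothetical, and processing is stood in by collecting values into a list): the offset advances for every record in the batch, null or not, so null records are committed and never re-fetched, while only non-null records reach the processing/HDFS stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullRecordSkipSketch {

    // Hypothetical batch consumer: collects the values that would be
    // serialized and stored to HDFS, and returns the offset to commit.
    static long consumeBatch(List<String> batch, long startOffset, List<String> processed) {
        long offset = startOffset;
        for (String value : batch) {
            if (value != null) {
                processed.add(value); // stand-in for AVRO serialization + HDFS store
            }
            // The offset advances for null records too, so they are marked
            // committed and are not re-consumed after a restart or rebalance.
            offset++;
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        long next = consumeBatch(Arrays.asList("rec0", null, "rec2"), 0L, processed);
        System.out.println("commit offset " + next + ", processed " + processed);
    }
}
```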

Tiihott commented 2 months ago

The origin of the RuntimeException was tracked to an incomplete user mapping for Kerberos, which allowed the creation of the 18.717496 file but made writing the file contents fail. Because writing the contents failed with an exception, the offsets of the Kafka records being processed were never committed. When the first consumer failed, the consumer group rebalanced and a second consumer thread tried to consume and process the same records again, this time failing at the earlier HDFS file creation stage because an empty file with the same name already existed in HDFS.
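The failure sequence above suggests a complementary safeguard (separate from fixing the Kerberos user mapping itself): making the commit idempotent, so a re-consumed batch can replace an empty file left behind by a failed earlier attempt instead of throwing "File ... already exists". This is a hypothetical sketch, not the cfe_39 implementation; the real code writes through the Hadoop FileSystem API, while this models the idea with java.nio on the local filesystem.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class IdempotentCommitSketch {

    // Hypothetical commit: if a previous attempt left a (possibly empty or
    // partial) file behind, its offsets were never committed, so the same
    // records are being re-processed and the leftover is safe to replace.
    static void commit(Path target, byte[] avroBytes) throws IOException {
        if (Files.exists(target)) {
            Files.delete(target); // drop the leftover from the failed attempt
        }
        Files.write(target, avroBytes, StandardOpenOption.CREATE_NEW);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("cfe39-sketch").resolve("18.717496");
        Files.createFile(file);               // simulate the empty file left in HDFS
        commit(file, "records".getBytes());   // second attempt succeeds instead of throwing
        System.out.println(Files.size(file) + " bytes written");
    }
}
```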