… to the lack of new messages in the topic, despite the rotate.interval.ms property being set. The flush.size property now also works correctly.
Problem
Occasionally, files got stuck in the +tmp folder even though rotate.interval.ms was set. This happened when new messages stopped arriving in the topic: rotate.interval.ms had no effect because nothing triggered a check against it.
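A minimal sketch of why the timer never fired, assuming a hypothetical, simplified writer (RecordDrivenWriter is an illustrative name, not a class in the connector) that evaluates rotate.interval.ms only when a record is written. Time is passed in explicitly so the behaviour is deterministic. Once the topic goes quiet, write() is never called again, so the open file stays in +tmp indefinitely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified partition writer (not the connector's TopicPartitionWriter).
// rotate.interval.ms is evaluated only inside write(), i.e. only when a record arrives.
class RecordDrivenWriter {
    private final long rotateIntervalMs;
    private long fileOpenedAtMs = -1;                      // -1: no open temp file
    private final List<String> tmpFile = new ArrayList<>();
    private final List<List<String>> committed = new ArrayList<>();

    RecordDrivenWriter(long rotateIntervalMs) {
        this.rotateIntervalMs = rotateIntervalMs;
    }

    void write(String record, long nowMs) {
        // The only place where the rotation timer is checked.
        if (fileOpenedAtMs >= 0 && nowMs - fileOpenedAtMs >= rotateIntervalMs) {
            commit();
        }
        if (fileOpenedAtMs < 0) {
            fileOpenedAtMs = nowMs;                        // open a new temp file
        }
        tmpFile.add(record);
    }

    private void commit() {
        committed.add(new ArrayList<>(tmpFile));           // stands in for moving the file out of +tmp
        tmpFile.clear();
        fileOpenedAtMs = -1;
    }

    int pendingInTmp() { return tmpFile.size(); }

    int committedFiles() { return committed.size(); }

    public static void main(String[] args) {
        RecordDrivenWriter w = new RecordDrivenWriter(60_000);
        w.write("a", 0);
        w.write("b", 1_000);
        // The topic goes quiet: write() is never called again, so nothing
        // re-evaluates the timer and both records remain in +tmp.
        System.out.println(w.pendingInTmp() + " pending, " + w.committedFiles() + " committed");
    }
}
```

Only a later write() (for example at 61,000 ms) would notice that the interval has elapsed; without new messages that call never happens.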
While working on this problem, I also fixed the rotation mechanism and, with it, the flush.size parameter. According to the documentation, rotation can be controlled by the time spent collecting a file (rotate.interval.ms) and/or by the number of messages in the file (flush.size). In practice, time-based rotation was driven not by the file collection time but by the values of the field used for partitioning. If one Kafka partition contained messages whose values in the partitioning field differed by more than rotate.interval.ms, a rotation was triggered and the file was prematurely flushed from the temporary folder to the permanent one. This produced a large number of small files and made it impossible to control writing correctly with flush.size. With this change, the problem no longer occurs.
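The difference between the two rotation rules can be sketched as follows. RotationPolicies, Rec and countFiles are hypothetical names, and each record is reduced to a wall-clock arrival time plus the value of the partitioning field; this illustrates the behaviour described above and is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch contrasting two rotation rules; names are illustrative,
// not taken from the connector.
final class RotationPolicies {

    // One record of a Kafka partition: wall-clock arrival time and the value
    // of the timestamp field the partitioner uses.
    static final class Rec {
        final long arrivalMs;
        final long fieldTimeMs;

        Rec(long arrivalMs, long fieldTimeMs) {
            this.arrivalMs = arrivalMs;
            this.fieldTimeMs = fieldTimeMs;
        }
    }

    // Counts how many files the records end up in (the last, still open file included).
    // useFieldTime = true:  time is measured on the partitioning field (the behaviour described as wrong).
    // useFieldTime = false: time is wall-clock arrival, measured from the first record of the open file.
    static int countFiles(List<Rec> recs, boolean useFieldTime, long rotateIntervalMs, int flushSize) {
        int files = 0;
        int inFile = 0;       // records in the currently open temp file
        long fileStart = 0;   // time reference of the currently open file
        for (Rec r : recs) {
            long t = useFieldTime ? r.fieldTimeMs : r.arrivalMs;
            if (inFile > 0 && (inFile >= flushSize || t - fileStart >= rotateIntervalMs)) {
                files++;      // rotate: commit the open file
                inFile = 0;
            }
            if (inFile == 0) {
                fileStart = t; // open a new file
            }
            inFile++;
        }
        return inFile > 0 ? files + 1 : files;
    }

    public static void main(String[] args) {
        // Six records arriving within half a second, with field values 4,000 s apart.
        List<Rec> recs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            recs.add(new Rec(i * 100L, i * 4_000_000L));
        }
        long rotateIntervalMs = 3_600_000L; // 1 hour
        int flushSize = 3;
        System.out.println("by field time:      " + countFiles(recs, true, rotateIntervalMs, flushSize));
        System.out.println("by collection time: " + countFiles(recs, false, rotateIntervalMs, flushSize));
    }
}
```

With rotate.interval.ms set to one hour and flush.size set to 3, the field-time rule puts each of the six records into its own file, while the collection-time rule yields two files of three records each, as flush.size intends.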
Solution
I added a check of rotate.interval.ms in HdfsSinkTask.preCommit(), so the timer is also evaluated when no new messages arrive. I also fixed the rotation logic in the TopicPartitionWriter.shouldRotateAndMaybeUpdateTimers() method.
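The idea of the fix can be sketched with another hypothetical writer (TimerCheckedWriter, maybeRotate and onPreCommit are illustrative names, not the connector's code). The time check lives in one method that runs both on write and from a periodic hook. In Kafka Connect, SinkTask.preCommit() is invoked by the framework before offsets are committed, which is why a check placed there can fire even while the topic is quiet; here that call is simulated by onPreCommit() with an explicit clock value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified partition writer (not the connector's TopicPartitionWriter).
// maybeRotate() runs both from write() and from onPreCommit(), a periodic hook
// modelled on checking rotate.interval.ms in HdfsSinkTask.preCommit().
class TimerCheckedWriter {
    private final long rotateIntervalMs;
    private final int flushSize;
    private long fileOpenedAtMs = -1;                      // -1: no open temp file
    private final List<String> tmpFile = new ArrayList<>();
    private final List<List<String>> committed = new ArrayList<>();

    TimerCheckedWriter(long rotateIntervalMs, int flushSize) {
        this.rotateIntervalMs = rotateIntervalMs;
        this.flushSize = flushSize;
    }

    void write(String record, long nowMs) {
        maybeRotate(nowMs);
        if (fileOpenedAtMs < 0) {
            fileOpenedAtMs = nowMs;                        // wall-clock time the file was opened
        }
        tmpFile.add(record);
        if (tmpFile.size() >= flushSize) {
            commit();                                      // size-based rotation (flush.size)
        }
    }

    // Periodic hook: runs even when no records arrived since the last call.
    void onPreCommit(long nowMs) {
        maybeRotate(nowMs);
    }

    // Time-based rotation, measured from when the file was opened,
    // not from values of the partitioning field.
    private void maybeRotate(long nowMs) {
        if (fileOpenedAtMs >= 0 && nowMs - fileOpenedAtMs >= rotateIntervalMs) {
            commit();
        }
    }

    private void commit() {
        committed.add(new ArrayList<>(tmpFile));           // stands in for moving the file out of +tmp
        tmpFile.clear();
        fileOpenedAtMs = -1;
    }

    int pendingInTmp() { return tmpFile.size(); }

    int committedFiles() { return committed.size(); }

    public static void main(String[] args) {
        TimerCheckedWriter w = new TimerCheckedWriter(60_000, 1_000);
        w.write("a", 0);
        w.write("b", 1_000);
        w.onPreCommit(30_000);                             // timer not yet expired
        w.onPreCommit(60_000);                             // topic quiet, but the periodic check rotates
        System.out.println(w.pendingInTmp() + " pending, " + w.committedFiles() + " committed");
    }
}
```

Routing both paths through a single maybeRotate() keeps the record-driven and the periodic checks from disagreeing about when a file is due.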
Does this solution apply anywhere else?
Test Strategy
Testing done: