Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
What happened?
#31137 overwrites the per-split metric backlog_bytes.${SPLIT} with a per-partition value rather than the accumulated value for the split. #31281 introduces a Map to store metrics for all past and current splits (each covering a single partition) of the ReadFromKafkaDoFn instance, and may repeatedly overwrite the entries of non-current splits with stale values. The map used to store these values is not thread-safe and may trigger a ConcurrentModificationException, since GetSize and other SDF methods may concurrently read and write the map. Finally, the per-split caches kept by the instance are keyed on TopicPartition, which is not unique among all splits because a split may override the bootstrap server.
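The thread-safety and keying problems described above can be illustrated with a small, self-contained Java sketch. It is not Beam's code: SplitBacklogSketch, SplitKey, report, backlogFor, and the host names are hypothetical. It assumes a split can be identified by its bootstrap servers plus topic and partition, and it stores backlog values in a ConcurrentHashMap, whose weakly consistent iterators do not throw ConcurrentModificationException. The HashMap demo is single-threaded so that it fails deterministically; with a writer on another thread, as in the issue, the fail-fast check is only best-effort and may or may not fire.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch, not Beam's ReadFromKafkaDoFn: per-split backlog
 * bookkeeping keyed on bootstrap servers + topic + partition, stored in a
 * thread-safe map.
 */
public class SplitBacklogSketch {

  /** Hypothetical split key; TopicPartition alone is ambiguous across clusters. */
  static final class SplitKey {
    final String bootstrapServers;
    final String topic;
    final int partition;

    SplitKey(String bootstrapServers, String topic, int partition) {
      this.bootstrapServers = bootstrapServers;
      this.topic = topic;
      this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof SplitKey)) return false;
      SplitKey k = (SplitKey) o;
      return partition == k.partition
          && bootstrapServers.equals(k.bootstrapServers)
          && topic.equals(k.topic);
    }

    @Override
    public int hashCode() {
      return Objects.hash(bootstrapServers, topic, partition);
    }
  }

  // Safe for concurrent put/get/iteration from different threads; iterators
  // are weakly consistent and never throw ConcurrentModificationException.
  private final Map<SplitKey, Long> backlogBytes = new ConcurrentHashMap<>();

  /** Records the latest backlog estimate for one split. */
  void report(SplitKey key, long bytes) {
    backlogBytes.put(key, bytes);
  }

  long backlogFor(SplitKey key) {
    return backlogBytes.getOrDefault(key, 0L);
  }

  int trackedSplits() {
    return backlogBytes.size();
  }

  /**
   * Single-threaded demo of HashMap's fail-fast iterator: adding a key while
   * iterating trips the same modCount check that a concurrent writer can trip.
   */
  static boolean hashMapFailsFast() {
    Map<String, Long> m = new HashMap<>();
    m.put("a", 1L);
    m.put("b", 2L);
    try {
      for (String k : m.keySet()) {
        m.put(k + "-new", 0L); // structural modification mid-iteration
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    SplitBacklogSketch tracker = new SplitBacklogSketch();
    // Same topic and partition on two hypothetical clusters: distinct keys.
    SplitKey east = new SplitKey("kafka-east:9092", "events", 0);
    SplitKey west = new SplitKey("kafka-west:9092", "events", 0);
    tracker.report(east, 1_000L);
    tracker.report(west, 5_000L);
    System.out.println(tracker.trackedSplits()); // 2
    System.out.println(tracker.backlogFor(east)); // 1000
    System.out.println(hashMapFailsFast()); // true
  }
}
```

Running main prints 2, 1000 and true: the two splits share a topic and partition but land in separate entries, and the HashMap iteration fails fast. ConcurrentHashMap trades a consistent snapshot for non-blocking reads, which is usually acceptable for metric bookkeeping.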
Issue Components