Beam Java 2.59.0 introduced Lineage metrics support for file-based IO (FileIO, TextIO, etc).
When a pipeline reads from a large number of files (e.g. using a file pattern that matches many files), the metrics-based components of the Dataflow UI break. For example, live throughput is no longer shown, the progress bar goes stale, and user counters are only partially updated.
This is due to an internal limit on the total size of the job status response in the Dataflow runner (the gRPC limit is ~20 MB). When the response exceeds this limit, all metric updates (counters, string sets, etc.) are dropped.
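As a rough back-of-the-envelope illustration of how a lineage string set can approach that limit, the sketch below multiplies a file count by an average name length. The class name, the 100-byte average, and the file counts are hypothetical, and the estimate ignores serialization overhead and every other metric in the response, so it is a lower bound at best.

```java
public class LineagePayloadEstimate {
    // ~20 MB response limit, taken from the report above.
    static final long LIMIT_BYTES = 20L * 1024 * 1024;

    // Raw bytes needed just for the file names (assumption: no encoding overhead).
    static long estimateBytes(long fileCount, int avgNameBytes) {
        return fileCount * avgNameBytes;
    }

    static boolean exceedsLimit(long fileCount, int avgNameBytes) {
        return estimateBytes(fileCount, avgNameBytes) > LIMIT_BYTES;
    }

    public static void main(String[] args) {
        int avgNameBytes = 100; // hypothetical average length of one fully qualified file name
        for (long files : new long[] {10_000L, 100_000L, 300_000L}) {
            System.out.println(files + " files -> " + estimateBytes(files, avgNameBytes)
                + " bytes, exceeds limit: " + exceedsLimit(files, avgNameBytes));
        }
    }
}
```

Under these assumed numbers, a pattern matching a few hundred thousand files would be enough for the file names alone to exceed the limit.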
When a pipeline writes to a large number of files (e.g. with a large shard count), one observes the following slowness:
Operation ongoing in step Write content to files/WriteFiles/FinalizeTempFileBundles/Finalize for at least 15m00s without outputting or completing in state process in thread pool-3-thread-2 with id 27
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet$RegularSetBuilderImpl.insertInHashTable(ImmutableSet.java:780)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet$RegularSetBuilderImpl.add(ImmutableSet.java:763)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet$Builder.add(ImmutableSet.java:527)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet$Builder.add(ImmutableSet.java:478)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:475)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet$Builder.addAll(ImmutableSet.java:549)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.metrics.StringSetData.combine(StringSetData.java:58)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.metrics.StringSetCell.update(StringSetCell.java:62)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.metrics.StringSetCell.add(StringSetCell.java:104)
at org.apache.beam.sdk.metrics.Metrics$DelegatingStringSet.add(Metrics.java:179)
at org.apache.beam.sdk.metrics.Lineage.add(Lineage.java:133)
This is because the string set metric is populated in the finalize write step (after the temp files are moved to their final destination), which runs on a single worker. Unfortunately, the current implementation of stringSetData.addAll has O(N^2) complexity: each call copies the existing contents into a new ImmutableSet, and this is repeated for each of the N elements.
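The quadratic cost can be illustrated with a simplified model. This is a sketch only: it uses `java.util` collections rather than the vendored Guava `ImmutableSet`, and the class and method names are hypothetical, not Beam code. It counts element insertions when every add rebuilds an immutable snapshot, compared with adding into one mutable set.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class StringSetCopyCost {
    // Copy-on-add: each add copies all existing elements into a fresh set,
    // mirroring the "new immutable set per add" pattern described above.
    static long copyOnAdd(int n) {
        Set<String> current = Collections.emptySet();
        long insertions = 0;
        for (int i = 0; i < n; i++) {
            Set<String> next = new HashSet<>(current); // re-inserts every existing element
            insertions += current.size();
            next.add("file-" + i);                     // plus the one new element
            insertions++;
            current = Collections.unmodifiableSet(next);
        }
        return insertions;
    }

    // In-place add: one insertion per element.
    static long inPlaceAdd(int n) {
        Set<String> current = new HashSet<>();
        long insertions = 0;
        for (int i = 0; i < n; i++) {
            current.add("file-" + i);
            insertions++;
        }
        return insertions;
    }

    public static void main(String[] args) {
        int n = 2_000;
        System.out.println("copy-on-add insertions: " + copyOnAdd(n));
        System.out.println("in-place insertions:    " + inPlaceAdd(n));
    }
}
```

In this model the copy-on-add path performs N(N+1)/2 insertions for N elements, while the in-place path performs N, which is consistent with a finalize step stalling once the number of output files grows large.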
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components