Recently I have been running some benchmarks on TFT using a variety of different datasets. One thing I have noticed is that `CacheableCombineAccumulate` seems to have a bit of a scaling issue, in particular with `tft.bucketize` and the `QuantilesCombiner`.
As a concrete example, I ran an experiment with a dataset of 20 billion rows and around 1000 dense scalar columns. I added just one analyzer, `tft.bucketize(num_buckets=100, elementwise=True)`, and applied it to the 1000 columns concatenated together.
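For reference, the `preprocessing_fn` looked roughly like the sketch below (column names are placeholders standing in for the real dataset's ~1000 dense scalar features):

```python
import tensorflow as tf
import tensorflow_transform as tft

# Placeholder column names standing in for the ~1000 dense scalar features.
_DENSE_COLUMNS = ["col_%04d" % i for i in range(1000)]


def preprocessing_fn(inputs):
  """Applies a single elementwise bucketize analyzer across all columns."""
  # Concatenate the scalar columns into one [batch_size, 1000] tensor so a
  # single tft.bucketize(elementwise=True) call covers every column.
  stacked = tf.concat(
      [tf.reshape(inputs[name], [-1, 1]) for name in _DENSE_COLUMNS], axis=1)
  bucketized = tft.bucketize(stacked, num_buckets=100, elementwise=True)
  return {"bucketized": bucketized}
```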
Dataflow job ID: 2020-06-12_11_25_12-8283309558685381372
Elapsed time: 21 hr 57 min
Total vCPU time: 11,455.279 vCPU hr
Around 19 of the 22 wall-clock hours were spent on the `InitialCombineGlobally` step.
To verify that the bottleneck really was in `InitialCombineGlobally`, I forked `_IntermediateAccumulateCombineImpl` and added `.with_fanout(1000)` to its `beam.CombineGlobally` call.
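The change itself is tiny; a minimal sketch of what my fork does (not the actual TFT source, and the helper name here is mine):

```python
import apache_beam as beam

# The stock _IntermediateAccumulateCombineImpl expands to roughly
#   pcoll | beam.CombineGlobally(combiner)
# which funnels every accumulator through a single final worker. Adding a
# fanout hint lets the runner pre-combine across many intermediate keys first.


def accumulate_with_fanout(pcoll, combiner, fanout=1000):
  """Global combine with an intermediate fanout, as used in my fork."""
  return pcoll | "CombineGloballyWithFanout" >> (
      beam.CombineGlobally(combiner).with_fanout(fanout))
```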
Dataflow job ID: 2020-06-22_13_45_28-14720730178676069772
Elapsed time: 2 hr 43 min
Total vCPU time: 6,638.944 vCPU hr
It would be nice if we could annotate our analyzers/combiners with the desired fanout size so that we have control over this issue when it arises (one possible shape for such a hook is sketched below). Also, while the above experiment may look like a perversely chosen combination of dataset and transformation that is unlikely to exist in practice, this workflow is quite similar to real-world problems I am working on, so a fix here would be very helpful.
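To make the request concrete, here is a purely illustrative sketch of the Beam-side hook an analyzer-level fanout annotation could feed into; the class and argument names are hypothetical and do not exist in TFT today:

```python
import apache_beam as beam

# Illustrative only: if an analyzer could carry a fanout annotation, the
# accumulate step could consume it like this. Names are hypothetical.


class _AccumulateCombineWithFanout(beam.PTransform):
  """Global combine whose fanout comes from an analyzer-level annotation."""

  def __init__(self, combine_fn, fanout=None):
    super().__init__()
    self._combine_fn = combine_fn
    self._fanout = fanout  # e.g. populated from the analyzer's annotation

  def expand(self, pcoll):
    combine = beam.CombineGlobally(self._combine_fn)
    if self._fanout is not None:
      combine = combine.with_fanout(self._fanout)
    return pcoll | "Accumulate" >> combine
```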