This has been fixed in Dask. There are further improvements that could be made on the Beam side, but there seems to be little to no engagement from the Beam community on this. So I'm going to close this out as completed and leave https://github.com/apache/beam/issues/26669 around in case Beam folks want to collaborate and iterate further.
This is a high-level tracker issue for issues/PRs in both Dask and Beam.
Summary
I'm exploring some unexpected behaviour that limits how far Apache Beam workloads can scale with the Dask runner. This was originally discovered on NGC Base Command Platform, although the bug appears to be general to all Beam-on-Dask deployments.
Beam uses `dask.bag` to store collections of items (which Beam calls a `PCollection`). Calling `beam.Create(iterable)` is translated to `dask.bag.from_sequence(iterable)`. It then translates calls to functions like `beam.ParDo` to `dask.bag.map`.

The problem we are investigating was discovered when a Beam workload wouldn't scale beyond ~200 workers. Additional workers added to the Dask cluster would sit idle. After some investigation, it seems the root cause is Dask Bag's default partitioning scheme, which limits you to 100 tasks once the bag contains more than a certain number of items.
This means it is never possible to fully utilize clusters with more than 200 GPUs.