askap-vast / vast-pipeline

This repository holds the code of the Radio Transient detection pipeline for the VAST project.
https://vast-survey.org/vast-pipeline/

Dask dataframes poorly partitioned #709

Closed: ddobie closed this issue 2 months ago

ddobie commented 7 months ago

I suspect that a lot of our memory woes result from the dask dataframes being poorly partitioned. A common pattern throughout the pipeline is to do something like

dd.from_pandas(df, n_cpu).groupby("source")

which is bad for two reasons.

First, it results in partition sizes that are not necessarily sensible. Partitions should be on the order of ~100 MB, so for large dataframes (e.g. at the pair metrics step) using n_cpu partitions produces enormous partitions. Fix: check the size of the dataframe and calculate the number of partitions accordingly.
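As a minimal sketch of the size-based idea (toy data, illustrative names; it assumes a dask version whose DataFrame.repartition accepts a partition_size argument, and that call may trigger extra computation to measure the frame):

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Toy dataframe standing in for a pipeline dataframe.
df = pd.DataFrame({"source": np.arange(1_000_000) % 1000,
                   "flux": np.random.random(1_000_000)})

ddf = dd.from_pandas(df, npartitions=4)        # e.g. npartitions = n_cpu
ddf = ddf.repartition(partition_size="100MB")  # let dask size the partitions
print(ddf.npartitions)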

Second, it results in sources being spread across partitions, which forces an expensive cross-partition shuffle when the groupby is subsequently computed. The solution is to set the dataframe index to the source.
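A minimal sketch (toy data, illustrative column names) of why the index matters: with source as a sorted index, dask knows the division boundaries and each source sits entirely inside one partition, so the groupby can be computed without shuffling data between partitions.

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"source": [1, 1, 2, 2, 3, 3],
                   "flux": [0.1, 0.2, 1.0, 1.1, 5.0, 5.2]})

ddf = dd.from_pandas(df.set_index("source"), npartitions=2)
print(ddf.divisions)  # known boundaries on the source index

# Grouping on the index keeps each group local to its partition.
print(ddf.groupby("source")["flux"].mean().compute())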

In short, we should be replacing the above code with something like

import numpy as np
import dask.dataframe as dd

# Aim for ~100 MB partitions instead of one partition per CPU.
df_size = df.memory_usage(deep=True).sum() / 1e6  # dataframe size in MB
partition_size = 100  # target partition size in MB
num_partitions = max(1, int(np.ceil(df_size / partition_size)))

dd.from_pandas(df.set_index("source"), npartitions=num_partitions).groupby("source")
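Continuing from the snippet above, a hedged usage sketch of how the grouped, source-indexed frame would then be consumed; per_source_metrics, the flux column and the meta are hypothetical placeholders, not the pipeline's actual pair-metric code.

# Hypothetical per-source function; name, column and meta are illustrative only.
def per_source_metrics(group):
    return group["flux"].max() - group["flux"].min()

grouped = dd.from_pandas(df.set_index("source"), npartitions=num_partitions).groupby("source")
metrics = grouped.apply(per_source_metrics, meta=("flux_range", "f8")).compute()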
ddobie commented 2 months ago

Closed by #724