askap-vast / vast-tools

A collection of tools that are useful for the VAST project and for exploration of results from the VAST Pipeline.
https://vast-survey.org/vast-tools/
MIT License
8 stars 0 forks source link

Pipeline: large jobs needs faster support #207

Closed ajstewart closed 4 years ago

ajstewart commented 4 years ago

Loading the entire pilot survey pipeline run into the notebook interface took 38m 30s.

This was always going to be the case as I purposely went pandas only just to get it working for now, but with the entire survey it is already becoming a bit slow.

Two epoch analysis takes 45 minutes plus (it hasn't finished yet).

While this is not so terrible in the grand scheme of things for the entire Pilot Survey it is far from ideal when you might want to run the query multiple times with different filters.

The memory usage is minimal however, running the above used about 6 GB of RAM. So we are not at a point yet where these dataframes cannot fit into sensible memory.

Dask is probably the answer here as the two epoch still requires calculations. Although possibly Vaex could also be useful as it's becoming more of a visualisation problem.

Though the two epoch functions can definitely be improved to be faster (on the other hand we may move these calculations to the pipeline stage https://github.com/askap-vast/vast-pipeline/issues/232)

marxide commented 4 years ago

Try the function below. It accepts 3 DataFrames:

I don't know if it's the most efficient, but it takes about 5 mins to run on my laptop for a simulated dataset of ~7.3 million measurements (114 fields 8,000 sources per field 8 epochs). Also note that I'm taking the easy way out and only looking at sources with no siblings.

from itertools import combinations
import numpy as np
import pandas as pd

def calculate_two_epoch_metrics(sources_df, associations_df, measurements_df) -> pd.DataFrame:
    # create DataFrame of source measurements without siblings
    associations_no_siblings_df = (
        # only keep sources without siblings
        associations_df.set_index("source_id")
        .loc[sources_df.query("n_sibl == 0").index]
        .reset_index()
        # add peak flux and error
        .join(measurements_df[["flux_peak", "flux_peak_err"]], on="meas_id")
    )

    # create a DataFrame of all measurement pairs
    measurement_combinations = associations_no_siblings_df.groupby("source_id")["meas_id"].apply(
        lambda x: pd.DataFrame(list(combinations(x, 2)))
    ).reset_index(level=1, drop=True).rename(columns={0: "meas_id_a", 1: "meas_id_b"}).astype(int).reset_index()

    # add the measurement fluxes and errors
    association_fluxes = associations_no_siblings_df.set_index(["source_id", "meas_id"])[["flux_peak", "flux_peak_err"]]
    measurement_combinations = measurement_combinations.join(
        association_fluxes,
        on=["source_id", "meas_id_a"],
    ).join(
        association_fluxes,
        on=["source_id", "meas_id_b"],
        lsuffix="_a",
        rsuffix="_b",
    )

    # calculate 2-epoch metrics
    measurement_combinations["vs"] = (
        (measurement_combinations.flux_peak_a - measurement_combinations.flux_peak_b) / 
        np.hypot(measurement_combinations.flux_peak_err_a, measurement_combinations.flux_peak_err_b)
    )

    measurement_combinations["m"] = 2 * (
        (measurement_combinations.flux_peak_a - measurement_combinations.flux_peak_b)
        / (measurement_combinations.flux_peak_a + measurement_combinations.flux_peak_b)
    )
    return measurement_combinations
ajstewart commented 4 years ago

Thanks for this @marxide. I went back and looked at what I did and it was not good 😅. I had followed the same principal as you above but had failed to panda-ise the middle section, no idea why not! This meant it was really slow. I've incorporated your pandas code for the building part and it's much better. An all-images pipeline run takes about 3 minutes to process.

I'll also note here that I've switched the measurement parquet loading to using Dask and this can now load the all images pipeline run in about 1 min 30 seconds.

ajstewart commented 4 years ago

Below is assuming that the two epoch calculations no longer need to be calculated by vast-tools.

To document what I've been experimenting with over the past few days using a full pilot survey run including Epoch 12. Running this through the pipeline produces ~9.6 million measurements including forced fits, although running this on an 'epoch based' association this reduces to about 6.5 million or so (a normal association run with forced extractions will break past 10 million measurements).

In a pure pandas environment we have the machines to handles such a dataframe. The measurements dataframe for this run has an in memory footprint of about 4 - 5 GB. This is ok but starts to become problematic if multiple users wish to open the full pipeline run to play around with, suddenly half the Nimbus memory could be used up with open dataframes.

All other parquets are not very problematic in memory. The sources parquet (~800k rows) is only 200 MB.

in the current rc.3 v2 release the following takes place:

Firstly this process is good for small runs. It's fast and the memory footprint is not too bad.

But for large runs, while it's still relatively fast taking about 1 min, the downside is the memory used by the measurements and the merges. If the .compute() is delayed with Dask to later (i.e. performing merges and then trying to make selections, then .compute()) the memory improves but the time taken is very slow (10 mins +).

One solution I explored to solve the memory problem is using vaex which is a library to explore dataframes in an out-of-core context. However simply subbing in Vaex to the process above arose the following problems:

Vaex can memory-map HDF5 and .arrow files. So to get around these issues it was hugely beneficial to create a measurements.arrow file for the large run. This enables the opening of the job to take milliseconds with the measurements loaded into an out-of-core dataframe with no memory footprint. Selections are fast - and as this file is pre-made, no merges are required (the before mentioned image info like paths are appended only when needed when a source is fetched).

This also has the advantage of having the measurements loaded in Vaex which makes doing aggregate calculations/filters on ~10 million rows very fast. The measurements.arrow for the full run was 1.4 GB - not bad at all.

With this it seems to be a huge advantage to have the ability to create a measurements.arrow file for large runs, which can be added into the pipeline. The generation of the .arrow file is not intensive and takes about 2 mins.

vast-tools can then check for the presence of this file in the job directory and open, or falling back to the Dask method if not.