WayScience / CytoSnake

Orchestrating high-dimensional cell morphology data processing pipelines
https://cytosnake.readthedocs.io
Creative Commons Attribution 4.0 International

Multi-processing implementation in aggregation #10

Closed axiomcura closed 2 years ago

axiomcura commented 2 years ago

Motivation

Currently it takes around 110 minutes to aggregate all 9 SQLite single-cell plate datasets (130.07 GB) into aggregated profiles. However, we can improve the computational time by splitting the job across multiple cores.

This PR adds multi-processing support to aggregate_cells.py in order to improve computational performance.

Approach

Changes were applied to aggregate_cells.py and to preprocess.smk under the aggregate rule. Since this step is an iterative process, we can assign each input to an individual core to conduct the aggregation.

However, this requires some tweaks to how the parameters are passed in order to support multi-processing:

# imports required by this snippet
import itertools
import multiprocessing as mp

# transforming snakemake objects into python standard datatypes
sqlfiles = [str(sqlfile) for sqlfile in snakemake.input["sql_files"]]
cell_count_out = [str(out_name) for out_name in snakemake.output["cell_counts"]]
aggregate_profile_out = [
    str(out_name) for out_name in snakemake.output["aggregate_profile"]
]

# shared inputs are wrapped in itertools.repeat() so that zip() pairs a
# copy of each with every plate (zip() stops at the shortest iterable)
meta_data_dir = itertools.repeat(str(snakemake.input["metadata"]))
barcode_map = itertools.repeat(str(snakemake.input["barcodes"]))
config_path = itertools.repeat(str(snakemake.params["aggregate_config"]))
inputs = list(
    zip(sqlfiles, meta_data_dir, barcode_map, cell_count_out, aggregate_profile_out, config_path)
)

# init multi-processing: never spawn more workers than there are inputs
n_cores = int(snakemake.threads)
if n_cores > len(inputs):
    print(
        f"WARNING: number of specified cores exceeds number of inputs, defaulting to {len(inputs)}"
    )
    n_cores = len(inputs)

with mp.Pool(processes=n_cores) as pool:
    pool.starmap(aggregate, inputs)  # blocks until all plates are processed
    pool.close()
    pool.join()
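
Since pool.starmap unpacks each tuple positionally, the aggregate function must accept its arguments in the same order they were zipped. Below is a hypothetical sketch of that signature; the actual function in aggregate_cells.py may name or handle these parameters differently.

# hypothetical sketch -- argument order must mirror the zip() call above
def aggregate(
    sql_file: str,               # one plate's SQLite single-cell database
    metadata_dir: str,           # shared metadata directory
    barcode_path: str,           # shared barcode platemap
    cell_count_out: str,         # per-plate cell count output path
    aggregate_profile_out: str,  # per-plate aggregated profile output path
    config_path: str,            # shared aggregation config
) -> None:
    ...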

The aggregate rule uses the threads rule attribute as the parameter specifying the number of cores to use in the aggregation process:

# preprocess.smk
rule aggregate:
    input:
        sql_files=expand("data/{plate_id}.sqlite", plate_id=PLATE_IDS),
        barcodes="data/barcode_platemap.csv",
        metadata="data/metadata",
    output:
        cell_counts=expand(
            "results/preprocessing/{plate_id}_cell_counts.tsv", plate_id=PLATE_IDS
        ),
        aggregate_profile=expand(
            "results/preprocessing/{plate_id}_aggregate.csv.gz", plate_id=PLATE_IDS
        ),
    conda:
        "../envs/cytominer_env.yaml"
    params:
        aggregate_config=config["config_paths"]["single_cell"],
    threads: config["analysis_configs"]["preprocessing"]["threads"]
    script:
        "../scripts/aggregate_cells.py"

The value given to the threads rule attribute is set in CytoPipe's general config file:

# configuration.yaml
config_name: cytopipe_defaults
analysis_configs:
  preprocessing:
    threads: 4
config_paths:
  single_cell: "configs/analysis_configs/single_cell_configs.yaml"
  annotate: "configs/analysis_configs/annotate_configs.yaml"
  normalize: "configs/analysis_configs/normalize_configs.yaml"
  feature_select: "configs/analysis_configs/feature_select_configs.yaml"
  aggregate: "configs/analysis_configs/aggregate_configs.yaml"

allowing users to easily access and change the number of threads used in the preprocessing step. (NOTE: multi-processing is only implemented in the aggregate rule, not in the subsequent rules.)
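
For reference, a quick sketch of how that setting could be inspected programmatically (assumes PyYAML and the configuration.yaml shown above):

import yaml

# read CytoPipe's general config and print the preprocessing thread count
with open("configuration.yaml") as f:
    config = yaml.safe_load(f)

print(config["analysis_configs"]["preprocessing"]["threads"])  # -> 4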

Implementation Diagram

Below is a simple diagram of how the multi-processing was implemented.

[Figure: mp_diagram]

Simple demonstration of the multi-processing design. The array above is the set of input parameter tuples required by the function. The multiprocessing module reserves memory for the parameters and dispatches each tuple to a CPU core. Each core runs the aggregate step independently until the parameter list is exhausted.
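
As a minimal, self-contained sketch of this pattern (the worker below is a stand-in, not the real aggregate function):

import itertools
import multiprocessing as mp


def worker(input_file, shared_config, output_file):
    # stand-in for the aggregate step; each process consumes one tuple
    print(f"{input_file} + {shared_config} -> {output_file}")


if __name__ == "__main__":
    inputs = ["plate1.sqlite", "plate2.sqlite", "plate3.sqlite"]
    outputs = ["plate1.csv.gz", "plate2.csv.gz", "plate3.csv.gz"]

    # zip() stops at the shortest iterable, so the infinite repeat()
    # contributes exactly one config path per input
    args = list(zip(inputs, itertools.repeat("config.yaml"), outputs))

    with mp.Pool(processes=3) as pool:
        pool.starmap(worker, args)  # blocks until every tuple is processed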

Performance Results

To find the optimal number of cores for aggregation, multiple tests were conducted, with each test increasing the core count by 2. Time was measured with the time command in the terminal: time snakemake -c 10 --use-conda -r aggregate
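
These runs could also be scripted rather than timed by hand; a hypothetical sketch (the --forceall flag is added so each run redoes the full aggregation instead of reusing existing outputs):

import subprocess
import time

# hypothetical benchmark loop mirroring the manual `time` runs above
for cores in range(2, 12, 2):
    start = time.perf_counter()
    subprocess.run(
        ["snakemake", "-c", str(cores), "--use-conda", "--forceall", "-r", "aggregate"],
        check=True,
    )
    print(f"{cores} cores: {time.perf_counter() - start:.1f} s")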

[Figures: performance_chart, performance_plot]

The multi-processing implementation does increase aggregation performance; however, there is still a major caveat: the largest file will bottleneck the performance.

In order for the aggregate step to be considered “complete”, all cores need to be finished. This raises an issue with larger files: the largest file takes the most time to process, so the pool must wait for that specific process to finish before returning.
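
A toy illustration of this straggler effect, where sleep stands in for per-plate processing time:

import multiprocessing as mp
import time


def fake_aggregate(plate, seconds):
    # simulate a plate whose processing time scales with its file size
    time.sleep(seconds)
    return plate


if __name__ == "__main__":
    # three small plates and one large plate, each on its own core
    plates = [("small_1", 1), ("small_2", 1), ("small_3", 1), ("large", 5)]
    start = time.perf_counter()
    with mp.Pool(processes=4) as pool:
        pool.starmap(fake_aggregate, plates)
    # wall time is ~5 s: starmap cannot return until the large plate is done
    print(f"elapsed: {time.perf_counter() - start:.1f} s")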

Unfortunately, this is an I/O-bound issue, which means the computer spends much of its time reading from the hard disk before it can do any processing. I/O-bound work is a bottleneck here because transferring data from the HDD to RAM is very slow.