Cloufield / gwaslab

A Python package for handling and visualizing GWAS summary statistics. https://cloufield.github.io/gwaslab/
GNU General Public License v3.0

Add cache to speedup strand inference #86

Closed sup3rgiu closed 7 months ago

sup3rgiu commented 7 months ago

Currently, parallelinferstrand() distributes the workload across N cores using multiprocessing. However, this step can be optimized further if we have a stored cache at our disposal that contains all the information needed to perform it.

General idea

  1. Suppose we want to run a pipeline on N different GWAS, but always with the same reference .vcf.gz file.
  2. parallelinferstrand() fetches the data from the .vcf.gz, which is a quite time-consuming operation if it needs to be done millions of times (sure, the .tbi index helps the fetching performance a lot, but it's still slow compared to an in-memory approach).
  3. Instead of fetching from the .vcf.gz every time the pipeline is run, we can read the whole reference file once and build a cache (i.e. a Python dictionary) containing only the information needed to perform the check_strand and check_indel steps. This information includes: record.pos, record.ref, record.alts, record.info[ref_alt_freq][0] (see the sketch after this list).
  4. When parallelinferstrand() reaches the check_strand and/or check_indel steps, it checks whether the cache exists and, if so, uses it.
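
For illustration, a minimal sketch of what building such a cache could look like with pysam (this is not the PR's actual code; the build_strand_cache helper and the exact key format are assumptions):

# Illustrative sketch only (not the PR's implementation).
# Assumes the reference VCF is readable with pysam and that `ref_alt_freq`
# names an INFO field carrying the ALT allele frequency.
import pysam

def build_strand_cache(vcf_gz_path, ref_alt_freq="AF"):
    cache = {}
    with pysam.VariantFile(vcf_gz_path) as vcf:
        for record in vcf:
            # Keep only what check_strand / check_indel need:
            # position, REF, ALTs and the ALT allele frequency.
            key = f"{record.chrom}:{record.pos - 1}:{record.pos}"  # assumed key format
            cache.setdefault(key, []).append(
                [record.pos, record.ref, record.alts, record.info[ref_alt_freq][0]]
            )
    return cache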

Actual implementation

Why load the cache in the background? If our pipeline consists of several steps, it's very likely that the harmonization step is not the first one, so we can start loading the cache while the pipeline is still running. Of course, this approach is only advantageous if we have more than one core.
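
A minimal sketch of the background-loading idea with multiprocessing (not the PR's actual loader classes; build_strand_cache is the hypothetical helper sketched above):

# Illustrative sketch of background preloading: a child process builds the
# cache while the earlier pipeline steps run in the parent.
import multiprocessing as mp

def _loader(vcf_gz_path, ref_alt_freq, queue):
    queue.put(build_strand_cache(vcf_gz_path, ref_alt_freq))  # hypothetical helper from above

if __name__ == "__main__":
    queue = mp.Queue()
    proc = mp.Process(target=_loader, args=("ref.vcf.gz", "AF", queue), daemon=True)
    proc.start()
    # ... run the earlier pipeline steps (QC, filtering, ...) here ...
    cache = queue.get()  # blocks only if the cache is not ready yet
    proc.join()

Note that transferring a very large dictionary between processes has its own cost, so a thread-based or shared-state loader can be preferable depending on the setup.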

CacheLoaderThread vs CacheLoaderProcess vs CacheProcess

All three classes can be used to load the cache in the background; however, they differ:

Usage

The only API change is that parallelinferstrand now has a new argument, cache_options (which can be passed within the inferstrand_args argument of harmonize()). The cache_options argument is a dictionary which may contain the following keys:

Basic usage:

sumstats.harmonize(..., inferstrand_args={'cache_options': {'use_cache': True}})

This will start loading the cache once the harmonization step reaches the parallelinferstrand method.

Advanced usage (preloading):

Somewhere at the beginning of the pipeline (e.g. immediately after the creation of the sumstats object):

cache_process = gwaslab.cache_manager.CacheProcess(vcf_gz_path, ref_alt_freq=ref_alt_freq, n_cores=NUM_WORKERS, log=sumstats.log, verbose=True)
cache_process.start()

Then, when needed:

sumstats.harmonize(..., inferstrand_args={'cache_options': {'cache_process': cache_process}})

In both basic usage and advanced usage, the cache will be automatically built if not found. So, in practice, there is no need to manually instantiate and use a CacheBuilder object. Also, 99% of the time there is no need to manually create a CacheManager.

N.B. This PR does not change the default behavior, so the cache will not be built and used unless explicitly requested by the settings in cache_options.

Improvement and drawbacks

For a real-world GWAS (and not a toy one) with a real-world VCF file, the parallelinferstrand() step can take 10-15 minutes (depending on how many cores are available). With the cache implementation, it can take up to 3 minutes to load the cache, but then the check_strand and check_indels steps take a few seconds. These 3 minutes required to load the cache can be "removed" if the loading is done in the background. The drawback of this approach is that, depending on the VCF file, the cache can end up quite big when loaded into memory (in my tests, a cache for 80M variants requires ~40GB of RAM). In the future, we could try to limit this drawback by building a separate cache for each chromosome, and then loading one cache at a time into memory. This would probably slow down the process a bit, since each cache has to be loaded on the fly when it is needed (although you could preload the next cache while the first one is being used).

As you can see from this PR and my previous PRs #80 #81, I'm trying to optimize some steps. This is because I need to run a pipeline on ~10000 GWAS, and since I have an HPC available, I'm not very resource constrained (so the increase in RAM usage is fine). However, all these optimizations could save days (if not weeks).

Thanks for your efforts, and I'm available for any clarification or further testing (which, as always, I invite you to do on your end as well) :)

Cloufield commented 7 months ago

Thanks again for your great efforts! Actually, I was also thinking about how to enhance running a pipeline for multiple datasets. My initial idea was to harmonize one dataset (an imputation panel, or something similar) and then use the harmonized dataset as a template/reference to harmonize other datasets, which could be fast for both harmonization and assigning rsIDs (g_SumstatsT.py; not finished though). Other ideas include building an HDF5 file for fast access (util_ex_process_h5.py, currently only for assigning CHR and POS based on rsID). I think these are similar to your idea in some ways.
Since it does not change the default behavior, I would be very glad to merge your implementation for now (after reading your code and running some tests soon).

Cloufield commented 7 months ago

Hi Andrea,

I tested your code with a VCF file with 3M records and it worked quite well. Great implementation!!

And I also have some thoughts that we can probably discuss before merging.

  1. redundancy in cache

    current cache structure (the information extracted by fetching the records in VCF):

    {'X:60019:60020': [[60020, 'T', ('TA',), 0.003968250006437302]],
    'X:60025:60026': [[60026, 'T', ('C',), 0.0009920640150085092]],

    I am wondering if we can simply construct SNPID (CHR:POS:REF:ALT) to ALT_AF pairs when iterating through the records in the VCF, to make the cache slightly more compact (no need to keep the complex structure); see the sketch after this list.

    {'X:60020:T:TA': 0.003968250006437302,
    'X:60026:T:C': 0.0009920640150085092,

    And then simply use SNPID and ALT_AF for checking in check_strand and check_indels.

  2. split cache based on variant types

    For parallelinferstrand, we only need palindromic SNPs (A/T and C/G SNPs) and indels, which account for a small part of all variants in a reference VCF, but the current cache contains all variants. When building the cache, I am thinking maybe we can split it into two parts: (1) non-palindromic SNPs (like xxxx.npsnp.cache) and (2) palindromic SNPs and indels (xxxx.pi.cache).
    For parallelinferstrand, only part 2 is needed. Parts 1 and 2 together can be used for parallelecheckaf and paralleleinferaf in a similar way to what you implemented for parallelinferstrand.

  3. ref_alt_freq verification for cache

    Since one VCF may have multiple IDs in INFO (like allele frequencies for different ancestries), we need to verify that ref_alt_freq is indeed the INFO ID that was used for building the cache. Currently, when a cache exists, changing ref_alt_freq has no effect.

  4. slightly more flexible cache format

    I am wondering if we can use the HDF5 format instead of a dictionary pickle for the cache, since it is more flexible (maybe) and supports data slicing. For example, we can use {ref_alt_freq}_{variant_type}_{chr} as keys for groups in the HDF5 file.

    • ref_alt_freq: EAS_AF, EUR_AF ... (related to point 3)
    • variant_type : non-palindromic, palindromic SNPs and indels... (related to point 2)
    • chr: 1, 2, ... (related to Improvement and drawbacks in your message)

In each group, the data is just a long table of SNPID(CHR:POS:REF:ALT)-ALT_AF pairs like:

SNPID AF
X:60020:T:TA 0.003968250006437302
X:60026:T:C 0.0009920640150085092

We can then just load the needed parts instead of the entire file to reduce memory usage.

  5. The cache-building process took quite a long time for a VCF with 80M records (more than 3 hours with 4 cores); maybe we can slightly improve the speed?
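
For illustration, points 1 and 2 combined might look roughly like this when iterating the VCF (a sketch only; the category names and the EAS_AF INFO ID are assumptions):

# Illustrative sketch: compact SNPID -> ALT_AF cache, split by variant type.
import pysam

PALINDROMIC = {("A", "T"), ("T", "A"), ("C", "G"), ("G", "C")}

def category(ref, alt):
    """'pi' for palindromic SNPs and indels, 'np' for the other SNPs."""
    if len(ref) != 1 or len(alt) != 1:
        return "pi"  # indel
    return "pi" if (ref, alt) in PALINDROMIC else "np"

caches = {"pi": {}, "np": {}}
with pysam.VariantFile("ref.vcf.gz") as vcf:
    for record in vcf:
        freqs = record.info["EAS_AF"]  # assumed ref_alt_freq INFO ID, one value per ALT
        for alt, alt_af in zip(record.alts, freqs):
            snpid = f"{record.chrom}:{record.pos}:{record.ref}:{alt}"
            caches[category(record.ref, alt)][snpid] = alt_af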

I would like to hear your thoughts on these points. Thanks a lot!

sup3rgiu commented 7 months ago

Hi there, I completely agree with all your thoughts! I have pushed some new commits to reflect these suggestions.

At a high level, the API is still the same, but now:

Following your idea, the cache is now organized in a hierarchical HDF5 file, with the following structure:

.
└── ref_alt_freq (group) (e.g. EAS_AF, EUR_AF)
    └── category (group) (e.g. pi, np, all)
        ├── chrom 1 (group)
        │   ├── keys (dataset) (e.g. 1:60020:T:TA)
        │   └── values (dataset) (e.g. 0.003968250006437302)
        └── chrom 2 (group)
            ├── keys (dataset)
            └── values (dataset)

When needed, the cache is reloaded using ref_alt_freq and category as reference. Then a single dict is created, loading all keys and values from each chromosome.
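
For illustration, loading such a layout back into a single dict could look like this with h5py (a sketch following the tree above, not the PR's actual loader):

# Illustrative sketch: rebuild one flat dict from the HDF5 cache for a given
# ref_alt_freq and category, iterating over the per-chromosome groups.
import h5py

def load_cache(h5_path, ref_alt_freq="EAS_AF", category="pi"):
    cache = {}
    with h5py.File(h5_path, "r") as f:
        group = f[ref_alt_freq][category]
        for chrom in group:                     # one subgroup per chromosome
            keys = group[chrom]["keys"][:]      # e.g. b"1:60020:T:TA"
            values = group[chrom]["values"][:]  # e.g. 0.003968250006437302
            cache.update(zip((k.decode() for k in keys), values))
    return cache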

This approach uses much less memory because the data structure is simpler and there are fewer records.

As for your point (5), that's weird (it took me about 30 minutes on 8 cores). Anyway, I have now migrated the cache building to multiprocessing instead of multithreading, and it is much faster on 8 cores. However, other factors could affect the performance of the building process, such as the storage on which the VCF is kept (e.g. a high-performance SSD vs. an HDD).
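
For reference, a per-chromosome multiprocessing build might look roughly like this (a sketch assuming the VCF is tabix-indexed so each worker can fetch its own chromosome; not the PR's actual code):

# Illustrative sketch: build the cache in parallel, one worker per chromosome.
import multiprocessing as mp
import pysam

def build_chrom(args):
    vcf_gz_path, chrom, ref_alt_freq = args
    out = {}
    with pysam.VariantFile(vcf_gz_path) as vcf:  # each worker opens its own handle
        for record in vcf.fetch(chrom):
            for alt, alt_af in zip(record.alts, record.info[ref_alt_freq]):
                out[f"{chrom}:{record.pos}:{record.ref}:{alt}"] = alt_af
    return chrom, out

if __name__ == "__main__":
    vcf_gz_path, ref_alt_freq = "ref.vcf.gz", "EAS_AF"
    with pysam.VariantFile(vcf_gz_path) as vcf:
        chroms = list(vcf.header.contigs)
    with mp.Pool(processes=8) as pool:
        per_chrom = dict(pool.map(build_chrom, [(vcf_gz_path, c, ref_alt_freq) for c in chroms]))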

Let me know what you think of these changes!

Cloufield commented 7 months ago

Thanks a lot for your amazing efforts! The latest commit works well. It is much faster than before (less than 3 minutes to build the cache with 4 cores) and uses much less memory and storage. I think the main function is ready and I will merge it now. Probably we can improve some details later (updating/overwriting cache).

sup3rgiu commented 7 months ago

Thank you! Yep, we can improve something in the future, and also add cache support for parallelecheckaf and paralleleinferaf.

Also, I would be very glad if you could update the package on PyPI. Thanks again!