Following up on the suggestion to run a pairwise distance computation on the Ag1000G phase 2 data, with some more details on the input data.
The data are available in scikit-allel-style zarrs in GCS. The full set of SNP calls and genotypes is at this path: gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1. It can be opened, e.g.:
import fsspec
import zarr
store = fsspec.get_mapper('gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1')
callset_snps = zarr.open_consolidated(store=store)
These data are grouped by chromosome arm. A demonstration for just a single chromosome arm would be sufficient; 2R is the largest, and so has the most SNPs. Alternatively, the whole genome is also interesting if it's equally convenient to compute. In case it helps, here are the genotype calls for 2R:
gt = callset_snps['2R/calldata/GT']
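For orientation, here is a minimal sketch (assuming the store has been opened as above) of wrapping that zarr array as a lazy dask array:

import dask.array as da

# gt is the zarr genotype array opened above; shape is (variants, samples, ploidy)
gt_da = da.from_zarr(gt)
print(gt_da.shape, gt_da.chunksize)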
One route to bring these data into sgkit is via the open_vcfzarr function. Another route may be to use the data that @tomwhite has previously converted to the sgkit zarr representation. Either would be fine.
@aktech I'll follow up by email to get you access to our jupyterhub service on Google Cloud so you can try this on a dask kubernetes cluster if useful.
cc @jeromekelleher
E.g., being able to say that we can run a PCA on UK biobank-sized data using a N node cluster in Google Cloud and compute the result in M minutes.
FYI @alimanfoo this notebook runs PCA on the UKB PLINK data using a 50 node 8 vCPU cluster (n1-standard-8). It takes ~30 seconds to run on a 108,621 variant x 141,070 sample (~60GB) alt allele count matrix. I chose that size based on what UKB did for their own PCA calculations. For comparison, running an axis-wise reduce function takes about 3 seconds (e.g. get the call rate for all variants).
I also tried this on a 20 node cluster and saw the same PCA step take 50 seconds, so the operations don't scale perfectly -- the 50 node cluster should have only taken 20 seconds otherwise.
I originally tried this with tsqr and non-randomized PCA, but that kept blowing past memory limits, even on "highmem" instances. I suspect something in tsqr produces n_samples x n_samples results for chunks that include all samples but relatively tiny numbers of variants. I think it's an awkward fit for arrays this shape regardless, so I plan on getting comfortable with using randomized PCA (as in the notebook).
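As a rough illustration of the randomized route (a sketch on random data, not the notebook's actual code; the matrix below just stands in for a standardized alt allele count matrix):

import dask.array as da

# Toy stand-in for a (samples x variants) standardized alt allele count matrix
x = da.random.random((20_000, 5_000), chunks=(5_000, 5_000))

# Randomized (compressed) SVD avoids the large intermediate blocks that tsqr can produce
u, s, v = da.linalg.svd_compressed(x, k=10)

# Principal component scores for each sample (X = U S V^T, so scores = U S)
scores = (u * s).compute()
print(scores.shape)  # (20000, 10)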
One route to bring these data into sgkit is via the open_vcfzarr function. Another route may be to use the data that @tomwhite has previously converted to the sgkit zarr representation. Either would be fine.
@tomwhite Can you share where I can get this? Or would you suggest I use the open_vcfzarr function?
@aktech I think the simplest thing for the moment is to use read_vcfzarr on a single contig (e.g. 2R) - that way you'll avoid all the rechunking issues that we're still trying to get resolved. (Rechunking is only needed when we are combining contigs - for a single contig no rechunking is necessary.)
Note that read_vcfzarr doesn't support cloud stores yet, so you'll need to copy the contents of gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1/2R to the local filesystem (or change it to read using fsspec).
Oh, alright. Thanks @tomwhite I'll give it a go.
BTW the sgkit notebooks I'm working on are here: https://github.com/tomwhite/shiny-train/tree/sgkit/notebooks/gwss. (Very much WIP.)
Note that read_vcfzarr doesn't support cloud stores yet, so you'll need to copy the contents of gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1/2R to the local filesystem (or change it to read using fsspec).
The contents of gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1/2R are 57.58 GiB, which I cannot copy in the datalab as it only has less than 10 GiB of storage.
or change it to read using fsspec
By this did you mean modifying the function read_vcfzarr to read using fsspec?
it only has less than 10 GiB of storage
Is it possible to increase that?
By this did you mean modifying the function read_vcfzarr to read using fsspec?
Yes, since the input is zarr you should be able to call zarr.open_group with a MutableMapping obtained from fsspec. We already do something similar here, for example: https://github.com/pystatgen/sgkit/blob/master/sgkit/io/vcf/vcf_reader.py#L264
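For example, a minimal sketch of that approach (assuming gcsfs/fsspec are available; this is not what read_vcfzarr does today):

import fsspec
import zarr

# Obtain a MutableMapping backed by GCS and open the zarr group from it directly
store = fsspec.get_mapper('gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1/2R')
vcfzarr = zarr.open_group(store=store, mode='r')
print(list(vcfzarr.group_keys()))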
Is it possible to increase that?
I am not sure; it's MalariaGEN's datalab.
I am now doing this on my cluster, where I have enough disk space. I have pulled the contents of gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1/2R onto the server. I am getting the following error while reading the data through read_vcfzarr:
read_vcfzarr(path)
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-23-b318af759c7a> in <module>
----> 1 read_vcfzarr(path)
~/sgkit/sgkit/io/vcfzarr_reader.py in read_vcfzarr(path)
44
45 # Index the contig names
---> 46 variants_chrom = da.from_zarr(vcfzarr["variants/CHROM"]).astype(str)
47 variant_contig, variant_contig_names = encode_array(variants_chrom.compute())
48 variant_contig = variant_contig.astype("int16")
~/.local/lib/python3.7/site-packages/zarr/hierarchy.py in __getitem__(self, item)
347 synchronizer=self._synchronizer)
348 else:
--> 349 raise KeyError(item)
350
351 def __setitem__(self, item, value):
KeyError: 'variants/CHROM'
Also, after I get past the error above, I am not sure which part of the xarray dataset I need to use for the pairwise distance calculation, since pairwise distance needs a 2D matrix. Is there documentation somewhere that I can refer to for this?
Also, after I get past the error above, I am not sure which part of the xarray dataset I need to use for the pairwise distance calculation, since pairwise distance needs a 2D matrix. Is there documentation somewhere that I can refer to for this?
Was there an example in one of @alimanfoo's notebooks for this? Maybe in the pairwise distance blogpost?
OK. Well, we'll be comparing the genotypes anyway - I'm not sure exactly how Alistair uses pairwise distance between genotypes, but I don't think it'll matter too much for a simple benchmark.
So, basically we're getting the distance between pairs of samples by comparing their genotype arrays.
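For a toy illustration of that (random data, not the Ag1000G arrays):

import dask.array as da
import numpy as np
from sgkit.distance.api import pairwise_distance

# Toy samples-by-variants matrix: 10 samples, 1000 variants
x = da.random.random((10, 1000), chunks=(10, 1000))

# Pairwise distances between every pair of samples (Euclidean by default)
out = pairwise_distance(x)
print(np.asarray(out).shape)  # (10, 10)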
@aktech try using #324 to read the data.
So, basically we're getting the distance between pairs of samples by comparing their genotype arrays.
Alright, let me check that.
@aktech try using #324 to read the data.
Thanks, let me have a look.
@tomwhite I am getting the same error while trying to read the data with read_vcfzarr("shared/sgkit_data/2R"). Am I missing something here?
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-5-7e129b306df9> in <module>
----> 1 read_vcfzarr("shared/sgkit_data/2R")
~/sgkit/sgkit/io/vcfzarr_reader.py in read_vcfzarr(path)
42
43 # don't fix strings since it requires a pass over the whole dataset
---> 44 return _vcfzarr_to_dataset(vcfzarr, fix_strings=False)
45
46
~/sgkit/sgkit/io/vcfzarr_reader.py in _vcfzarr_to_dataset(vcfzarr, contig, variant_contig_names, fix_strings)
168 if contig is None:
169 # Get the contigs from variants/CHROM
--> 170 variants_chrom = da.from_zarr(vcfzarr["variants/CHROM"]).astype(str)
171 variant_contig, variant_contig_names = encode_array(variants_chrom.compute())
172 variant_contig = variant_contig.astype("int16")
~/.local/lib/python3.7/site-packages/zarr/hierarchy.py in __getitem__(self, item)
347 synchronizer=self._synchronizer)
348 else:
--> 349 raise KeyError(item)
350
351 def __setitem__(self, item, value):
KeyError: 'variants/CHROM'
Off the top of my head, you need to set grouped_by_contig to True, and consolidated too. You might also want to set contigs to ["2R"]. Note that you need to read "shared/sgkit_data", since the code will find the contigs relative to that path.
I guess you mean the vcfzarr_to_zarr function, as these params are not available in the read_vcfzarr function. It throws a KeyError in this case. The path is path = "shared/sgkit_data/" (I tried path = "shared/sgkit_data/2R" as well and got the same error):
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-46-8adb2c67c900> in <module>
----> 1 vcfzarr_to_zarr(path, "output/", contigs=["2R"], grouped_by_contig=True, consolidated=True)
~/sgkit/sgkit/io/vcfzarr_reader.py in vcfzarr_to_zarr(input, output, contigs, grouped_by_contig, consolidated, tempdir)
74
75 if consolidated:
---> 76 vcfzarr = zarr.open_consolidated(str(input), mode="r")
77 else:
78 vcfzarr = zarr.open_group(str(input), mode="r")
~/.local/lib/python3.7/site-packages/zarr/convenience.py in open_consolidated(store, metadata_key, mode, **kwargs)
1172
1173 # setup metadata sotre
-> 1174 meta_store = ConsolidatedMetadataStore(store, metadata_key=metadata_key)
1175
1176 # pass through
~/.local/lib/python3.7/site-packages/zarr/storage.py in __init__(self, store, metadata_key)
2672
2673 # retrieve consolidated metadata
-> 2674 meta = json_loads(store[metadata_key])
2675
2676 # check format of consolidated metadata
~/.local/lib/python3.7/site-packages/zarr/storage.py in __getitem__(self, key)
824 return self._fromfile(filepath)
825 else:
--> 826 raise KeyError(key)
827
828 def __setitem__(self, key, value):
KeyError: '.zmetadata'
The error seems to be coming from here:
zarr.open_consolidated("shared/sgkit_data/", mode="r")
Did you copy the .zmetadata file (and any other hidden files from the top-level directory) locally?
BTW, if this doesn't work you can also load the genotype data using the approach Alistair outlines in the second comment of this issue.
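If copying the hidden metadata files is a hassle, one possible workaround (an untested sketch on my part, not something the reader functions require) is to regenerate the consolidated metadata on the local copy:

import zarr

# Rebuild .zmetadata for the local copy so that open_consolidated / consolidated=True can find it
zarr.consolidate_metadata("shared/sgkit_data/")

# Alternatively, passing consolidated=False to vcfzarr_to_zarr opens the group with
# zarr.open_group instead, which doesn't need .zmetadata at all.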
Did you copy the .zmetadata file (and any other hidden files from the top-level directory) locally?
Oh I see, didn't realise that's required.
BTW, if this doesn't work you can also load the genotype data using the approach Alistair outlines in the second comment of this issue.
Ah, that's good to know; I thought this step was necessary. I am going to use this method, thanks!
By the way I am using the following piece of code to run the scalability test:
import logging

from sgkit.distance.api import pairwise_distance
import dask.array as da
from dask.distributed import Client
from bokeh.io import output_notebook
import numpy as np
import dask
import zarr

output_notebook()


def setup_logging():
    """Sets up general and dask logger"""
    logging_format = "%(asctime)s %(levelname)9s %(lineno)4s %(module)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=logging_format)
    logging.info("Logging initialised")


def create_client():
    logging.info("Setting dask config")
    dask.config.set({'temporary_directory': '/work/aktech/tmp'})
    logging.info("Creating dask client")
    client = Client()
    logging.info(f"Client created: {client}")


def get_data():
    logging.info("Getting store object")
    sgkit_data = zarr.open_group('/work/aktech/sgkit_data/output/')
    gt = sgkit_data['call_genotype']
    # Roughly 100 MB chunks, in multiples of the on-disk chunk size
    gt_da = da.from_zarr(gt, chunks=(524288, 183, -1))
    x = gt_da[:, :, 1].T
    return x


def main():
    setup_logging()
    # Create the dask.distributed client before building the computation
    create_client()
    x = get_data()
    logging.info(f'The x is: {x}')
    logging.info("Starting the pairwise calculation")
    out = pairwise_distance(x)
    logging.info("Pairwise process complete")
    # np.save writes NumPy binary format, so use a .npy extension
    np.save('output.npy', out)
    logging.info("All done!")


if __name__ == '__main__':
    main()
I was reading from the GCP bucket directly, but it had some SSL errors for a bit (it is working now though), so I shifted to the on-disk approach shown above.
I am running this on a machine with 48 cores and 64 GB of memory, and I am getting some memory issues. In particular, the following:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.66 GB -- Worker memory limit: 7.00 GB
There is a potential memory leak somewhere, though I don't know where. I am trying to debug it at the moment. @eric-czech do you have any ideas on what could be leaking memory?
How many workers do you have? If you have 48 (one per core), then the worker memory limit of 7GB will exceed system memory. Perhaps you could keep reducing the number of workers, giving each more memory, to see if that helps.
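For example, something along these lines (a sketch; the worker counts and limits are just illustrative for a 48-core, 64 GB machine):

from dask.distributed import Client

# Fewer workers, each with an explicit memory budget that fits within 64 GB overall
client = Client(n_workers=6, threads_per_worker=4, memory_limit="10GB")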
The input is chunked along the variants dimension (axis 0), then transposed (so it becomes axis 1), then passed to pairwise_distance. But doesn't pairwise_distance expect axis 1 not to be chunked? (I may be missing something here...)
(BTW you've loaded the data now, but here's a notebook for posterity to load the data into sgkit format: https://nbviewer.jupyter.org/github/tomwhite/shiny-train/blob/sgkit/notebooks/gwss/sgkit_import.ipynb)
I just saw Eric's comment here, saying that whole row blocks need to fit in memory for this implementation. So, because of the transpose, that means whole columns of the input (variants) need to fit in memory. If there are 50 million rows, then that's ~100 column blocks, so ~100 x 100MB = 10GB of memory.
One option would be to rechunk to make the sample chunk size smaller (183 -> 18, say). Or try on a machine with fewer CPUs and more memory.
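For example, a sketch of the first option (reusing gt from the script above; the numbers are purely illustrative):

import dask.array as da

# Read with smaller sample chunks: 18 samples per chunk instead of 183
gt_da = da.from_zarr(gt, chunks=(524288, 18, -1))
x = gt_da[:, :, 1].T  # samples x variants, as before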
How many workers do you have?
Although the above error is for 8 workers/8 threads, I tried with 4 workers/4 threads too, with the same result.
If you have 48 (one per core), then the worker memory limit of 7GB will exceed system memory. Perhaps you could keep reducing the number of workers, giving each more memory, to see if that helps.
I thought so too, but the memory usage seems to stack up irrespective of the number of workers. I also tried fewer workers and fewer threads to give more memory to each worker, but it didn't help.
The input is chunked along the variants dimension (axis 0), then transposed (so it becomes axis 1), then passed to pairwise_distance. But doesn't pairwise_distance expect axis 1 not to be chunked? (I may be missing something here...)
That's true: leaving axis=1 unchunked is faster, and yes, this is the expectation. The chunking you see in the code above is the result of one of the various things I was trying to make Dask happy. I initially started with chunking only on axis=0, but then I was getting PerformanceWarning: Increasing number of chunks by factor of "x", even though my chunk sizes were reasonable, around 100 MB (or less sometimes). Also, no matter what chunking approach I tried, the memory usage continued to stack up, which is why I was wondering whether there is some kind of memory leak somewhere (although I might be wrong).
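One quick check that may help here (a sketch, reusing x from the script above) is to inspect the block layout before calling pairwise_distance, to see whether axis 1 ended up chunked:

# Chunk sizes and number of blocks along each axis of x
print(x.chunks)
print(x.numblocks)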
I just saw Eric's comment here, saying that whole row blocks need to fit in memory for this implementation. So, because of the transpose, that means whole columns of the input (variants) need to fit in memory. If there are 50 million rows, then that's ~100 column blocks, so ~100 x 100MB = 10GB of memory.
That makes sense. For reference, the chunkings I have tried are: 1. (524288, 183, -1), as in the code above, and 2. chunking only on axis=0, i.e. no chunking along axis=1 (with the same issues).
One option would be to rechunk to make the sample chunk size smaller (183 -> 18, say).
I tried this too; the peak memory usage goes to 50 GB, then the worker restarts and the compute eventually fails.
Or try on a machine with fewer CPUs and more memory.
I guess so. One thing I don't understand is that when I use the chunking mentioned in "2.", the peak memory still goes beyond the limits, which doesn't make sense since the chunk size is only 74.30 MB; no matter how many chunks are in memory, that shouldn't be enough to cause memory issues, as the chunks are independent and can be processed without depending on any other chunks.
Another suspect I have is the following warning:
/home/aktech/sgkit/sgkit/distance/api.py:91: PerformanceWarning: Increasing number of chunks by factor of 381
x_distance = da.blockwise(
I guess that if I have a chunk size Dask is not happy about, it might rechunk (along axis=1), as the message above says, causing too many chunks in memory.
@aktech it might be worth using (Resource)Profiler (doc), and maybe also visualizing the graph just to validate the assumptions about the data orchestration (doc).
Also, for your 2nd option (no chunking along axis=1), you could also try running it with just a single process (either using the local scheduler or tuning the "distributed" one), to make sure that all threads can use the same memory and, for example, do not "duplicate" objects by "sharing" chunks etc. (which could explain the extra memory usage).
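For example, a sketch combining both suggestions (the dask.diagnostics profilers work with the single-machine schedulers, and x / pairwise_distance are assumed from the script above):

import dask
import numpy as np
from dask.diagnostics import Profiler, ResourceProfiler, visualize
from sgkit.distance.api import pairwise_distance

# Single process, threaded scheduler: all threads share one memory space
with dask.config.set(scheduler="threads"), Profiler() as prof, ResourceProfiler(dt=1) as rprof:
    out = pairwise_distance(x)
    result = np.asarray(out)  # force computation if the result is lazy

# Task timings plus CPU/memory usage over time (renders with bokeh)
visualize([prof, rprof])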
Thanks for all the details @aktech.
We know rechunking in Dask can cause unpredictable memory usage, so I would strongly suggest that you use rechunker to rechunk the data on disk, then run the distance computations separately. Otherwise it's not clear if the memory issues are coming from rechunking, or pairwise distance, (or both!).
The notebook here shows how to use rechunker on Zarr groups: https://nbviewer.jupyter.org/github/tomwhite/shiny-train/blob/sgkit/notebooks/gwss/sgkit_import.ipynb (near the bottom).
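For completeness, a bare-bones sketch of rechunker on the genotype array alone (paths and target chunks are illustrative, not necessarily what the notebook does):

import zarr
from rechunker import rechunk

source = zarr.open_group("/work/aktech/sgkit_data/output/")["call_genotype"]
plan = rechunk(
    source,
    target_chunks=(524288, 18, 2),  # smaller sample chunks
    max_mem="4GB",                  # memory budget for the rechunk tasks
    target_store="/work/aktech/sgkit_data/rechunked.zarr",
    temp_store="/work/aktech/sgkit_data/rechunk-tmp.zarr",
)
plan.execute()  # runs as a dask computation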
As a general strategy I think it would be useful to progressively halve the large dimension (the one that is ~20 million) until you get a computation that succeeds, then from that known good point you can try different worker settings etc, to get a better idea of how the computation scales.
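Concretely, something like this (a sketch, with x as in the script above):

from sgkit.distance.api import pairwise_distance

# Take successively smaller slices of the ~20M variant dimension (axis 1 of x)
n_variants = x.shape[1]
x_half = x[:, : n_variants // 2]
x_quarter = x[:, : n_variants // 4]

out = pairwise_distance(x_quarter)  # find a size that succeeds, then scale back up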
@ravwojdyla
@aktech it might be worth using (Resource)Profiler (doc), and maybe also visualizing the graph just to validate the assumptions about the data orchestration (doc).
Yeah, worth doing. I was using it on a smaller dataset, but this one never finished so I couldn't see the output. I have been using the Dask diagnostics dashboard so far.
Also, for your 2nd option (no chunking along axis=1), you could also try running it with just a single process (either using the local scheduler or tuning the "distributed" one), to make sure that all threads can use the same memory and, for example, do not "duplicate" objects by "sharing" chunks etc. (which could explain the extra memory usage).
Sure, I can give this a go.
@tomwhite
We know rechunking in Dask can cause unpredictable memory usage, so I would strongly suggest that you use rechunker to rechunk the data on disk, then run the distance computations separately. Otherwise it's not clear if the memory issues are coming from rechunking, or pairwise distance, (or both!).
The notebook here shows how to use rechunker on Zarr groups: https://nbviewer.jupyter.org/github/tomwhite/shiny-train/blob/sgkit/notebooks/gwss/sgkit_import.ipynb (near the bottom).
Thanks for the pointer, I'll try this in a bit.
As a general strategy I think it would be useful to progressively halve the large dimension (the one that is ~20 million) until you get a computation that succeeds, then from that known good point you can try different worker settings etc, to get a better idea of how the computation scales.
True, the smallest data I had success with last week was roughly 20% of the above (x[:200]). I will try the approach on multiples of that.
We know rechunking in Dask can cause unpredictable memory usage, so I would strongly suggest that you use rechunker to rechunk the data on disk, then run the distance computations separately. Otherwise it's not clear if the memory issues are coming from rechunking, or pairwise distance, (or both!).
Also, another thing to note which I forgot to mention: I recall the chunk size you see in the code above was chosen to address the rechunking problem as well, since it is (524288, 183, -1), a multiple of the current chunks, and it didn't give me the performance warning (PerformanceWarning: Increasing number of chunks by factor of x) I mentioned above, but it did have the memory issues mentioned in the first comment.
With a chunk size that doesn't chunk on axis=1, I always get that warning, irrespective of the chunking on disk. I just tried rechunking on disk and it gave the warning PerformanceWarning: Increasing number of chunks by factor of 571 x_distance = da.blockwise( while starting the pairwise calculation, although the distance calculation hasn't actually started yet; it takes a while to kick off for this kind of chunk size (where no chunking is done on axis=1).
I'll try a single process soon.
A possibly useful data point to demonstrate performance: I microbenchmarked the hash_array function used for the Garud H statistics, and it was 5x faster than the Python version.
Closing this as it's quite old now
It would be very useful to have some simple demonstrations that, using sgkit and the underlying stack, we can run computations over large input datasets, showing we are making real steps towards the goal of interactive analysis of biobank-scale datasets, given a sufficiently powerful HPC/cloud/GPU cluster.
The main purpose of this is to be able to give some highlights in a short report on what has been achieved from the sgkit development work to date.
By "demonstration" I mean just a simple benchmark result.
E.g., being able to say that we can run a PCA on UK biobank-sized data using a N node cluster in Google Cloud and compute the result in M minutes.
E.g., being able to say we can run a pairwise distance computation on the MalariaGEN Ag1000G phase 2 data and compute the result in M minutes on a N node CPU cluster in Google Cloud. Even better would be being able to say we can also run the same computation in M' minutes on a N' node GPU cluster.
I.e., I just need a couple of data points here showing proof of concept; I don't need any detailed performance or scalability benchmarking.
Raising this issue to share suggestions for computations/datasets to use, and to post back any results.