jeromekelleher opened 4 years ago
Looking for that "performance" label @hammer...
I'll generate some big plink files and report back.
I've done some simulations and generated a decent sized dataset:
-rw-r--r-- 1 jk jk 45G Jul 17 09:50 1m.bed
-rw-r--r-- 1 jk jk 9.3M Jul 17 09:50 1m.bim
-rw-r--r-- 1 jk jk 6.9M Jul 17 09:50 1m.bim.backup
-rw-r--r-- 1 jk jk 15M Jul 17 09:20 1m.fam
-rw-r--r-- 1 jk jk 476 Jul 17 09:50 1m.log
-rw-r--r-- 1 jk jk 20M Jul 17 09:50 1m.pheno
-rw-r--r-- 1 jk jk 175M Jul 16 17:52 1m.trees
-rw-r--r-- 1 jk jk 708G Jul 17 09:20 1m.vcf
We've got a 45G plink bed file, which is for 500K diploid samples. The total size of the zarr is 40G, which is reasonably respectable. Here are the details on the genotypes:
>>> z2 = zarr.open("1m.zarr")
>>> z2.call.genotype
<zarr.core.Array '/call/genotype' (379809, 500000, 2) int8>
>>> z2.call.genotype.info
Name : /call/genotype
Type : zarr.core.Array
Data type : int8
Shape : (379809, 500000, 2)
Chunk shape : (4689, 6250, 2)
Order : C
Read-only : False
Compressor : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type : zarr.storage.DirectoryStore
No. bytes : 379809000000 (353.7G)
No. bytes stored : 40699455234 (37.9G)
Storage ratio : 9.3
Chunks initialized : 6480/6480
So, the chunking is good - looks like about 6M per compressed chunk. I think that's going to be a very good size for most applications.
What's less good is this: there are about 13K files in this zarr directory. We are very much not going to make any friends with people using HPC systems if they have to move 13K files around and open/close substantial numbers of them at a time. There are good options for doing this better in zarr, but I think it's unreasonable to expect users to understand these and to pass the right options to ds.to_zarr().
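For what it's worth, here's a minimal sketch of one of those options: writing the whole dataset into a single-file zarr.ZipStore rather than a directory of chunk files. The tiny stand-in dataset, variable name and output path are made up for illustration, and this assumes zarr 2.x plus xarray.

import numpy as np
import xarray as xr
import zarr

# Tiny stand-in for the Dataset that read_plink would return.
ds = xr.Dataset(
    {"call_genotype": (("variants", "samples", "ploidy"),
                       np.zeros((10, 4, 2), dtype="int8"))}
)

# Write everything into a single zip archive rather than ~13K chunk files.
# Note: ZipStore entries can't be rewritten, so this is write-once.
store = zarr.ZipStore("example.zarr.zip", mode="w")
ds.to_zarr(store)
store.close()

# Reading back goes through a read-only ZipStore.
ds2 = xr.open_zarr(zarr.ZipStore("example.zarr.zip", mode="r"))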
I think the way to go is to provide a command line utility which will package up some of these common requirements as options to the CLI, and provide things like progress monitors etc. I'll open an issue on sgkit-plink to track this.
The plink conversion worked really, really well - I was very impressed! There's room for improvement in terms of parallelisation though, I think; I'll open an issue to track that as well.
Thanks for the stress test @jeromekelleher!
I think the way to go is to provide a command line utility which will package up some of these common requirements as options to the CLI, and provide things like progress monitors etc.
While I think we should provide a CLI to sgkit, I'd like all of this functionality to be available to library users as well. As you are likely aware, https://github.com/tqdm/tqdm makes it quite easy to display a progress bar in an interpreter or Jupyter notebook, in addition to the terminal.
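For reference, a minimal sketch of the kind of thing tqdm makes easy; the chunk count and the sleep are placeholders standing in for real conversion work.

import time

from tqdm.auto import tqdm  # picks a terminal- or notebook-friendly bar automatically

# Placeholder loop standing in for per-chunk conversion work.
for _ in tqdm(range(100), desc="converting"):
    time.sleep(0.01)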
Yep, having an API function to do most of the work is the way to go all right.
That's awesome, I haven't tried anything nearly that big yet so thanks for running it! I'm definitely relieved that it didn't take more elbow grease.
I have yet to find a good way to profile compiled code, so assuming some of the suboptimality is related to pysnptools and not Dask, what would you normally do to isolate bottlenecks that aren't in Python?
I have yet to find a good way to profile compiled code
I can call in some people who might be able to help us out if we have a workload that's easy to set up and run.
perf, valgrind, and gperftools are some tools that I've seen others use successfully in the past. I have not been involved in systems software projects since the eBPF tracing tools reached maturity but they also look quite fun.
I should also mention that @toddlipcon used the Chromium tracing framework in Kudu. I'm not sure if it has much utility for single process applications, but just figured I'd point it out.
Actually in Kudu we use it only for single process (multi-thread) tracing. There's a new version of this framework that's more standalone that I just came across here: https://perfetto.dev/
If I may add something here: in the face of a (Python) performance issue I like to take a top-down approach and apply Occam's razor. So in this case we could, for example, start with py-spy to easily get quick tracing information (this will likely provide clues), and we could also measure the performance without IO to compare the two scenarios. Apart from that, the eBPF tools already mentioned by @hammer are great (definitely recommended), but I would reach for them as you go down the rabbit hole (when you roughly know what you are looking for).
Edit: py-spy has support for native frames and multiprocessing, which could come in handy for a dask process-based cluster (a multithreaded cluster might be affected by the GIL, which btw py-spy can also highlight). Though for the GIL specifically I would use gil_load.
Edit 2: btw sysdig is good too, but it might be hard to apply in this scenario.
I think the single-threaded performance from pysnptools is probably quite good, and doesn't need any serious profiling at this point. I had a look at perf top while this was running, and the vast majority of the time was being spent in numpy primitive operations. It looked like Python wasn't getting in the way at all. Beyond that, I think you're chasing small constants, which we definitely don't need to do at this point.
What I found surprising was that it wasn't using enough CPU. I had 40 threads available, but it was only using about 200% CPU. So, it's either an IO issue (we're waiting for storage) or some sort of parallelism issue.
I guess it would be good to separate out the read and write side of this by writing the genotypes into a big array in RAM first, and then writing that zarr. Then we'll have a better idea of where the bottlenecks are.
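As a sketch of that experiment (not something that has been run here): materialise the genotypes into RAM with .values, then time the zarr write on its own. The path, variable name and chunk sizes are borrowed from elsewhere in this thread, and of course this only works for data (or a slice of it) that fits in memory.

import time

import zarr
import sgkit_plink

ds = sgkit_plink.read_plink(path="1m", bim_sep=" ", fam_sep=" ")

# Phase 1: force the dask-backed read into a plain numpy array in RAM.
t0 = time.time()
genotypes = ds["call/genotype"].values
t1 = time.time()
print(f"read took {t1 - t0:.1f}s")

# Phase 2: write that in-memory array to zarr, with no read in the way.
z = zarr.open("1m_genotypes.zarr", mode="w", shape=genotypes.shape,
              chunks=(4689, 6250, 2), dtype="int8")
z[:] = genotypes
print(f"write took {time.time() - t1:.1f}s")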
Great work! This is very promising.
@jeromekelleher was it easy to generate the plink file? Would be good to doc how to do this for a variety of sizes.
@jeromekelleher was it easy to generate the plink file? Would be good to doc how to do this for a variety of sizes.
It's straightforward, but made pretty tedious by having to go through VCF first. The ideal pipeline would be tskit->sgkit->plink, which would mean we're not dependent on any external binaries. We're missing a few pieces of this puzzle first though.
Here's the makefile I've been using:
all: 100k.vcf 100k.bed 1m.vcf 1m.bed

# Prevent deletion of intermediates.
.SECONDARY: all

100k.trees:
	stdpopsim HomSap -c chr22 100000 > $@

1m.trees:
	stdpopsim HomSap -c chr22 1000000 > $@

%.vcf: %.trees
	tskit vcf $< --ploidy=2 > $@

%.bed: %.vcf
	plink1.9 --vcf $< --double-id --out $*
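Not part of the makefile above, but as a rough sketch of the tskit->zarr leg of that ideal pipeline (skipping VCF entirely): pairing adjacent haploid sample nodes into diploids mirrors what tskit vcf --ploidy=2 does, and the output path and chunk sizes are just illustrative. Note genotype_matrix() materialises everything in RAM, so a real version would iterate over ts.variants() instead.

import tskit
import zarr

ts = tskit.load("1m.trees")

# (num_sites, num_sample_nodes) matrix of allele indexes; pair adjacent
# haploid sample nodes into diploid individuals.
G = ts.genotype_matrix().astype("int8")
G = G.reshape(G.shape[0], -1, 2)

z = zarr.open("1m_from_trees.zarr", mode="w", shape=G.shape,
              chunks=(4689, 6250, 2), dtype="int8")
z[:] = G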
@jeromekelleher was it easy to generate the plink file? Would be good to doc how to do this for a variety of sizes.
@jeromekelleher do you know if the PLINK 2.0 --dummy command can generate larger-than-RAM datasets?
It looks like pysnptools has a .bed writer but it seems to only take in-memory arrays. I haven't tried it yet but maybe it will work if provided a Dask array instead. Might be worth some experimentation or digging around in their source code.
I don't think we should worry about optimising the writing of plink datasets larger than RAM, @eric-czech - in practice, people who want to work with large plink datasets using plink will have enough RAM to work with the dataset. I think it's fine to support in-memory only, especially for an initial version.
What I found surprising was that it wasn't using enough CPU. I had 40 threads available, but it was only using about 200% CPU. So, it's either an IO issue (we're waiting for storage) or some sort of parallelism issue.
Hey @jeromekelleher I was experimenting with this and I'm fairly certain the issue is that the GIL is stopping pysnptools from doing anything in parallel when multiple dask workers are running with the default 'threads' scheduler. I saw this same issue but you can easily get far greater CPU utilization by changing the default scheduler to use the multiprocessing backend (so the workers aren't in the same python process), i.e. dropping this at the top of the script:
import dask
# Set the default scheduler to be used globally on computation of all arrays
dask.config.set(scheduler='processes')
If you get a chance, I'd love to know if that makes the same difference for you as it does for me.
I'm also finding that it's much better to use the 'threads' scheduler to read the data and the 'processes' scheduler to write it. For example:
with dask.config.set(scheduler='threads'):
    # Lots of inter-worker communication happens on read, but
    # parallelism is low for python code limited by GIL.
    ds = read_plink(...)

with dask.config.set(scheduler='processes'):
    # Very little inter-worker communication happens on write and the GIL
    # isn't an issue on separate processes.
    ds.to_zarr(...)
The biggest drawback of the 'processes' scheduler is that inter-worker communication is very expensive. Either way, that setup above would likely make for some solid improvements.
Thanks @eric-czech, I'll try this out when I get a chance. (Catching up on a significant backlog atm)
I've tried this out @eric-czech, and it's not looking like the GIL is the bottleneck here. Using the "processes" scheduler doesn't seem to change much - my server is still running ~95% idle as far as top is concerned (2.5% user, 3% system). There are occasional spikes where CPU usage goes up and all the worker processes are busy, but mostly there seems to be a lot of waiting going on. I'm fairly sure IO isn't the bottleneck.
Probably looking at the dynamic dask graph would help to understand what's going on here - I'm a dask noob though, so I don't have many ideas on how to dig into it.
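One low-effort way to get that visibility (a suggestion, not something tried in this thread) would be to run the same conversion against a local dask.distributed cluster and watch the task stream in the dashboard; the worker counts and paths below are just placeholders.

from dask.distributed import Client

import sgkit_plink

if __name__ == "__main__":
    # A local cluster exposes a dashboard (task stream, worker CPU,
    # inter-worker transfers) that shows where the idle time goes.
    client = Client(processes=True, n_workers=8, threads_per_worker=4)
    print("dashboard at", client.dashboard_link)

    ds = sgkit_plink.read_plink(path="1m", bim_sep=" ", fam_sep=" ")
    ds.to_zarr("1m.zarr", mode="w")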
Can you share the script you're running?
import sgkit_plink
import sys
import dask.config
from dask.diagnostics import ProgressBar

if __name__ == "__main__":
    path = sys.argv[1]
    with dask.config.set(scheduler='threads'):
        with ProgressBar():
            ds = sgkit_plink.read_plink(path=path, bim_sep=" ", fam_sep=" ")
    print("Read dataset")
    print(ds)
    with dask.config.set(scheduler='processes'):
        with ProgressBar():
            ds.to_zarr(sys.argv[2], mode="w")
    print("Done!")
PS: the ProgressBar on the first read_plink call doesn't work well - it writes out a bunch of extra bars, like you predicted at some point.
Hm, I assumed increasing parallelism for the reads would improve the problem on write too, but I guess not (sorry, I should not have assumed that). I tried your script and saw the same. More specifically, where I saw improvements was on something like this:
read_plink(...)['call/genotype'].mean(axis=(1,2)).compute()
With scheduler="threads": [screenshot]

With scheduler="processes": [screenshot]
The former takes about 3x as long. I'm not sure why the zarr writes need to be synchronized, but it looks like that might be the case:
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize

ds = read_plink(...)
dask.config.set(scheduler='processes')
with ProgressBar(), ResourceProfiler() as rprof, Profiler() as prof:
    ds.to_zarr('/tmp/ukb_chr21.zarr', mode="w")
visualize([prof, rprof])
That or the lulls could be related to inter-worker communication after data for blocks has been fetched. Maybe some combination of both? I'm not sure.
Super cool, thanks @eric-czech! Hopefully we can tune this behaviour out by taking more control of the zarr store we're writing to. Really useful data though, this is excellent.
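As one possible direction (a sketch, not a tested recommendation): xarray's to_zarr accepts per-variable encoding, so we could set chunk shapes and the compressor explicitly instead of relying on defaults. The variable name and chunk sizes are taken from earlier in the thread; the zstd settings are just an example.

import dask
from numcodecs import Blosc
import sgkit_plink

ds = sgkit_plink.read_plink(path="1m", bim_sep=" ", fam_sep=" ")

# Explicit on-disk layout: chunk shape and compressor per variable.
encoding = {
    "call/genotype": {
        "chunks": (4689, 6250, 2),
        "compressor": Blosc(cname="zstd", clevel=7, shuffle=Blosc.BITSHUFFLE),
    }
}
with dask.config.set(scheduler="processes"):
    ds.to_zarr("1m.zarr", mode="w", encoding=encoding)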
A quick update from the bgen side of the world:
I ran a script to convert UKB chromosomes 21 and 22 to zarr using our current bgen-reader wrapper in sgkit-bgen (f80601b). Each bgen file is about 36.5GB in size and takes ~14 hours to convert with 6 threads per file. I wasn't able to get great resource profiling from it given that it was running on GKE, but CPU utilization was consistently close to maxed out (roughly 6 of 8 vCPUs).
This was also using the multiprocessing scheduler; without it I saw a similar lack of CPU utilization to the one described in the comments above when testing PLINK reads.
At 14 hours for 36.5G, that's about 2.5G converted per hour. I'd like to compare that directly to the PLINK numbers @jeromekelleher shared in https://github.com/pystatgen/sgkit-plink/issues/6 (45G / 6.5 hrs = ~7G per hr), but the schedulers were different, as were the core/thread counts, and I was writing out over the network rather than to local disk. These could still be useful points of comparison for other experiments though.
We'd like to see how the existing conversion pipelines work on large scale data. A good start would be to convert a ~100G plink file to zarr. This should give us some information about how things are performing.