sgkit-dev / sgkit

Scalable genetics toolkit
https://sgkit-dev.github.io/sgkit
Apache License 2.0
234 stars 32 forks source link

Alternative execution engines for VCF conversion #1130

Open tomwhite opened 1 year ago

tomwhite commented 1 year ago

VCF to Zarr conversion is an embarrassingly parallel process that currently uses Dask Delayed to schedule tasks.

It would be fairly easy to make it possible to run on any parallel backend - examples being Python concurrent.futures, Modal, or Lithops.

tomwhite commented 1 year ago

As a proof of concept I did this for Modal using the following code:

import fsspec
import modal
from sgkit.io.vcf.utils import (
    build_url,
    url_filename,
)
from sgkit.io.vcf.vcf_partition import partition_into_regions
from sgkit.io.vcf.vcf_reader import vcf_to_zarr_sequential, zip_input_and_regions

stub = modal.Stub("vcf-modal")

image = modal.Image.conda().conda_install(["cyvcf2", "sgkit", "s3fs"], channels=["bioconda", "conda-forge"])

@stub.function(image=image, secret=modal.Secret.from_name("my-aws-secret"))
def f(args):
    output_storage_options = {}
    input, region, part_url = args
    print("running on", input, region, part_url)
    output_part = fsspec.get_mapper(part_url, **output_storage_options)
    vcf_to_zarr_sequential(input, output_part, region, chunk_length=1_000, chunk_width=1_000)
    return part_url

def vcf_to_parts(input, output, regions):
    for input, input_region_list in zip_input_and_regions(input, regions):
        filename = url_filename(str(input))
        if input_region_list is None:
            # single partition case: make a list so the loop below works
            input_region_list = [None]  # type: ignore
        for r, region in enumerate(input_region_list):
            part_url = build_url(str(output), f"{filename}/part-{r}.zarr")
            yield input, region, part_url

if __name__ == "__main__":
    input = "s3://sgkit-modal-tom/1000G.phase3.broad.withGenotypes.chr20.10100000.vcf.gz"
    output = "s3://sgkit-modal-tom/1000G.phase3.broad.withGenotypes.chr20.10100000.zarrs"
    target_part_size="4MB"

    print(f"partitioning into regions of size {target_part_size}...")
    regions = partition_into_regions(input, target_part_size=target_part_size)
    print("regions", regions)

    with stub.run():
        for result in f.map(vcf_to_parts(input, output, regions)):
            print(f"Completed {result}")

The general bit is vcf_to_parts, which we could add to the sgkit API, and the rest is Modal-specific. This is quite low-level though, as you have to call partition_into_regions first. Also, it produces a bunch of Zarr files that need to be combined with concat_zarrs - and that function would need generalising to use Modal too (or another engine).

Stepping back a bit, this would be a nice way of showing how to do VCF conversion quickly for VCF files that folks have in a cloud store using Modal. It would also be applicable for batch processing on HPC.

tomwhite commented 1 year ago

Note that concat_zarrs could be avoided completely if we had #1131

hammer commented 1 year ago

Nice! I can ask Erik if he would consider hosting this on Modal as an example

tomwhite commented 1 year ago

That would be great! The code needs a bit more attention before I'd be happy for people to copy it.

hammer commented 9 months ago

@jeromekelleher has a manually built alternative execution engine for VCFs at https://github.com/pystatgen/sgkit-publication/pull/94