Closed: Hoeze closed this issue 5 months ago.
I do think that this problem can be solved very efficiently with Delta Lake streaming; however, it requires significant development effort to design a formal merging algorithm. The issue you are describing is the N+1 problem in genomics.
For now, the best practice for merging samples is either to impute variants (for array data) or to use GATK's GenotypeGVCFs algorithm (for exome / whole-genome data).
@williambrandler I do not mean the N+1 problem here.
I have ~6000 single-sample VCF files and I'd like to merge them into VCFs per chromosome and batch of 1000 samples. Therefore, for each batch, I would need to:

1. select the samples in the batch
2. explode the genotypes
3. outer join to manually fill missing genotypes
4. sort by variant, group by variant, and collect the genotypes into a list
5. write to VCF
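The per-batch merge logic (steps 2–4 above) can be sketched in plain Python, assuming each sample's calls are available as a variant → genotype mapping. The `merge_batch` helper and the variant-key encoding are illustrative, not Glow API:

```python
from typing import Dict, List, Tuple

# A variant key: (contig, position, ref, alt); genotypes as strings like "0/1".
Variant = Tuple[str, int, str, str]

def merge_batch(samples: Dict[str, Dict[Variant, str]]) -> Dict[Variant, List[Tuple[str, str]]]:
    """Outer-join genotypes across samples, filling missing calls with './.'."""
    sample_ids = sorted(samples)                           # stable column order
    all_variants = sorted(set().union(*samples.values()))  # union of variant keys
    merged = {}
    for v in all_variants:
        # For each variant, collect one genotype per sample; missing -> './.'
        merged[v] = [(s, samples[s].get(v, "./.")) for s in sample_ids]
    return merged

# Example: two samples with partially overlapping variants
calls = {
    "S1": {("chr1", 100, "A", "T"): "0/1"},
    "S2": {("chr1", 100, "A", "T"): "1/1", ("chr1", 200, "G", "C"): "0/1"},
}
merged = merge_batch(calls)
# S1 has no call at chr1:200, so it is filled with './.'
```

In Spark terms, the same fill-missing step would happen after the outer join, before collecting genotypes back into an array column.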
This task would be massively simplified if Glow would automatically fill missing genotypes depending on the sample IDs present in the partition. In fact, the bigvcf writer does exactly that, but on the whole dataset instead of per partition:
> For the big VCF writer, the inferred sample IDs are the distinct set of all sample IDs from the DataFrame.
However, writing to bigvcf does not scale very well for large numbers of samples.
This is a tricky problem.
Ideally, a process like this requires a formal joint-genotyping algorithm (such as GATK's GenotypeGVCFs from the Broad Institute, if you are working with whole-genome data).
Check out this blog post and the docs for how the GenotypeGVCFs algorithm was parallelized with Apache Spark: https://databricks.com/blog/2019/06/19/accurately-building-genomic-cohorts-at-scale-with-delta-lake-and-spark-sql.html
Or you can run it on a single node, but be warned: it takes about six weeks to process 6k genomes on a single node.
If you do not have access to gVCFs (only VCFs), you will have to reprocess the BAM files to gVCF.
As a last resort, you can use merging logic similar to what you have described: https://glow.readthedocs.io/en/latest/etl/merge.html#joint-genotyping
Also, yes: at that scale it no longer makes sense to work with flat files such as VCF. Keep the pipeline in Parquet/Delta until you have results that can be exported (summary-level stats, GWAS results).
@Hoeze Unfortunately, Spark's data source API doesn't really allow us to run a query over the dataset or a partition before a write, AFAIK, and I don't want to add a new requirement that each partition fit in memory for the sharded writer. However, I could provide you with an option to manually specify the sample IDs. Technically you can already do that by passing in a full VCF header, but I understand that's a little cumbersome. Would this meet your needs? https://github.com/projectglow/glow/pull/650
The second unit test demonstrates how to get all the sample IDs. If I understand your use case correctly, you could repeat the write per sample batch.
@henrydavidge Yes, this would be a really helpful addition!
Great. I will get the PR merged and documented.
From the docs:

> Therefore, the sharded VCF writer cannot be used when merging VCF files, as it fails because of non-matching sample IDs.

Would it be possible to change the behavior by taking the sorted union of all samples in the partition instead?
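For clarity, "sorted union of all samples in the partition" would amount to the following plain-Python sketch (the row/genotype shape mirrors Glow's genotypes array column, but the function name is hypothetical):

```python
def inferred_header_samples(rows):
    """Compute the sorted union of sample IDs across all rows in a partition."""
    ids = set()
    for row in rows:
        ids.update(gt["sampleId"] for gt in row["genotypes"])
    return sorted(ids)

# Two rows whose genotype arrays cover different sample subsets
rows = [
    {"genotypes": [{"sampleId": "S2"}, {"sampleId": "S1"}]},
    {"genotypes": [{"sampleId": "S3"}]},
]
# -> ["S1", "S2", "S3"]
```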
Another cool feature would be the ability to specify a mapping of `sampleID, sample_partition`. Resulting folder structure: