zaeleus / noodles

Bioinformatics I/O libraries in Rust
MIT License

Can bgzip multi-threading be combined with random access reads? #139

Open zdk123 opened 1 year ago

zdk123 commented 1 year ago

Is there a general strategy for simultaneous random access to GZ blocks in an indexed gz file?

For large files, I know the virtual positions and buffer sizes I want to access, so when these are completely disjoint, reading them from multiple threads should give a nice speed-up. Right now, this can't be done via the indexed reader, since the virtual position is mutable state of the reader. However, I notice that contiguous blocks can be read with multiple threads, so, in principle, why not discontiguous blocks?

Thanks!
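
For context, a BGZF virtual position packs two offsets into one 64-bit value: the upper 48 bits give the byte offset of the block's start in the compressed file, and the lower 16 bits give the offset into that block's uncompressed data. A minimal sketch of that layout follows; the function names are illustrative and are not part of the noodles API.

```rust
/// Splits a BGZF virtual position into
/// (compressed block offset, offset within the uncompressed block).
fn split_virtual_position(vpos: u64) -> (u64, u16) {
    (vpos >> 16, (vpos & 0xffff) as u16)
}

/// Builds a virtual position from its two parts.
/// The compressed offset must fit in 48 bits.
fn make_virtual_position(compressed: u64, uncompressed: u16) -> u64 {
    (compressed << 16) | u64::from(uncompressed)
}
```

Two virtual positions with different compressed offsets point into different blocks, which is why disjoint reads are independent of each other in principle.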

JShermanK1 commented 1 year ago

Using the sync_file crate, you can clone the file handle into a new reader for each thread. Each clone of the SyncFile maintains an independent position, which allows "multi-threaded reads". As I understand it, reading the bytes off the disk is still sequential, but the decompression after the disk read can then happen concurrently, and that is where most of the time is spent anyway.

use itertools::Itertools;
use noodles::bcf;
use rayon::prelude::*;
use sync_file::SyncFile;

// `path`, `index` (the index for the BCF file) and `do_stuff` are assumed
// to be defined elsewhere.
let f = SyncFile::open(path)?;

// Read the header once so it can be shared by every thread.
let header = {
    let mut bcf_r = bcf::Reader::new(f.clone());
    bcf_r.read_file_format()?;
    bcf_r.read_header()?
};

// Collect the contigs first so rayon can split them across threads.
(0..)
    .map_while(|i| header.contigs().get_index(i))
    .collect_vec()
    .into_par_iter()
    .for_each(|chrom| {
        // Each thread builds its own reader over a clone of the file handle,
        // so its position is independent of the other threads.
        let mut bcf_r = bcf::Reader::new(f.clone());
        bcf_r.read_file_format().expect("failed to read format");
        let _r_header = bcf_r.read_header().expect("failed to read header");

        let region = format!("{}", chrom.0).parse().expect("failed to parse region");
        let records = bcf_r
            .query(&header, &index, &region)
            .expect("failed to query index");

        records.for_each(|record| {
            do_stuff(record);
        });
    });
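
The underlying idea, one independent file position per thread, can also be sketched with the standard library alone. This is only an illustration and not how noodles or sync_file work internally: `read_ranges_in_parallel` is a hypothetical helper, each thread simply opens its own handle, and it reads raw byte ranges without any BGZF decompression.

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::Path;
use std::thread;

/// Reads each `(offset, length)` byte range on its own thread.
/// Every thread opens its own file handle, so no cursor is shared.
/// Results are returned in the same order as `ranges`.
fn read_ranges_in_parallel(path: &Path, ranges: &[(u64, usize)]) -> io::Result<Vec<Vec<u8>>> {
    thread::scope(|scope| {
        // Spawn one scoped thread per range.
        let handles: Vec<_> = ranges
            .iter()
            .map(|&(offset, len)| {
                scope.spawn(move || -> io::Result<Vec<u8>> {
                    let mut file = File::open(path)?;
                    file.seek(SeekFrom::Start(offset))?;
                    let mut buf = vec![0; len];
                    file.read_exact(&mut buf)?;
                    Ok(buf)
                })
            })
            .collect();

        // Join in order; the first I/O error, if any, is returned.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("reader thread panicked"))
            .collect()
    })
}
```

In a real BGZF setting, the per-thread work after the read would be the block decompression, which is the part that benefits most from running concurrently.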