zaeleus / noodles

Bioinformatics I/O libraries in Rust
MIT License

Multi-threading API clarification #115

Open d-cameron opened 2 years ago

d-cameron commented 2 years ago

I'm attempting to write a small utility that processes BAM records, and I can't figure out the async API. I'm trying to adapt the noodles-bam/examples/bam_reheader_async.rs example and have been running into problems with isolating the async code.

A common design pattern for bioinformatics tools is to iterate over one or more files in genomic coordinate order, process the records, then write (typically a subset of) the records to new files. For many of these programs the cost of processing is small, and the bottleneck is I/O and record parsing. For these sorts of programs, HTSJDK/htslib expose an API that allows the I/O and record parsing to be offloaded to background threads.

Does noodles support a synchronous BAM read/write API in which the compression/decompression and serialisation/parsing are off-loaded to background threads? Something along the lines of `builder().set_worker_threads(8).set_buffer_size(8 * 65536).build();`?

zaeleus commented 2 years ago

The short answer is no; this is currently not supported.

Sorry for the delayed response. I originally thought this was doable with the current API and wanted to give an example. Unfortunately, while you can do this for reading and parsing now, there is no way to preserialize a record for writing.

Thank you for bringing up this use case. I'll investigate it further.

d-cameron commented 9 months ago

What's the current state of this? Does the API now support multi-threading? Even just multithreading support for reading would be immensely useful.

Thanks

zaeleus commented 9 months ago

With respect to bgzip-compressed formats, you can compose a multithreaded decoder with a format reader. E.g.,

```rust
use std::{fs::File, num::NonZeroUsize, thread};

use noodles::{bam, bgzf};

let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
let file = File::open(src)?;
let decoder = bgzf::MultithreadedReader::with_worker_count(worker_count, file);
let mut reader = bam::io::Reader::from(decoder);
```
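
Reading then proceeds as with the single-threaded reader; a minimal usage sketch (error handling via `?`, record processing elided):

```rust
let header = reader.read_header()?;

for result in reader.records() {
    let record = result?;
    // process `record` here...
}
```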

This can also be done with a BGZF encoder and a format writer. I think this is half of what you originally requested, and it does greatly help with compression/decompression performance. Note, however, that the multithreaded reader does not support random access.
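
For reference, the writer side composes the same way; a minimal sketch mirroring the full example below (`dst` is a placeholder path, and `header` comes from the reader above):

```rust
use std::{fs::File, num::NonZeroUsize, thread};

use noodles::{bam, bgzf};

let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
let file = File::create(dst)?;
let encoder = bgzf::MultithreadedWriter::with_worker_count(worker_count, file);
let mut writer = bam::io::Writer::from(encoder);

writer.write_header(&header)?;
```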

But there are still no multithreaded format readers/writers. In a passthrough context, the SAM/BAM readers no longer eagerly decode record fields, so I tried to write an example of parallel serialization, which is feasible with the current API. It's a proof of concept and nontrivial, but it shows that something is possible.

main.rs

```rust
// cargo add crossbeam-channel
// cargo add noodles --features bam,bgzf,sam
// cargo add noodles-bgzf --features libdeflate
use std::{
    env,
    fs::File,
    io::{self, Write},
    mem,
    num::NonZeroUsize,
    sync::Arc,
    thread::{self, JoinHandle},
};

use crossbeam_channel::{Receiver, Sender};
use noodles::{bam, bgzf, sam};

const CHUNK_SIZE: usize = 1 << 12;

fn main() -> io::Result<()> {
    let mut args = env::args().skip(1);
    let src = args.next().expect("missing src");
    let dst = args.next().expect("missing dst");

    let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);

    let src_file = File::open(src)?;
    let decoder = bgzf::MultithreadedReader::with_worker_count(worker_count, src_file);
    let mut reader = bam::io::Reader::from(decoder);
    let header = reader.read_header().map(Arc::new)?;

    let dst_file = File::create(dst)?;
    let encoder = bgzf::MultithreadedWriter::with_worker_count(worker_count, dst_file);
    let mut writer = bam::io::Writer::from(encoder);
    writer.write_header(&header)?;

    let mut writer = MultithreadedBamWriter::new(worker_count, header, writer);

    for result in reader.records() {
        let record = result?;
        writer.write_record(record)?;
    }

    writer.finish()?;

    Ok(())
}

type BufferedTx = Sender<io::Result<Vec<u8>>>;
type BufferedRx = Receiver<io::Result<Vec<u8>>>;
type SerializeTx = Sender<(Vec<bam::Record>, BufferedTx)>;
type SerializeRx = Receiver<(Vec<bam::Record>, BufferedTx)>;
type WriteTx = Sender<BufferedRx>;
type WriteRx = Receiver<BufferedRx>;
type RecycleTx = Sender<Vec<u8>>;
type RecycleRx = Receiver<Vec<u8>>;

struct MultithreadedBamWriter {
    writer_handle: Option<JoinHandle<io::Result<()>>>,
    serializer_handles: Vec<JoinHandle<()>>,
    chunk: Vec<bam::Record>,
    write_tx: Option<WriteTx>,
    serialize_tx: Option<SerializeTx>,
    recycle_tx: Option<RecycleTx>,
}

impl MultithreadedBamWriter {
    fn new(
        worker_count: NonZeroUsize,
        header: Arc<sam::Header>,
        inner: bam::io::Writer<bgzf::MultithreadedWriter>,
    ) -> Self {
        let (write_tx, write_rx) = crossbeam_channel::bounded(worker_count.get());
        let (serialize_tx, serialize_rx) = crossbeam_channel::bounded(worker_count.get());
        let (recycle_tx, recycle_rx) = crossbeam_channel::bounded(worker_count.get());

        for _ in 0..worker_count.get() {
            recycle_tx.send(Vec::new()).unwrap();
        }

        let writer_handle = spawn_writer(inner, write_rx, recycle_tx.clone());
        let serializer_handles = spawn_serializers(worker_count, serialize_rx, recycle_rx, header);

        Self {
            writer_handle: Some(writer_handle),
            serializer_handles,
            chunk: Vec::with_capacity(CHUNK_SIZE),
            write_tx: Some(write_tx),
            serialize_tx: Some(serialize_tx),
            recycle_tx: Some(recycle_tx),
        }
    }

    fn write_record(&mut self, record: bam::Record) -> io::Result<()> {
        self.chunk.push(record);

        if self.chunk.len() >= self.chunk.capacity() {
            self.flush();
        }

        Ok(())
    }

    fn flush(&mut self) {
        let records = mem::replace(&mut self.chunk, Vec::with_capacity(CHUNK_SIZE));
        let (buffered_tx, buffered_rx) = crossbeam_channel::bounded(1);

        self.write_tx.as_ref().unwrap().send(buffered_rx).unwrap();

        self.serialize_tx
            .as_ref()
            .unwrap()
            .send((records, buffered_tx))
            .unwrap();
    }

    fn finish(&mut self) -> io::Result<()> {
        self.flush();

        self.recycle_tx.take();
        self.serialize_tx.take();

        for handle in self.serializer_handles.drain(..) {
            handle.join().unwrap();
        }

        self.write_tx.take();

        if let Some(handle) = self.writer_handle.take() {
            handle.join().unwrap()?;
        }

        Ok(())
    }
}

fn spawn_writer(
    mut writer: bam::io::Writer<bgzf::MultithreadedWriter>,
    write_rx: WriteRx,
    recycle_tx: RecycleTx,
) -> JoinHandle<io::Result<()>> {
    thread::spawn(move || {
        while let Ok(buffered_rx) = write_rx.recv() {
            if let Ok(result) = buffered_rx.recv() {
                let buf = result?;
                writer.get_mut().write_all(&buf[..])?;
                recycle_tx.send(buf).ok();
            }
        }

        Ok(())
    })
}

fn spawn_serializers(
    worker_count: NonZeroUsize,
    serialize_rx: SerializeRx,
    recycle_rx: RecycleRx,
    header: Arc<sam::Header>,
) -> Vec<JoinHandle<()>> {
    (0..worker_count.get())
        .map(|_| {
            let serialize_rx = serialize_rx.clone();
            let recycle_rx = recycle_rx.clone();
            let header = header.clone();

            thread::spawn(move || {
                while let Ok((records, buffered_tx)) = serialize_rx.recv() {
                    if let Ok(mut dst) = recycle_rx.recv() {
                        let result = serialize(&header, &records, &mut dst).map(|_| dst);
                        buffered_tx.send(result).ok();
                    }
                }
            })
        })
        .collect()
}

fn serialize(header: &sam::Header, records: &[bam::Record], dst: &mut Vec<u8>) -> io::Result<()> {
    dst.clear();

    let mut serializer = bam::io::Writer::from(dst);

    for record in records {
        serializer.write_record(header, record)?;
    }

    Ok(())
}
```
ghuls commented 8 months ago

Is it possible to set the compression level when using `bgzf::MultithreadedWriter`?