bovee / entab

* -> TSV
MIT License

Multithreaded support #23

Open bovee opened 2 years ago

bovee commented 2 years ago

I wrote some code that takes the unconsumed part of a ReadBuffer and allows iterating over it, then repeats those two steps until the input is exhausted. That gave very basic multithreaded support, but the API was pretty gross, and I'm not sure the multithreading itself was efficient: it ran about 4x slower than the normal Readers. It would be nice to support this in a more principled way.

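    #[cfg(feature = "std")]
    #[test]
    fn test_multithreaded_read() -> Result<(), EtError> {
        use std::fs::File;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let f = File::open("./tests/data/test.fastq")?;
        let (mut rb, mut state) = init_state::<FastqState, _, _>(f, None)?;
        // shared counter that every spawned task adds its sequence length to
        let seq_len = Arc::new(AtomicUsize::new(0));
        // iterate over the unconsumed part of the buffer as a chunk, then hand
        // the chunk back via `update_from_chunk` before fetching the next one
        while let Some((slice, mut chunk)) = rb.next_chunk()? {
            let chunk = rayon::scope(|s| {
                while let Some(FastqRecord { sequence, .. }) =
                    chunk.next(slice, &mut state).map_err(|e| e.to_string())?
                {
                    let sl = seq_len.clone();
                    // one rayon task is spawned per record
                    s.spawn(move |_| {
                        let _ = sl.fetch_add(sequence.len(), Ordering::Relaxed);
                    });
                }
                // return the chunk so its position can be fed back into `rb`
                Ok::<_, String>(chunk)
            })?;
            rb.update_from_chunk(chunk);
        }
        // every task has finished by now: `rayon::scope` waits for its spawns
        assert_eq!(seq_len.load(Ordering::Relaxed), 250000);

        Ok(())
    }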
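A minimal sketch of a coarser-grained alternative, assuming the whole file is already in memory as well-formed 4-line FASTQ (no wrapped sequences, `\n` line endings, trailing newline). It uses only `std::thread::scope` rather than rayon or entab's `ReadBuffer`, and the helper names (`record_ends`, `seq_len_in`, `parallel_seq_len`) are hypothetical. The idea is to split the buffer at record boundaries into one batch per thread, so each thread does a meaningful amount of work, instead of spawning one task per record as the test above does. Per-record task overhead is one plausible contributor to the slowdown, though that hasn't been measured here.

```rust
use std::thread;

/// Hypothetical helper: byte offsets just past the end of each 4-line record.
/// Assumes well-formed, unwrapped FASTQ with `\n` endings and a trailing newline.
fn record_ends(buf: &[u8]) -> Vec<usize> {
    buf.iter()
        .enumerate()
        .filter(|&(_, &b)| b == b'\n')
        .map(|(i, _)| i + 1)
        .skip(3) // the 4th newline closes the first record...
        .step_by(4) // ...and every 4th one after that closes the next
        .collect()
}

/// Hypothetical helper: total sequence length in a slice of whole records.
fn seq_len_in(chunk: &[u8]) -> usize {
    chunk
        .split(|&b| b == b'\n')
        .skip(1) // the sequence is the 2nd line of each record
        .step_by(4)
        .map(|line| line.len())
        .sum()
}

/// Split `buf` into at most `n_threads` batches of whole records and
/// process each batch on its own scoped thread.
fn parallel_seq_len(buf: &[u8], n_threads: usize) -> usize {
    let ends = record_ends(buf);
    let n = n_threads.max(1);
    // records per batch, rounded up; at least 1 so `chunks` doesn't panic
    let per_batch = ((ends.len() + n - 1) / n).max(1);

    let mut batches = Vec::new();
    let mut start = 0;
    for group in ends.chunks(per_batch) {
        let end = *group.last().unwrap(); // chunks are never empty
        batches.push(&buf[start..end]);
        start = end;
    }

    thread::scope(|s| {
        let handles: Vec<_> = batches
            .iter()
            .map(|&batch| s.spawn(move || seq_len_in(batch)))
            .collect();
        // join every thread and add up the per-batch totals
        handles.into_iter().map(|h| h.join().unwrap()).sum::<usize>()
    })
}

fn main() {
    let fastq = b"@r1\nACGT\n+\nIIII\n@r2\nACG\n+\nIII\n@r3\nAC\n+\nII\n";
    println!("{}", parallel_seq_len(fastq, 2)); // prints 9
}
```

With `n_threads` set to 1 this degenerates to a single sequential pass, which makes the two easy to compare. A streaming version would apply the same split to whatever unconsumed slice the buffer hands out, carrying any trailing partial record over to the next refill.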
bovee commented 2 years ago

seq_io has a pretty well-developed "parallel" processing module that might be worth looking at for inspiration: https://github.com/markschl/seq_io/blob/master/src/parallel.rs
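For comparison, a std-only sketch of a batched producer/consumer pipeline, roughly the shape of design that module uses as far as I understand it: a reader thread groups records into batches and worker threads process whole batches. This is not seq_io's API; `pipeline` and `Batch` are hypothetical names, and records are stood in for by owned sequence buffers, which in a real reader would mean copying out of the read buffer.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// A batch of records: here just the sequences, owned so it can cross threads.
type Batch = Vec<Vec<u8>>;

/// Hypothetical pipeline: one reader thread groups records into batches and
/// sends them over a bounded channel; `n_workers` threads each pull batches and
/// send back a per-batch result (here: total sequence length).
fn pipeline(seqs: Vec<Vec<u8>>, batch_size: usize, n_workers: usize) -> usize {
    let n_workers = n_workers.max(1);
    // bounded queue so the reader can't run arbitrarily far ahead of workers
    let (batch_tx, batch_rx) = mpsc::sync_channel::<Batch>(n_workers * 2);
    let batch_rx = Arc::new(Mutex::new(batch_rx));
    let (result_tx, result_rx) = mpsc::channel::<usize>();

    // reader: stands in for the parser filling record batches
    let reader = thread::spawn(move || {
        for chunk in seqs.chunks(batch_size.max(1)) {
            batch_tx.send(chunk.to_vec()).unwrap();
        }
        // `batch_tx` is dropped here, which ends the workers' loops
    });

    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let rx = Arc::clone(&batch_rx);
            let tx = result_tx.clone();
            thread::spawn(move || loop {
                // the lock is held only while taking one batch off the queue
                let batch = match rx.lock().unwrap().recv() {
                    Ok(b) => b,
                    Err(_) => break, // channel closed and drained
                };
                let len: usize = batch.iter().map(|s| s.len()).sum();
                tx.send(len).unwrap();
            })
        })
        .collect();
    drop(result_tx); // so `result_rx` ends once every worker has exited

    let total: usize = result_rx.iter().sum();
    reader.join().unwrap();
    for w in workers {
        w.join().unwrap();
    }
    total
}

fn main() {
    let seqs: Vec<Vec<u8>> = (1..=100).map(|n| vec![b'A'; n]).collect();
    println!("{}", pipeline(seqs, 16, 4)); // prints 5050
}
```

Results come back in whatever order the workers finish, which is fine for a sum; keeping record order would need each batch tagged with an index and reordered on the receiving side.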