onecodex / needletail

Fast FASTX parsing and k-mer methods in Rust
MIT License

I am looking for a function that gives me an iterator over the read SequenceRecords. #70

Open stela2502 opened 1 year ago

stela2502 commented 1 year ago

Thank you for this package! It is really helpful for me, and special thanks for giving me the initial help to get started with Rust.

I am now trying to use Rayon's par_bridge() to process two FASTQ files on multiple cores without loading all the data into memory first. ChatGPT sent me down this rabbit hole:

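use rayon::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader};

// additional_processing() and process_fastq_pair() are placeholders that are not defined here.
fn main() {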
    let file1 = File::open("file1.fastq").expect("Failed to open file1");
    let file2 = File::open("file2.fastq").expect("Failed to open file2");

    let reader1 = BufReader::new(file1);
    let reader2 = BufReader::new(file2);

    // Process the FASTQ pairs in parallel
    reader1
        .lines()
        .zip(reader2.lines())
        .filter_map(|(line1, line2)| {
            if let (Ok(read1), Ok(read2)) = (line1, line2) {
                additional_processing(&read1, &read2)
            } else {
                None
            }
        })
        .par_bridge()
        .for_each(|(line1, line2)| {
            process_fastq_pair(&line1, &line2);
        });
}

Of course, this does not use needletail's logic to parse the FASTQ files. After writing a somewhat promising version of the same logic with needletail, I get this error:


305 | /         reader1.next()
306 | |         .zip(reader2.next())
307 | |         .filter_map(|(line1, line2)| {
    | |         -^^^^^^^^^^ `Option<(Result<SequenceRecord<'_>, ParseError>, Result<SequenceRecord<'_>, ParseError>)>` is not an iterator

I created the readers like this:

let mut reader1 = parse_fastx_file(&f1).expect("valid path/file");
let mut reader2 = parse_fastx_file(&f2).expect("valid path/file");

Is there something in your library that could give me what I need here? I assume the code wants an iterator.

THANK YOU FOR YOUR HELP SO FAR!

Euphrasiologist commented 1 year ago

I'm not an author of the code here, but I dug around a bit, and the API does not ever create an iterator. There's a next() method on the FastxReader trait, but this is not an iterator. It mimics one though (and for most purposes it's the same thing). So I'm not sure how you could parallelise this easily.
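
To illustrate what "mimics an iterator" means here, the following is a minimal sketch of a reader whose `next()` lends out data from a reused internal buffer. `LendingReader` and `total_len` are hypothetical names made up for this example, each line is treated as one record, and nothing here is needletail's actual implementation.

```rust
use std::io::BufRead;

// Hypothetical stand-in for a zero-copy parser: one internal buffer is
// reused for every record, so each record borrows from the reader itself.
struct LendingReader<R: BufRead> {
    source: R,
    buf: String,
}

impl<R: BufRead> LendingReader<R> {
    fn new(source: R) -> Self {
        LendingReader {
            source,
            buf: String::new(),
        }
    }

    // The returned `&str` borrows from `self`, so the previous record must
    // be dropped before the next call. `Iterator::next` cannot express
    // this, because its `Item` type cannot name the lifetime of `&mut self`.
    fn next(&mut self) -> Option<std::io::Result<&str>> {
        self.buf.clear();
        match self.source.read_line(&mut self.buf) {
            Ok(0) => None, // end of input
            Ok(_) => Some(Ok(self.buf.trim_end())),
            Err(e) => Some(Err(e)),
        }
    }
}

// Drive the reader with `while let`; each record is used only inside the
// loop body, so the borrow ends before the next call to `next()`.
fn total_len<R: BufRead>(mut reader: LendingReader<R>) -> std::io::Result<usize> {
    let mut total = 0;
    while let Some(record) = reader.next() {
        total += record?.len();
    }
    Ok(total)
}
```

For example, `total_len(LendingReader::new("ACGT\nGG\n".as_bytes()))` sums the lengths of both lines. Adapters such as `zip` or `filter_map` are not available on this type because it does not implement `Iterator`.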

One could set up an iterator API like this (it's a sketch I just made up):

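use needletail::errors::{ErrorPosition, ParseError};
use needletail::parser::{FastaReader, Format, SequenceRecord};
// Note: fill_buf and the reader fields used below are assumed to be internal to needletail.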

struct FastaIter<'a, R: std::io::Read> {
    inner: FastaReader<R>,
    phantom: std::marker::PhantomData<&'a R>,
}

struct IntoFastaIter<'a, R: std::io::Read> {
    fastaiter: FastaIter<'a, R>,
    index: usize,
}

impl<'a, R> IntoIterator for FastaIter<'a, R>
where
    R: std::io::Read,
{
    type Item = Result<SequenceRecord<'a>, ParseError>;

    type IntoIter = IntoFastaIter<'a, R>;

    fn into_iter(self) -> Self::IntoIter {
        IntoFastaIter {
            fastaiter: self,
            index: 0,
        }
    }
}

impl<'a, R> Iterator for IntoFastaIter<'a, R>
where
    R: std::io::Read,
{
    type Item = Result<SequenceRecord<'a>, ParseError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the reader mutably instead of moving it out of `self`.
        let inner = &mut self.fastaiter.inner;
        if inner.finished {
            return None;
        }

        // Load some data in the buffer to start
        if inner.position.line == 0 {
            match fill_buf(&mut inner.buf_reader) {
                Ok(n) => {
                    if n == 0 {
                        inner.finished = true;
                        return None;
                    }
                }
                Err(e) => {
                    return Some(Err(e.into()));
                }
            };

            if inner.get_buf()[0] == b'>' {
                inner.position.line = 1;
                inner.position.byte = 0;
                inner.buf_pos.start = 0;
                inner.search_pos = 1;
            } else {
                return Some(Err(ParseError::new_invalid_start(
                    inner.get_buf()[0],
                    ErrorPosition {
                        line: inner.position.line,
                        id: None,
                    },
                    Format::Fasta,
                )));
            }
        }

        if !inner.buf_pos.is_new() {
            inner.next_pos();
        }

        // Can we identify the start of the next record ?
        let complete = inner.find();

        if !complete {
            // Did we get a record?
            let got_record = match inner.next_complete() {
                Ok(f) => f,
                Err(e) => {
                    return Some(Err(e));
                }
            };

            if !got_record {
                return None;
            }
        }

        if inner.buf_pos.seq_pos.is_empty() {
            return Some(Err(ParseError::new_unexpected_end(
                ErrorPosition {
                    line: inner.position.line,
                    id: None,
                },
                Format::Fasta,
            )));
        }

        if inner.line_ending.is_none() {
            inner.line_ending = inner.buf_pos.find_line_ending(inner.get_buf());
        }
        Some(Ok(SequenceRecord::new_fasta(
            inner.get_buf(),
            &inner.buf_pos,
            &inner.position,
            inner.line_ending,
        )))
    }
}

But I need access to private members of the struct, so this would have to be implemented in needletail itself, which I haven't had time to do yet.

Keats commented 1 year ago

Hey there,

It's been a while since I've looked at that code but I don't think we can impl Iterator due to the parser behaviour: we are not allocating anything and borrowing from the internal buffer. I'd be happy to be proven wrong though if it's possible!
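
One possible workaround, assuming the extra copy is acceptable, is to copy each borrowed record into an owned value before handing it to other threads. Owned values can flow through a normal `Iterator`, be zipped across two inputs, and be processed in parallel. The sketch below uses only the standard library and treats each line as one record; `OwnedRecord`, `owned_records`, `process_pair`, and `process_paired` are hypothetical names, and with needletail the copy would presumably come from the record's `id()` and `seq()` accessors instead.

```rust
use std::io::BufRead;
use std::thread;

// Hypothetical owned record: the data is copied out of the reader's buffer.
#[derive(Debug, Clone, PartialEq)]
struct OwnedRecord {
    seq: String,
}

// Turn a line source into an iterator of owned records.
// Reading stops at the first I/O error (a simplification for this sketch).
fn owned_records<R: BufRead>(source: R) -> impl Iterator<Item = OwnedRecord> {
    source
        .lines()
        .map_while(Result::ok)
        .map(|seq| OwnedRecord { seq })
}

// Placeholder for the real per-pair work.
fn process_pair(r1: &OwnedRecord, r2: &OwnedRecord) -> usize {
    r1.seq.len() + r2.seq.len()
}

// Read both inputs in lockstep, `batch_size` pairs at a time, and spread
// each batch over scoped worker threads.
fn process_paired<R1: BufRead, R2: BufRead>(src1: R1, src2: R2, batch_size: usize) -> usize {
    let mut pairs = owned_records(src1).zip(owned_records(src2));
    let mut total = 0;
    loop {
        // Only one batch of pairs is held in memory at any time.
        let batch: Vec<(OwnedRecord, OwnedRecord)> =
            pairs.by_ref().take(batch_size).collect();
        if batch.is_empty() {
            break;
        }
        let n_threads = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        let chunk_len = ((batch.len() + n_threads - 1) / n_threads).max(1);
        total += thread::scope(|s| {
            let handles: Vec<_> = batch
                .chunks(chunk_len)
                .map(|chunk| {
                    s.spawn(move || chunk.iter().map(|(a, b)| process_pair(a, b)).sum::<usize>())
                })
                .collect();
            handles
                .into_iter()
                .map(|h| h.join().expect("worker thread panicked"))
                .sum::<usize>()
        });
    }
    total
}
```

Batching keeps memory bounded while still giving each thread a reasonable amount of work. An iterator of owned, `Send` pairs like `pairs` is also the kind of input that Rayon's `par_bridge()` accepts, so the manual batching could likely be replaced by it.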