zaeleus / noodles

Bioinformatics I/O libraries in Rust
MIT License

Help request: using the noodles async FASTQ writer with async compression without corrupting the output #275

Closed nrminor closed 1 month ago

nrminor commented 1 month ago

Hi all,

I'm relatively new to async Rust, but I noticed noodles has async readers and writers and figured I'd give them a shot for one of my projects. I'd like to asynchronously read compressed FASTQ records, mutate them, and write them to a compressed output. So far, I have everything working in a proof-of-concept except for the compression part.

Here's the code that works but does not compress:

use std::path::PathBuf;

use async_compression::tokio::bufread::GzipDecoder;
use color_eyre::eyre::Result;
use futures::TryStreamExt;
use noodles::fastq::r#async::io::Reader as FastqReader;
use noodles::fastq::r#async::io::Writer as FastqWriter;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::io::{BufReader, BufWriter};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<()> {
    // Input and output file paths
    let input_file_path = PathBuf::from("test.fastq.gz");
    let output_file_path = "output.fastq.gz";

    // Open the input file
    let input_file = File::open(&input_file_path).await?;
    let reader = BufReader::new(input_file);
    let decoder = GzipDecoder::new(reader);
    let decode_reader = BufReader::new(decoder);
    let mut fastq = FastqReader::new(decode_reader);
    let mut records = fastq.records();

    // Open the output file
    let output_file = File::create(output_file_path).await?;
    let writer = BufWriter::new(output_file);
    let mut fastq_writer = FastqWriter::new(writer);

    while let Some(mut record) = records.try_next().await? {
        *record.sequence_mut() = b"ATCG".to_vec();
        *record.quality_scores_mut() = b"NDLS".to_vec();
        fastq_writer.write_record(&record).await?;
    }

    let mut final_contents = fastq_writer.into_inner();
    final_contents.flush().await?;

    Ok(())
}

When I slot the encoder into the writer block as below (everything else is the same as above), I get a corrupted .fastq.gz file:

// modified writer block; this also needs `use async_compression::tokio::write::GzipEncoder;`
let output_file = File::create(output_file_path).await?;
let writer = BufWriter::new(output_file);
let encoder = GzipEncoder::new(writer);
let mut fastq_writer = FastqWriter::new(encoder);

I'll attach the test fastq I'm using along with my project manifest in a comment in case anyone would like to test the code themselves.

Is this something anyone here has experience with? Perhaps flush is acting on the encoder without flushing the underlying BufWriter? Any advice or suggestions would be much appreciated! And thanks for the excellent crate!
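
For what it's worth, here's a sketch of the finalization I suspect is missing, assuming async-compression's write-side GzipEncoder has to be shut down (not just flushed) before it writes the gzip trailer. Everything before this point is the same as the modified writer block above:

// After the record loop: take the encoder back from the FASTQ writer and
// shut it down. My assumption is that shutdown finishes the gzip stream,
// writes the trailer, and then flushes/shuts down the BufWriter underneath.
let mut encoder = fastq_writer.into_inner();
encoder.shutdown().await?;

My (possibly wrong) reading is that flush only pushes through data the encoder has already compressed, while shutdown is what tells it the stream is finished.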

--Nick

nrminor commented 1 month ago

test.fastq.gz

And here's my Cargo.toml:

[package]
name = "lazy-amplicon-tk"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
color-eyre = "0.6.3"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["io-util"] }
tracing = "0.1.26"
futures = "0.3"
sccache = "0.8.1"
tracing-subscriber = "0.2.19"
noodles = { version = "0.77.0", features = ["fastq", "async"] }
async-compression = { version = "0.4.11", features = ["all"] }