sstadick / gzp

Multi-threaded Compression
The Unlicense

Error when writing with `ZBuilder::<Bgzf, _>::new().num_threads(8)...` #30

Closed mrvollger closed 2 years ago

mrvollger commented 2 years ago

Hello,

I am getting an error whenever I use ZBuilder::<Bgzf, _> with more than one thread, and I am having trouble figuring out what is going wrong. Here is the error message I am getting:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ChannelReceive(Disconnected)', /Users/mrvollger/.cargo/registry/src/github.com-1ecc6299db9ec823/gzp-0.9.2/src/par/compress.rs:301:27
stack backtrace:
   0: rust_begin_unwind
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/std/src/panicking.rs:498:5
   1: core::panicking::panic_fmt
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/panicking.rs:107:14
   2: core::result::unwrap_failed
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/result.rs:1690:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/result.rs:1018:23
   4: <gzp::par::compress::ParCompress<F> as core::ops::drop::Drop>::drop
             at /Users/mrvollger/.cargo/registry/src/github.com-1ecc6299db9ec823/gzp-0.9.2/src/par/compress.rs:301:13
   5: core::ptr::drop_in_place<gzp::par::compress::ParCompress<gzp::deflate::Bgzf>>
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ptr/mod.rs:188:1
   6: core::ptr::drop_in_place<alloc::boxed::Box<dyn gzp::ZWriter>>
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ptr/mod.rs:188:1
   7: core::ptr::drop_in_place<alloc::boxed::Box<dyn std::io::Write>>
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ptr/mod.rs:188:1
   8: test::run_split_fastx
             at ./src/test.rs:72:5
   9: test::main
             at ./src/test.rs:77:5
  10: core::ops::function::FnOnce::call_once
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

And here is some code and a test file that recreates this error for me:

use gzp::deflate::Bgzf;
use gzp::Compression;
use gzp::ZBuilder;
use needletail::{parse_fastx_file, parse_fastx_stdin, parser::LineEnding};
use std::ffi::OsStr;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};

const BUFFER_SIZE: usize = 128 * 1024;

/// Uses the presence of a `.gz` extension to decide if compression is needed
pub fn writer(filename: &str) -> Box<dyn Write> {
    let ext = Path::new(filename).extension();
    let path = PathBuf::from(filename);
    let buffer = Box::new(BufWriter::with_capacity(
        BUFFER_SIZE,
        File::create(path).expect("Error: cannot create output file"),
    ));

    if ext == Some(OsStr::new("gz")) {
        let writer = ZBuilder::<Bgzf, _>::new()
            .num_threads(8)
            .compression_level(Compression::new(6))
            .from_writer(buffer);
        Box::new(writer)
    } else {
        buffer
    }
}

/// Split a fasta file across outputs
pub fn run_split_fastx(files: &[String], infile: &str) {
    // open the output files
    let mut outs = Vec::new();
    for f in files {
        let handle = writer(f);
        outs.push(handle);
    }
    // open reader
    let mut reader = if infile == "-" {
        parse_fastx_stdin().expect("Missing or invalid stdin for fastx parser.")
    } else {
        parse_fastx_file(infile).expect("Missing or invalid input file for fastx parser.")
    };
    // iterate
    let mut out_idx = 0;
    let mut rec_num = 0;
    while let Some(record) = reader.next() {
        let seq_rec =
            record.unwrap_or_else(|_| panic!("Error reading record number {}", rec_num + 1));
        seq_rec
            .write(&mut outs[out_idx], Some(LineEnding::Unix))
            .unwrap_or_else(|_| panic!("Error writing record number {}", rec_num + 1));

        eprintln!("Wrote record number {}", rec_num + 1);
        out_idx += 1;
        rec_num += 1;
        if out_idx == outs.len() {
            out_idx = 0;
        }
    }
    // Close all the files.
    let mut n_out = 0;
    for mut out in outs {
        out.flush()
            .unwrap_or_else(|_| panic!("Error flushing output!"));
        n_out += 1;
        eprintln!("Finished output number {}", n_out);
    }
}

pub fn main() {
    let infile = "large.test.fa.gz";
    run_split_fastx(&["a.fa".to_string(), "b.fa.gz".to_string()], infile);
}

Here is the FASTA file used in this example code: large.test.fa.gz

Any help would be greatly appreciated!

Thanks! Mitchell

sstadick commented 2 years ago

@mrvollger Sorry for the delay in getting to this! I'll take a look later tonight or tomorrow. I don't see anything immediately wrong with your example code.

mrvollger commented 2 years ago

Thanks so much for taking a look! I am having a hard time confirming this but I think it might have to do with the length of the sequences. I have run basically this code on short reads and never had a problem, but then I tried some contigs from assemblies and started to get this error.

Thanks again!

sstadick commented 2 years ago

Yep, I have a fix incoming to gzp. It looks like when the write calls carry more bytes than the BGZF max buffer size, the final call to flush tries to send all the remaining bytes at once. That then fails in BGZF, since the number of bytes exceeds the max buffer size allowed by the BGZF spec.
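For readers following along, the failure mode described above can be illustrated with a minimal sketch. It is not gzp's actual fix. It only shows the general idea of splitting whatever is still buffered at flush time into pieces that each fit in one block. The constant `MAX_BLOCK_INPUT` and the function `split_into_blocks` are hypothetical names; the value 65,280 (0xff00) is the per-block input size used by htslib, and whether gzp uses the same value is an assumption.

```rust
/// Hypothetical cap on uncompressed bytes per BGZF block.
/// 0xff00 mirrors htslib's block size; assumed here, not taken from gzp.
pub const MAX_BLOCK_INPUT: usize = 65_280;

/// Split the bytes still pending at flush time into slices that each
/// fit inside a single block, instead of handing them over all at once.
/// An empty input yields no blocks.
pub fn split_into_blocks(pending: &[u8]) -> Vec<&[u8]> {
    pending.chunks(MAX_BLOCK_INPUT).collect()
}
```

With a long contig of, say, 200,000 bytes left in the buffer, this yields four slices, none larger than `MAX_BLOCK_INPUT`, which could then be compressed one block at a time.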

Thank you for making this issue! I don't think I would have run into this normally, and I missed it in all my test cases as well.

sstadick commented 2 years ago

Try the v0.9.3 release

sstadick commented 2 years ago

And thank you again for the issue!

mrvollger commented 2 years ago

Just tried it and it got through the file and wrote all the records! Thanks!

However, I do get this error/warning from samtools when I try to index the resulting file.

$ cat .test/large.test.fa.gz | cargo run --release --bin rb -- -vvv fasta-split 1.fa.gz
$ samtools faidx 1.fa.gz
[W::bgzf_read_block] EOF marker is absent. The input is probably truncated

The index does appear to be created after this, and it is correct in this case, but it seems a little worrying.

sstadick commented 2 years ago

Good catch! The fix to make sure large writes were handled ended up skipping the EOF marker. As you noted, it's not technically needed, but the HTS suite of tools will complain if it isn't present.
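For anyone who wants to check an output file without samtools, here is a small sketch that tests for the marker directly. The 28 bytes are the empty BGZF block that the SAM/BGZF specification defines as the end-of-file marker. The function names `has_eof_marker` and `file_has_eof_marker` are made up for this example and are not part of gzp.

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::Path;

/// The 28-byte empty BGZF block that marks end-of-file (per the BGZF spec).
pub const BGZF_EOF: [u8; 28] = [
    0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
    0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];

/// Returns true if the in-memory bytes end with the BGZF EOF marker.
pub fn has_eof_marker(bytes: &[u8]) -> bool {
    bytes.ends_with(&BGZF_EOF)
}

/// Returns true if the file at `path` ends with the BGZF EOF marker.
/// Only the last 28 bytes are read, so this is cheap for large files.
pub fn file_has_eof_marker(path: &Path) -> io::Result<bool> {
    let mut file = File::open(path)?;
    let len = file.metadata()?.len();
    if len < BGZF_EOF.len() as u64 {
        return Ok(false);
    }
    file.seek(SeekFrom::End(-(BGZF_EOF.len() as i64)))?;
    let mut tail = [0u8; 28];
    file.read_exact(&mut tail)?;
    Ok(tail == BGZF_EOF)
}
```

Calling `file_has_eof_marker(Path::new("1.fa.gz"))` on the file from the samtools example above would be expected to return `Ok(false)` for output that triggers the warning.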

v0.9.5 has the fixes. Thanks for following up!