Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error when writting with ZBuilder::<Bgzf, _>::new().num_threads(8)... #30

Closed
mrvollger opened this issue Jan 15, 2022 · 7 comments · Fixed by #31
Closed

Error when writting with ZBuilder::<Bgzf, _>::new().num_threads(8)... #30

mrvollger opened this issue Jan 15, 2022 · 7 comments · Fixed by #31

Comments

@mrvollger
Copy link

Hello,

I am getting an error whenever I use Zbuilder::<Bgzf, _> with more than one thread and I am having trouble figuring out what is going wrong. Here is the error message I am getting:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ChannelReceive(Disconnected)', /Users/mrvollger/.cargo/registry/src/github.com-1ecc6299db9ec823/gzp-0.9.2/src/par/compress.rs:301:27
stack backtrace:
   0: rust_begin_unwind
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/std/src/panicking.rs:498:5
   1: core::panicking::panic_fmt
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/panicking.rs:107:14
   2: core::result::unwrap_failed
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/result.rs:1690:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/result.rs:1018:23
   4: <gzp::par::compress::ParCompress<F> as core::ops::drop::Drop>::drop
             at /Users/mrvollger/.cargo/registry/src/github.com-1ecc6299db9ec823/gzp-0.9.2/src/par/compress.rs:301:13
   5: core::ptr::drop_in_place<gzp::par::compress::ParCompress<gzp::deflate::Bgzf>>
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ptr/mod.rs:188:1
   6: core::ptr::drop_in_place<alloc::boxed::Box<dyn gzp::ZWriter>>
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ptr/mod.rs:188:1
   7: core::ptr::drop_in_place<alloc::boxed::Box<dyn std::io::Write>>
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ptr/mod.rs:188:1
   8: test::run_split_fastx
             at ./src/test.rs:72:5
   9: test::main
             at ./src/test.rs:77:5
  10: core::ops::function::FnOnce::call_once
             at /rustc/a7e2e33960e95d2eb1a2a2aeec169dba5f73de05/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

And here is some code and a test file that recreates this error for me:

use gzp::deflate::Bgzf;
use gzp::Compression;
use gzp::ZBuilder;
use needletail::{parse_fastx_file, parse_fastx_stdin, parser::LineEnding};
use std::ffi::OsStr;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};

const BUFFER_SIZE: usize = 128 * 1024;

/// Uses the presence of a `.gz` extension to decide if compression is needed
pub fn writer(filename: &str) -> Box<dyn Write> {
    let ext = Path::new(filename).extension();
    let path = PathBuf::from(filename);
    let buffer = Box::new(BufWriter::with_capacity(
        BUFFER_SIZE,
        File::create(path).expect("Error: cannot create output file"),
    ));

    if ext == Some(OsStr::new("gz")) {
        let writer = ZBuilder::<Bgzf, _>::new()
            .num_threads(8)
            .compression_level(Compression::new(6))
            .from_writer(buffer);
        Box::new(writer)
    } else {
        buffer
    }
}

/// Split a fasta file across outputs
pub fn run_split_fastx(files: &[String], infile: &str) {
    // open the output files
    let mut outs = Vec::new();
    for f in files {
        let handle = writer(f);
        outs.push(handle);
    }
    // open reader
    let mut reader = if infile == "-" {
        parse_fastx_stdin().expect("Missing or invalid stdin for fastx parser.")
    } else {
        parse_fastx_file(infile).expect("Missing or invalid stdin for fastx parser.")
    };
    // iterate
    let mut out_idx = 0;
    let mut rec_num = 0;
    while let Some(record) = reader.next() {
        let seq_rec =
            record.unwrap_or_else(|_| panic!("Error reading record number {}", rec_num + 1));
        seq_rec
            .write(&mut outs[out_idx], Some(LineEnding::Unix))
            .unwrap_or_else(|_| panic!("Error writing record number {}", rec_num + 1));

        eprintln!("Wrote record number {}", rec_num + 1);
        out_idx += 1;
        rec_num += 1;
        if out_idx == outs.len() {
            out_idx = 0;
        }
    }
    // Close all the files.
    let mut n_out = 0;
    for mut out in outs {
        out.flush()
            .unwrap_or_else(|_| panic!("Error flushing output!"));
        n_out += 1;
        eprintln!("Finished output number {}", n_out);
    }
}

pub fn main() {
    let infile = "large.test.fa.gz";
    run_split_fastx(&["a.fa".to_string(), "b.fa.gz".to_string()], infile);
}

Hete is the fasta file used in this example code:
large.test.fa.gz

Any help would be greatly appreciated!

Thanks!
Mitchell

@sstadick
Copy link
Owner

@mrvollger Sorry the delay in getting to this! I'll take a look later tonight or tomorrow. I don't see anything immediately wrong with your example code.

@mrvollger
Copy link
Author

Thanks so much for taking a look! I am having a hard time confirming this but I think it might have to do with the length of the sequences. I have run basically this code on short reads and never had a problem, but then I tried some contigs from assemblies and started to get this error.

Thanks again!

@sstadick
Copy link
Owner

Yep, I have a fix incoming to gzp. It looks like when the write calls have more bytes than the BGZF max buffer size the final call to flush will try to send all the remaining bytes at once, which will then fail in BGZF since the number of bytes exceeds the max buffer size allowed by the BGZF spec.

Thank you for making this issue! I would not have run into this normally I don't think and missed it in all my test cases as well.

@sstadick sstadick linked a pull request Jan 19, 2022 that will close this issue
@sstadick
Copy link
Owner

Try the v0.9.3 release

@sstadick
Copy link
Owner

And thank you again for the issue!

@mrvollger
Copy link
Author

mrvollger commented Jan 19, 2022

Just tried it and it got through the file and wrote all the records! Thanks!

However, I do get this error/warning from samtools when I try to index the resulting file.

$ cat .test/large.test.fa.gz | cargo run --release --bin rb -- -vvv fasta-split 1.fa.gz
$ samtools faidx 1.fa.gz
[W::bgzf_read_block] EOF marker is absent. The input is probably truncated

The index does appear to be created after this, and it is correct in this case, but it seems a little worrying.

@sstadick
Copy link
Owner

Good catch! The fix to make sure large writes were handled ended up skipping the EOF marker. As you noted it's not technically needed but the HTS suit of tools will complain if it isn't present.

v0.9.5 has the fixes. Thanks for following up!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants