nanoporetech / medaka

Sequence correction provided by ONT Research
https://nanoporetech.com

Why does my attempt at retrofitting medaka smolecule to accept BAM input lead to failure in medaka stitch collapse_neighbours #379

Closed gordonrix closed 2 years ago

gordonrix commented 2 years ago

I am working on incorporating smolecule into my analysis pipeline, which uses medaka to generate amplicon UMI consensus sequences. Until now I have been using a very hacky method of parallelization, but I'd now like to do something like what smolecule does, using a reference-based approach. I have already implemented the suggestion made in this thread, and it worked great for eliminating the poa step. However, in order to identify UMIs I already have to perform an alignment, so I would like to just feed the program the BAM file that I already have instead of aligning to the reference sequence again.

To do this, I skip the alignment steps, retrieve the attributes from each BAM entry, and use those to construct Alignment named tuples in a copy of the multi_from_fastx method. A new BAM with a new header is then written from these Alignments, and that is then used by medaka.prediction. This worked for a small set of sequences (or at least appears to have worked, as I get the right number of consensus sequences out and they seem accurate), but when applied to a much larger BAM input, this error was produced during medaka.stitch:

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
Exception in thread QueueManagerThread:
Traceback (most recent call last):
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
    work_item.future.set_exception(bpe)
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/_base.py", line 547, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7f8a77585af0 state=cancelled>
Traceback (most recent call last):
  File "/home/popos/miniconda3/envs/maple/bin/medaka", line 11, in <module>
    sys.exit(main())
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/site-packages/medaka/medaka.py", line 739, in main
    args.func(args)
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/site-packages/medaka/maple_smolecule.py", line 534, in main
    medaka.stitch.stitch(args)
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/site-packages/medaka/stitch.py", line 231, in stitch
    for (rname, start, stop), seq_parts, qualities in contigs:
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/site-packages/medaka/stitch.py", line 151, in collapse_neighbours
    contig = next(contigs)
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/process.py", line 484, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
    yield fs.pop().result()
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

The .hdf file has something to do with this, as the resulting .hdf file is also corrupted in some way: if it is placed into the output folder prior to running the customized smolecule (but not the unmodified smolecule), the same error appears even with the smaller BAM input (which had previously succeeded).
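For reference, this is a quick way to check whether such a file opens and walks cleanly (a minimal sketch using h5py, which medaka itself uses for these files; the path here is just an example):

import h5py

# Try to open the consensus.hdf and visit every group/dataset in it.
# A corrupted file will typically fail on open or partway through the walk.
with h5py.File('output/consensus.hdf', 'r') as fh:
    fh.visititems(lambda name, obj: print(name, type(obj).__name__))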

One other piece of information that I can't explain, and which might help you provide guidance: when the original smolecule is run on the small dataset, several messages like [20:43:50 - TrimOlap] UMI-1:0.0-484.2 is contained within UMI-1:0.0-964.0, skipping. are output, but no such messages appear for my modified smolecule, even though I am using the same sequences for each (albeit with slightly different alignment methods).

Happy to also post my (very unsightly at this point) modded smolecule.py script in case that might help you understand the problem better.
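In the meantime, the heart of the retrofit is roughly this (a trimmed-down sketch, not my exact code):

import pysam
from collections import namedtuple

Alignment = namedtuple('Alignment', 'rname qname flag rstart seq cigar')

def alignments_from_bam(bam, umi_tag='UG'):
    """Yield Alignment tuples straight from an existing BAM,
    skipping the re-alignment that smolecule would normally perform."""
    with pysam.AlignmentFile(bam, 'rb') as bh:
        for entry in bh.fetch(until_eof=True):
            read_id = 'UMI-' + str(entry.get_tag(umi_tag))
            # flag forced to 0: entries are assumed to be already oriented
            yield Alignment(
                read_id, entry.query_name, 0,
                entry.reference_start,
                entry.query_sequence, entry.cigarstring)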

Sorry, I'm a bit out of my element here. Let me know what extra info you'd like to see to help diagnose the problem, and thank you very much for any assistance you can provide!

Environment:

Installation method: Conda
OS: PopOS (Ubuntu) 20.04 running on VirtualBox
medaka version: 1.6.1
Python: 3.8

cjw85 commented 2 years ago

This section is the only part of the error I can see relating to medaka code (the rest is to do with multiprocessing):

medaka/stitch.py", line 231, in stitch
    for (rname, start, stop), seq_parts, qualities in contigs:

@ftostevin-ont might know something as I believe this code changed recently.

ftostevin-ont commented 2 years ago

It's hard to tell from the error message what's going wrong. For some reason there is an error as collapse_neighbours is trying to loop over the chunks that are coming from stitch_from_probs, but I can't say why.

...
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/site-packages/medaka/stitch.py", line 151, in collapse_neighbours
    contig = next(contigs)
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/process.py", line 484, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
    yield fs.pop().result()
  File "/home/popos/miniconda3/envs/maple/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
...

There were two changes in this code recently. One made stitch_from_probs and collapse_neighbours output a 3-tuple of coordinates, sequence, and quality array, whereas before it was a 2-tuple without qualities; this should not affect the consensus.hdf, which was not changed. The other change added read-depth information to the consensus.hdf output, so consensus.hdf files from before 46238a7b and after may not be compatible. As long as you are using consistent versions of predict and stitch, this should not cause a problem.
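Schematically, each item the stitch loop now consumes unpacks as follows (illustrative values, not real data):

# 3-tuple of coordinates, sequence parts, and per-base qualities
contig = (('UMI-1', 0, 3), ['ACGT'], [[30, 30, 30, 30]])
(rname, start, stop), seq_parts, qualities = contig
print(rname, start, stop, ''.join(seq_parts))  # UMI-1 0 3 ACGT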

cjw85 commented 2 years ago

@gordonrix You will have to share your code, and also your input files, for us to better understand the error you are seeing. I would expect the big/small file effect is simply a case of coming across corner cases in the data being processed. If you can pin the error down to the reads/UMIs causing it, that would be a useful start.

gordonrix commented 2 years ago

Thanks so much for the quick help.

I am only using consensus.hdf files that have been made with medaka 1.6.1, so I think that isn't the problem.

I will work on isolating the problematic sequences. I'm getting a bit lost in the stitch.py script; is there a place in there you could suggest where I could retrieve the read (or subread) names upon hitting an error in the collapse_neighbours function? If not, I can split my file into small chunks and see whether any of them still throw an error, but there's a possibility that a particular combination of reads is the problem, and splitting into small chunks might eliminate that combination.

A couple of asides related to GitHub posting: how did you edit my error message to not look so horrible (sorry about that)? Was it just manually adding newline characters where appropriate? And is there any way for me to add a question tag to this?

OK, I am pasting my modified smolecule.py below, with some comments added to call attention to the changes. If you'd prefer a file on GitHub to look at, I can do that as well; just let me know.

Each Read corresponds to a single UMI. At the top I am manually adding the reference sequence that I use as the consensus for all Reads.

- Line 37: set REFSEQ as the consensus for each Read
- Line 53: comment out self._alignments = self.orient_subreads()
- Line 117: insert a multi_from_bam method that is just a copy of multi_from_fastx, but returning Alignment tuples instead of Subread tuples
- Line 199: delete the poa_consensus method, which I'm no longer using
- Line 324: gut _read_worker; all it does now is initialize the Read
- Line 370: add a bam_workflow function, mainly just to build the header and pass alignments to main
- Lines 456 and 462: use the multi_from_bam and bam_workflow functions

"""Creation of consensus sequences from a BAM file sorted by the unique ID tag UG."""
from collections import namedtuple
from concurrent.futures import as_completed, ProcessPoolExecutor
import os
from timeit import default_timer as now
import warnings

import mappy
import numpy as np
import parasail
import pysam
import spoa

import medaka.align
import medaka.common
import medaka.medaka
import medaka.prediction
import medaka.stitch

Subread = namedtuple('Subread', 'name seq')
Alignment = namedtuple('Alignment', 'rname qname flag rstart seq cigar')
REFSEQ = 'ggccgcctagagcctgcactagacgaactaggcaagatgcgtccaatccgtctaaacatggtgacaacgctggacagatgacgtaacaccgagccacatcctgagAGGAGATGnnnnnnCGTGTAGAGACTGCGTAGGNNNYRNNNYRNNNYRNNNgatgacctatacataggaagatctatagaaacaaaaagattaataactttcaaatatcagaaaaatatagaaaCatGtgataagctcatagacatataaaaaaatATGGTTTCTAAAGGTGAAGCTGTTATTAAAGAATTTATGAGATTTAAAGTTCACATGGAAGGTTCTATGAATGGTCATGAATTTGAAATTGAAGGTGAAGGTGAAGGTAGACCTTATGAAGGTACTCAAACTGCTAAATTGAAAGTTACTAAAGGTGGTCCATTGCCATTTTCTTGGGATATTTTGTCTCCACAATTTATGTATGGTTCTAGAGCTTTTATTAAACATCCTGCTGATATTCCTGATTATTATAAACAATCTTTTCCTGAAGGTTTTAAATGGGAAAGAGTTATGAATTTTGAAGATGGTGGTGCTGTTACTGTTACTCAAGATACTTCTTTGGAAGATGGTACTTTGATTTATAAAGTTAAATTGAGAGGTACTAATTTTCCACCTGATGGTCCTGTTATGCAAAAAAAAACTATGGGTTGGGAAGCTTCTACTGAAAGATTGTATCCTGAAGATGGTGTTTTGAAAGGTGATATTAAAATGGCTTTGAGATTGAAAGATGGTGGTAGATATTTGGCTGATTTTAAAACTACTTATAAAGCTAAAAAACCTGTTCAAATGCCGGGTGCTTATAATGTAGATAGAAAACTTGACATTACTTCACATAACGAGGATTATACTGTTGTTGAACAATATGAAAGATCTGAAGGTAGACATTCTACTGGTGGTATGGATGAATTGTATAAATCTGGTTTGAGATCTAGGGCTCAAGCTTCTAATTCTGCTGTTGATGGTACTGCTGGTCCTGGTTCTACTGGTTCTAGAGGTTCTTAAatcctattaatGGCCGCaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaGGCCGGCATGGTCCCAGCCTCCTCGCTGGCGCCGGCTGGGCAACATTCCGAGGGGACCGTCCCCTCGGTAATGGCGAATGGGACCCACGctaggctgTAAtttttcgccggagtcaattaggtcatacNNNYRNNNYRNNNYRNNNCACTCGCACTGACTCGATnnnnnnGCTTTCgagctaccgttccagcgtcaatagtgccgatgaccacagacccggttaagacatagccgaatggagccgcgccgaccacagaatgataacgagtcacagc'.upper()

class Read(object):
    """Functionality to extract information from a read with subreads."""

    def __init__(self, name, subreads, initialize=False):
        """Initialize repeat read analysis.

        :param name: read name.
        :param subreads: list of subreads.
        :param initialize: initialize subread alignments.

        """
        self.name = name
        self.subreads = subreads
        if len(self.subreads) == 0:
            raise ValueError(
                "Cannot create a read with zero subreads.")
        self.consensus = REFSEQ
        # self.consensus = snakemake.config['runs'][snakemake.wildcards.tag][
        # SW alignments of subreads to consensus
        self._alignments = None
        self._alignments_valid = False
        # orientation of subreads wrt consensus
        self._orient = None
        # has initialize() been run (i.e. are the above filled in)
        self._initialized = False
        # has a consensus been run
        self.consensus_run = False

    def initialize(self):
        """Calculate initial alignments of subreads to scaffold read."""
        # we find initial alignments with parasail as mappy may not find some
        if not self._initialized:
            # self._alignments = self.orient_subreads()
            self._alignments_valid = True
            self._initialized = True
        return None

    @classmethod
    def from_fastx(cls, fastx, name=None):
        """Create a Read from a fasta/q file.

        :param fastx: input file path.
        :param name: name of Read. If not given the filename will be used.

        """
        try:
            read = next(cls.multi_from_fastx(fastx, take_all=True))
        except Exception:
            raise IOError("Could not create Read from file {}.".format(fastx))
        return read

    @classmethod
    def multi_from_fastx(
            cls, fastx,
            take_all=False, read_id=None, depth_filter=1, length_filter=0):
        """Create multiple `Read` s from a fasta/q file.

        It is assumed that subreads are grouped by read and named with
        <read_id>_<subread_id>.

        :param fastx: input file path.
        :param take_all: skip check on subread_ids, take all subreads in one
            `Read`.
        :param read_id: name of `Read`. Only used for `take_all == True`. If
            not given the basename of the input file is used.
        :param depth_filter: require reads to have at least this many subreads.
        :param length_filter: require reads to have a median subread length
            above this value.

        """
        depth_filter = max(1, depth_filter)
        if take_all and read_id is None:
            read_id = os.path.splitext(os.path.basename(fastx))[0]
        else:
            read_id = None
        subreads = []
        with pysam.FastxFile(fastx) as fh:
            for entry in fh:
                if not take_all:
                    cur_read_id = entry.name.split("_")[0]
                    if cur_read_id != read_id:
                        if len(subreads) >= depth_filter:
                            med_length = np.median(
                                [len(x.seq) for x in subreads])
                            if med_length > length_filter:
                                yield cls(read_id, subreads)
                        read_id = cur_read_id
                        subreads = []
                if len(entry.sequence) > 0:
                    subreads.append(Subread(entry.name, entry.sequence))

            if len(subreads) >= depth_filter:
                med_length = np.median([len(x.seq) for x in subreads])
                if med_length > length_filter:
                    yield cls(read_id, subreads)

    @classmethod
    def multi_from_bam(
            cls, bam,
            take_all=False, read_id=None, depth_filter=1, length_filter=0):
        """Create multiple `Read` s from a BAM file.

        It is assumed that alignments are grouped by read, with each read
        identified by its UG tag.

        :param bam: input BAM file path.
        :param take_all: skip the check on read grouping, take all alignments
            in one `Read`.
        :param read_id: name of `Read`. Only used for `take_all == True`. If
            not given the basename of the input file is used.
        :param depth_filter: require reads to have at least this many subreads.
        :param length_filter: require reads to have a median subread length
            above this value.

        """
        depth_filter = max(1, depth_filter)
        if take_all and read_id is None:
            read_id = os.path.splitext(os.path.basename(bam))[0]
        else:
            read_id = None
        alignments = []
        with pysam.AlignmentFile(bam, 'rb') as bh:
            for BAMentry in bh.fetch(until_eof=True):
                if not take_all:
                    cur_read_id = 'UMI-'+str(BAMentry.get_tag('UG'))
                    if cur_read_id != read_id:
                        if len(alignments) >= depth_filter:
                            med_length = np.median(
                                [len(x.seq) for x in alignments])
                            if med_length > length_filter:
                                yield cls(read_id, alignments)
                        read_id = cur_read_id
                        alignments = []
                if len(BAMentry.query_sequence) > 0:
                    alignments.append(Alignment(
                        f"{read_id}", BAMentry.query_name, 0,
                        BAMentry.reference_start,
                        BAMentry.query_sequence,
                        BAMentry.cigarstring))

            if len(alignments) >= depth_filter:
                med_length = np.median([len(x.seq) for x in alignments])
                if med_length > length_filter:
                    yield cls(read_id, alignments)

    @property
    def seqs(self):
        """Return a list of the subread sequences."""
        return [x.seq for x in self.subreads]

    @property
    def interleaved_subreads(self):
        """Return a list of subreads with + and - reads interleaved.

        :returns: orientations, subreads.

        The ordering may not be strictly +-+-+-..., the subreads
        are positioned such that + and - reads are both distributed
        uniformly through the list (according to their overall
        frequency).
        """
        self.initialize()
        fwd, rev = list(), list()
        for orient, subread in zip(self._orient, self.subreads):
            if orient:
                fwd.append([subread, True, 0])
            else:
                rev.append([subread, False, 0])
        for reads in fwd, rev:
            if len(reads) > 0:
                rate = 1.0 / len(reads)
                for i in range(len(reads)):
                    reads[i][2] = rate * i
        reads = sorted(fwd + rev, key=lambda x: x[2])
        reads, orients, garbage = zip(*reads)
        return orients, reads

    @property
    def nseqs(self):
        """Return the number of subreads contained in the read."""
        return len(self.subreads)

    def orient_subreads(self):
        """Find orientation of subreads with respect to consensus sequence.

        :returns: `medaka.align.Alignment` s of subreads to consensus.

        """
        # TODO: use a profile here
        # TODO: refactor with align_to_template
        self._orient = []
        alignments = []
        for sr in self.subreads:
            rc_seq = medaka.common.reverse_complement(sr.seq)
            result_fwd = parasail.sw_trace_striped_16(
                sr.seq, self.consensus, 8, 4, parasail.dnafull)
            result_rev = parasail.sw_trace_striped_16(
                rc_seq, self.consensus, 8, 4, parasail.dnafull)
            is_fwd = result_fwd.score > result_rev.score
            self._orient.append(is_fwd)
            result = result_fwd if is_fwd else result_rev
            seq = sr.seq if is_fwd else rc_seq
            if result.cigar.beg_ref >= result.end_ref or \
                    result.cigar.beg_query >= result.end_query:
                # unsure why this can happen
                continue
            rstart, cigar = medaka.align.parasail_to_sam(result, seq)
            flag = 0 if is_fwd else 16
            aln = Alignment(
                'consensus_{}'.format(self.name), sr.name,
                flag, rstart, seq, cigar)
            alignments.append(aln)
        return alignments

    def align_to_template(self, template, template_name):
        """Align subreads to a template sequence using Smith-Waterman.

        :param template: sequence to which to align subreads.
        :param template_name: name of template sequence.

        :returns: `Alignment` tuples.

        """
        self.initialize()
        alignments = []
        for orient, sr in zip(self._orient, self.subreads):
            if orient:
                seq = sr.seq
            else:
                seq = medaka.common.reverse_complement(sr.seq)
            result = parasail.sw_trace_striped_16(
                seq, template, 8, 4, parasail.dnafull)
            if result.cigar.beg_ref >= result.end_ref or \
                    result.cigar.beg_query >= result.end_query:
                # unsure why this can happen
                continue
            rstart, cigar = medaka.align.parasail_to_sam(result, seq)
            flag = 0 if orient else 16
            aln = Alignment(template_name, sr.name, flag, rstart, seq, cigar)
            alignments.append(aln)
        return alignments

    def mappy_to_template(self, template, template_name, align=True):
        """Align subreads to a template sequence using minimap.

        :param template: sequence to which to align subreads.
        :param template_name: name of template sequence.
        :param align: retrieve cigar string (else produce paf)

        :returns: `Alignment` tuples.

        """
        if not align:
            # align False requires forked minimap2, and isn't much faster for
            # a small number of sequences due to index construction time.
            warnings.warn("`align` is ignored", DeprecationWarning)
        alignments = []
        aligner = mappy.Aligner(seq=template, preset='map-ont')
        for sr in self.subreads:
            try:
                hit = next(aligner.map(sr.seq))
            except StopIteration:
                continue
            else:
                flag = 0 if hit.strand == 1 else 16
                if hit.strand == 1:
                    seq = sr.seq
                else:
                    seq = medaka.common.reverse_complement(sr.seq)
                clip = [
                    '' if x == 0 else '{}S'.format(x)
                    for x in (hit.q_st, len(sr.seq) - hit.q_en)]
                if hit.strand == -1:
                    clip = clip[::-1]
                cigstr = ''.join((clip[0], hit.cigar_str, clip[1]))
                aln = Alignment(
                    template_name, sr.name, flag, hit.r_st, seq, cigstr)
                alignments.append(aln)
                hit = None
        return alignments

def write_bam(fname, alignments, header, bam=True):
    """Write a `.bam` file for a set of alignments.

    :param fname: output filename.
    :param alignments: a list of `Alignment` tuples.
    :param header: bam header
    :param bam: write bam, else sam

    """
    mode = 'wb' if bam else 'w'
    with pysam.AlignmentFile(fname, mode, header=header) as fh:
        for ref_id, subreads in enumerate(alignments):
            for aln in sorted(subreads, key=lambda x: x.rstart):
                a = medaka.align.initialise_alignment(
                    aln.qname, ref_id, aln.rstart, aln.seq,
                    aln.cigar, aln.flag)
                fh.write(a)
    if mode == 'wb':
        pysam.index(fname)

def _read_worker(read, align=True, method='spoa'):
    # Gutted relative to the original: no poa consensus and no re-alignment;
    # just initialize the Read and return its ready-made alignments.
    read.initialize()
    return read.name, read.consensus, read.subreads

def poa_workflow(reads, threads, method='spoa'):
    """Worker function for processing repetitive reads.

    :param reads: list of `Read` s.
    :param threads: number of threads to use for processing.

    """
    # TODO: this is quite memory inefficient, but we can only build the header
    #       by seeing everything.
    logger = medaka.common.get_named_logger('POAManager')
    pool = ProcessPoolExecutor(max_workers=threads)
    futures = []
    for read in reads:
        logger.debug("Adding {} to queue.".format(read.name))
        futures.append(pool.submit(_read_worker, read, method=method))

    header = {
        'HD': {'VN': 1.0},
        'SQ': [],
    }
    consensuses = []
    alignments = []

    for fut in as_completed(futures):
        try:
            res = fut.result()
        except Exception as e:
            logger.warning(e)
            pass
        else:
            rname, consensus, aligns = res
            logger.debug('Finished {}.'.format(rname))
            if consensus is not None:
                header['SQ'].append({
                    'LN': len(consensus),
                    'SN': rname,
                })
                consensuses.append([rname, consensus])
                alignments.append(aligns)

    return header, consensuses, alignments

def bam_workflow(reads, threads, method='spoa'):
    """Worker function for processing repetitive reads.

    :param reads: list of `Read` s.
    :param threads: number of threads to use for processing.

    """
    # TODO: this is quite memory inefficient, but we can only build the header
    #       by seeing everything.
    logger = medaka.common.get_named_logger('POAManager')
    pool = ProcessPoolExecutor(max_workers=threads)
    futures = []
    for read in reads:
        logger.debug("Adding {} to queue.".format(read.name))
        futures.append(pool.submit(_read_worker, read, method=method))

    header = {
        'HD': {'VN': 1.0},
        'SQ': [],
    }
    consensuses = []
    alignments = []

    for fut in as_completed(futures):
        try:
            res = fut.result()
        except Exception as e:
            logger.warning(e)
            pass
        else:
            rname, consensus, aligns = res
            logger.debug('Finished {}.'.format(rname))
            if consensus is not None:
                header['SQ'].append({
                    'LN': len(REFSEQ),
                    'SN': rname,
                })
                consensuses.append([rname, REFSEQ])
                alignments.append(aligns)
    return header, consensuses, alignments

def main(args):
    """Entry point for repeat read consensus creation."""
    parser = medaka.medaka.medaka_parser()
    defaults = parser.parse_args([
        "consensus", medaka.medaka.CheckBam.fake_sentinel,
        "fake_out"])

    class MyArgs:
        """Wrap the given args with defaults for prediction function."""

        myargs = args
        mydefaults = defaults

        def __getattr__(self, attr):
            try:
                return getattr(self.myargs, attr)
            except AttributeError:
                return getattr(self.mydefaults, attr)

    args = MyArgs()

    logger = medaka.common.get_named_logger('Smolecule')
    if args.chunk_ovlp >= args.chunk_len:
        raise ValueError(
            'chunk_ovlp {} must be smaller than chunk_len {}'.format(
                args.chunk_ovlp, args.chunk_len))
    medaka.common.mkdir_p(args.output, info='Results will be overwritten.')

    def _multi_file_reader():
        for fname in args.fasta:
            try:
                yield Read.from_fastx(fname)
            except Exception:
                pass

    if len(args.fasta) > 1:
        logger.info(
            "Given {} input files, assuming one read per file.".format(
                len(args.fasta)))
        reads = _multi_file_reader()
    else:
        logger.info(
            "Given one input file, subreads are assumed "
            "to be grouped by read.")
        reads = Read.multi_from_bam(
            args.fasta[0], depth_filter=args.depth, length_filter=args.length)

    logger.info(
        "Running {} pre-medaka consensus for all reads.".format(args.method))
    t0 = now()
    header, consensuses, alignments = bam_workflow(
        reads, args.threads, method=args.method)
    t1 = now()

    logger.info(
        "Writing medaka input bam for {} reads.".format(len(alignments)))
    bam_file = os.path.join(args.output, 'subreads_to_spoa.bam')
    write_bam(bam_file, alignments, header)

    spoa_file = os.path.join(args.output, 'poa.fasta')
    with open(spoa_file, 'w') as fh:
        for rname, cons in consensuses:
            fh.write('>{}\n{}\n'.format(rname, cons))

    logger.info("Running medaka consensus.")
    t2 = now()
    args.bam = bam_file
    out_dir = args.output
    args.output = os.path.join(out_dir, 'consensus.hdf')
    medaka.prediction.predict(args)
    t3 = now()

    logger.info("Running medaka stitch.")
    args.draft = spoa_file
    args.inputs = [args.output]
    out_ext = 'fasta'
    if args.qualities:
        out_ext = 'fastq'
    args.output = os.path.join(out_dir, 'consensus.{}'.format(out_ext))
    args.regions = None  # medaka consensus changes args.regions
    args.fillgaps = False
    medaka.stitch.stitch(args)
    logger.info(
        "Single-molecule consensus sequences written to {}.".format(
            args.output))
    logger.info(
        "POA time: {:.0f}s, medaka time: {:.0f}s".format(
            t1 - t0, t3 - t2))

gordonrix commented 2 years ago

OK, so I previously hadn't run the mostly unmodified smolecule that uses .fasta files (but skips poa) on the larger dataset, because of how long it would take, but I decided to try that, and it turns out the same error occurs. I'm still working on isolating a smaller subset of sequences to give you, but I think this means the error is not related to my version of smolecule that accepts BAM files; it's more likely related to the removal of the poa step. This smolecule.py is identical to the one found in medaka 1.6.1 except for commenting out a few lines in the _read_worker function:

def _read_worker(read, align=True, method='spoa'):
    read.initialize()
    # if read.nseqs > 2:  # skip if there is only one subread
    #     for it in range(2):
    #         read.poa_consensus(method=method)
    aligns = None
    if align:
        aligns = read.mappy_to_template(
            template=read.consensus, template_name=read.name)
    return read.name, read.consensus, aligns

gordonrix commented 2 years ago

@ftostevin-ont @cjw85

In my efforts to isolate specific problematic reads, I am finding that isolating single reads doesn't seem to reproduce the issue. I inserted a try/except clause into the collapse_neighbours function in stitch.py to grab the name of reads that trigger an error, like so:

def collapse_neighbours(contigs):
    """Build larger contigs by joining neighbours.

    :param contigs: a stream of ordered (partial)-contigs.
    """
    try:
        contig = next(contigs)
    except StopIteration:
        return
    ref_name, start, stop = contig[0]
    buffer = contig[1]
    qual = contig[2]
    try:
        for contig in contigs:
            c_rn, c_start, c_stop = contig[0]
            if c_rn == ref_name and c_start == stop + 1:
                stop = c_stop
                buffer.extend(contig[1])
                qual.extend(contig[2])
            else:
                # clear buffer, start anew
                yield (
                    (ref_name, start, stop), buffer, qual)
                ref_name, start, stop = contig[0]
                buffer = contig[1]
                qual = contig[2]
    except:
        print(ref_name)
        yield ((ref_name, start, stop), buffer, qual)
    yield ((ref_name, start, stop), buffer, qual)

I was able to print and identify one UMI that might be problematic, but when I tried to rerun the script on just that set of subreads, it executed without a problem.

Instead of attempting to give you a 300 MB file that causes this error, I tried splitting it up. When I split it into 10 separate files, the smolecule.py whose only modification is the _read_worker method (see the comment above) failed to process the first batch of fasta sequences. However, when I further subdivided that file into 10 more files, all 10 were processed without issue. This may present a workaround: I can split my file into small enough pieces to avoid tripping the error, which is comically similar to what I was doing before trying to implement smolecule.py, but it should be much faster since there will be much more batching.

These two observations suggest to me that whatever is going on results from stitch being called on many Reads, and that no single Read will reproduce the issue.

I have uploaded a .fasta file here that trips this error when run with a smolecule.py in which the _read_worker function has been modified to remove the poa step. All of the Reads in this fasta file have 4 or more subreads, so I don't think the removal of the if statement that catches <2 subreads is the problem.

For now I'll implement a workaround, but it would be great if we could figure out what is going on here so I don't have to split and recombine files and so I don't have to worry that I'll trip this error in the future. Let me know if I can help out with that in any way.
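For anyone wanting the same workaround, the split just needs to respect the subread grouping; something along these lines should work (a rough sketch, with arbitrary file naming and chunk size):

import pysam

def split_grouped_fasta(fastx, reads_per_chunk=5000, prefix='chunk'):
    """Split a subread fasta (subreads named <read_id>_<subread_id>)
    into chunks without splitting a read group across two files."""
    n_reads, n_chunk, last_id = 0, 0, None
    out = open(f'{prefix}_{n_chunk}.fasta', 'w')
    with pysam.FastxFile(fastx) as fh:
        for entry in fh:
            read_id = entry.name.split('_')[0]
            if read_id != last_id:
                last_id = read_id
                n_reads += 1
                if n_reads > reads_per_chunk:
                    # rotate to a new chunk on a group boundary
                    out.close()
                    n_chunk += 1
                    n_reads = 1
                    out = open(f'{prefix}_{n_chunk}.fasta', 'w')
            out.write(f'>{entry.name}\n{entry.sequence}\n')
    out.close()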

gordonrix commented 2 years ago

Based on a similar error message that appeared in this issue on the PEPPER GitHub, I think this is a memory consumption issue. @cjw85 @ftostevin-ont any suggestions for smolecule settings worth tweaking to test whether this is the case?

cjw85 commented 2 years ago

I don't think it's reasonable to assume that memory errors in another piece of software imply a memory error in medaka. To test that hypothesis it would be straightforward enough to simply watch the process with top to get an idea of what is happening.
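Equivalently, polling from Python while smolecule runs would show whether memory grows without bound (a sketch, assuming psutil is installed):

import time

import psutil

# Print resident memory of any process whose command line mentions medaka,
# once per second.
while True:
    for proc in psutil.process_iter(['cmdline', 'memory_info']):
        cmd = ' '.join(proc.info['cmdline'] or [])
        if 'medaka' in cmd:
            print(proc.pid, proc.info['memory_info'].rss // 2**20, 'MiB')
    time.sleep(1)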

I'm afraid @ftostevin-ont and I have limited time to investigate this issue. Publishing your branch of medaka and the commands you are running will greatly help.

gordonrix commented 2 years ago

Understood! Sorry, I thought I had provided enough info for you to reproduce the issue, but if I did, it was diffusely spread over a wall of text. Below is what is hopefully a concise, easy path to reproduce it: install medaka via conda/mamba (side note: I think conda-forge is a required channel for this, but it is not listed here); comment out 3 lines in the _read_worker function of smolecule.py to skip the poa step, as suggested in this thread; download my pipeline repository, which contains a fasta file in which subreads are grouped together and the first subread of each group is the reference sequence (manually downloading this file alone will also work); then run smolecule on that file. I have just verified that this process produces what I believe to be the same error, although this time it yielded only this part of the error:

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument

and then hung for a long time. A keyboard interrupt showed that the collapse_neighbours function is somehow involved again, though.

OK, you should be able to reproduce this with the steps below; you need only modify the path to the medaka scripts within the conda environment:

mamba create -n medakaTest -c bioconda -c conda-forge medaka==1.6.1 -y
sed -i '295,297 s/^/#/' /PATH/TO/CONDA/envs/medakaTest/lib/python3.8/site-packages/medaka/smolecule.py
git clone https://github.com/gordonrix/maple.git
cd maple/example_working_directory/sequences/UMI
conda activate medakaTest
medaka smolecule outDir smoleculeTest.fasta

cjw85 commented 2 years ago

It would help greatly if you could create a fork of medaka and push a branch with the changes you have made.

gordonrix commented 2 years ago

https://github.com/gordonrix/medaka

The only two changes are adding the smoleculeTest.fasta file to the main folder and commenting out 3 lines in smolecule.py.

Edit: sorry, hold on, I didn't make a branch because I am bad with git; I'll make one.

Edit: OK, all good now; the branch is "smoleculeEdit".

gordonrix commented 2 years ago

I think this is the same bug as https://github.com/nanoporetech/medaka/issues/381, which does not involve any modifications to smolecule, so it's probably not worth your time to look at my fork. Please just ping me here once that issue is solved.

P.S. I just discovered that using the proper medaka model (variant models with the reference sequence instead of the poa consensus) gives me some remarkably low error rates, so thank you very much for all your hard work on this tool.

cjw85 commented 2 years ago

@gordonrix, @rob234king

I've spent some time looking at this today. @ftostevin-ont previously looked at the issue and could only reproduce the error whilst using a GPU for the medaka RNN calculations. Setting CUDA_VISIBLE_DEVICES="" enables the program to run to completion.

I believe I now have a fix: we can isolate the RNN calculation in a subprocess and then disable access to GPUs during the stitch phase. I don't really understand why medaka is trying to create GPU contexts during the stitch phase, but this seems to be the heart of the issue: having used the GPU for the RNN calculations, stitching can trip up. Running the RNN calculations in a subprocess essentially means all the resources are freed by the time stitching is run.
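The shape of the fix is roughly the following (a sketch of the general pattern, not the exact change made to medaka):

import multiprocessing

def _run_prediction(args):
    # Import and run inside the child so that any GPU context created for
    # the RNN calculations dies with this process.
    import medaka.prediction
    medaka.prediction.predict(args)

def predict_isolated(args):
    # 'spawn' gives the child a clean interpreter; args must be picklable.
    ctx = multiprocessing.get_context('spawn')
    proc = ctx.Process(target=_run_prediction, args=(args,))
    proc.start()
    proc.join()
    if proc.exitcode != 0:
        raise RuntimeError('prediction subprocess failed')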

@rob234king you mentioned that in your case you were not using a GPU, and your error is different from the one I see reported. Nevertheless, I'm wondering if the underlying cause is similar.

gordonrix commented 2 years ago

Thanks for looking into this! To be clear, though: are you saying that GPU contexts are being created even when GPU resources are not available? I ask because I have seen this both on my virtual machine, which doesn't have CUDA, and on my real Linux machine, which does. The error reported here was on the virtual machine, but I am pretty sure it was the same issue on the machine with CUDA.

Anyway, thanks for reporting this lead; let me know if I can help in any way.

cjw85 commented 2 years ago

It may well be that we have multiple issues with the same symptom. What @ftostevin-ont and I have both shown is that you can take the intermediate outputs from medaka smolecule and run them through medaka consensus and medaka stitch in isolation in a reliable fashion. And we can also reliably process data when we run:

CUDA_VISIBLE_DEVICES="" medaka smolecule ...

gordonrix commented 2 years ago

Oh wonderful, now I understand. OK, I'll try that fix this week and let you know what I find.

cjw85 commented 2 years ago

Medaka v1.7.0 contains the fixes I implemented yesterday, and is now available on PyPI.

gordonrix commented 2 years ago

Closing this, as I am no longer seeing this error, and it does seem to be more performant. However, I am still seeing smolecule fail on my virtual machine if I don't split my file up. Based on when this occurs (after running for ~40 minutes or so), I think it is related to stitch, but unlike before I am not seeing any error message; Snakemake may be obscuring it. Additionally, when I ran too many instances of smolecule simultaneously (8, one for each thread), it crashed my virtual machine. Right now it's looking stable if I assign 1000 MB per instance and split my file into chunks of 5-10 thousand subreads. Very manageable, so thank you! I will open a new issue at some point if I can get more information on the root of the problem.

rob234king commented 1 year ago

Thanks for doing that and taking the time. I'm on leave but will try it out next week.
