apcamargo / taxopy

A Python package for obtaining complete lineages and the lowest common ancestor (LCA) from a set of taxonomic identifiers
https://apcamargo.github.io/taxopy
GNU General Public License v3.0

ExtractionError when running taxopy on several samples in parallel with specified dmp files and keep_files=True #1

Open dawnmy opened 3 years ago

dawnmy commented 3 years ago

I use taxopy to find the LCA of taxids from a sample.

from os import path

import taxopy

cd = path.dirname(path.abspath(__file__))

nodes_dmp_file = path.join(cd, "../data/nodes.dmp")
names_dmp_file = path.join(cd, "../data/names.dmp")

taxdb = taxopy.TaxDb(nodes_dmp=nodes_dmp_file,
                     names_dmp=names_dmp_file,
                     keep_files=True)

This is how I create the DB. Running the script sample by sample works fine without any error, but when I try to run it on several samples in parallel, I get the error below:

Traceback (most recent call last):
  File "/home/user/miniconda3/lib/python3.7/tarfile.py", line 2163, in makefile
    copyfileobj(source, target, tarinfo.size, ReadError, bufsize)
  File "/home/user/miniconda3/lib/python3.7/tarfile.py", line 250, in copyfileobj
    dst.write(buf)
OSError: [Errno 116] Stale file handle

During handling of the above exception, another exception occurred:

OSError: [Errno 116] Stale file handle

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/miniconda3/lib/python3.7/site-packages/taxopy/core.py", line 108, in _download_taxonomy
    tf.extract("nodes.dmp", path=self._taxdb_dir)
  File "/home/user/miniconda3/lib/python3.7/tarfile.py", line 2044, in extract
    numeric_owner=numeric_owner)
  File "/home/user/miniconda3/lib/python3.7/tarfile.py", line 2114, in _extract_member
    self.makefile(tarinfo, targetpath)
  File "/home/user/miniconda3/lib/python3.7/tarfile.py", line 2163, in makefile
    copyfileobj(source, target, tarinfo.size, ReadError, bufsize)
OSError: [Errno 116] Stale file handle

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/vol/projects/user/projects/tax.py", line 22, in <module>
    taxdb = taxopy.TaxDb()
  File "/home/user/miniconda3/lib/python3.7/site-packages/taxopy/core.py", line 87, in __init__
    self._nodes_dmp, self._names_dmp = self._download_taxonomy()
  File "/home/user/miniconda3/lib/python3.7/site-packages/taxopy/core.py", line 112, in _download_taxonomy
    "Something went wrong while extracting the taxonomy files."
taxopy.exceptions.ExtractionError: Something went wrong while extracting the taxonomy files.

It seems taxopy tries to download and extract the dmp files even though the dmp files are specified.

apcamargo commented 3 years ago

Hi @dawnmy. Thanks for the report!

Are the dmp files being deleted? How exactly are you running the parallel jobs? Are you using a multi-node cluster?

dawnmy commented 3 years ago

Thank you so much for your response! The dmp files were not deleted; I inspected them manually and they are always there. I ran the parallel jobs with Snakemake, submitting jobs to an SGE queue on a multi-node cluster. Here is the rule definition:

rule tax:
    input:
        sample_taxids_file
    output:
        classify = results_dir + "/tax/{sample}.tax.norm.txt",
    params:
        tax_bin = '/vol/projects/user/projects/bin/tax.py',
        out_prefix = results_dir + "/tax/{sample}.tax"
    shell:
        """
        python {params.tax_bin} {input} -o {params.out_prefix}

        """

And run the jobs with

snakemake -s tax.smk --latency-wait 120 -c "qsub -cwd -q all.q -pe multislot {threads} -i /dev/null -e /vol/cluster-data/`whoami`/sge_logs/ -o /vol/cluster-data/`whoami`/sge_logs/ -v PATH" -j 20  

The jobs were probably submitted to different nodes. I guess this may be causing the issue.

apcamargo commented 3 years ago

Thanks!

Could you try to execute the jobs in parallel on a single cluster node? To do that you can write a script like this:

# PBS directives
# (...)

snakemake -j 20 -s tax.smk

And then qsub it.

Even if this solves the problem, I'll investigate this issue further soon. I'm very occupied right now, but I'll look into it as soon as I can.

dawnmy commented 3 years ago

Thank you so much for your help. I just tested it with multiple jobs on the same node, and it ran without any issue.

dawnmy commented 3 years ago

I found another issue that might be related to this one. I was trying to use taxopy with Python multiprocessing, where multiple processes used the taxdb created by

taxdb = taxopy.TaxDb(nodes_dmp=nodes_dmp_file,
                     names_dmp=names_dmp_file,
                     keep_files=True)

at the same time. The taxdb was only created once, and all processes used the same taxdb. I found out that only one process was actually created, and the whole task kept running endlessly without any output. It seems the processes encountered a deadlock.

Here is a minimal example:

from multiprocessing import Pool

import taxopy

taxdb = taxopy.TaxDb(nodes_dmp=nodes_dmp_file,
                     names_dmp=names_dmp_file,
                     keep_files=True)

def job(taxids):
    taxa = [taxopy.Taxon(taxid, taxdb) for taxid in taxids]
    lca_taxa = taxopy.find_majority_vote(taxa, taxdb)
    return lca_taxa.name

with Pool(10) as p:
    sample_lca_taxa_names = p.map(job, sample_taxids_list)

apcamargo commented 3 years ago

Hmm. I'd expect that to work. Have you tried using joblib?
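
Something along these lines, assuming the same job function and sample_taxids_list from your example above (just a sketch, I haven't tested it with the TaxDb):

from joblib import Parallel, delayed

# Each call to job is dispatched to a separate worker; n_jobs controls
# how many workers run in parallel.
sample_lca_taxa_names = Parallel(n_jobs=10)(
    delayed(job)(taxids) for taxids in sample_taxids_list
)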

Again, thanks for keeping me informed! I'll take a closer look at those issues as soon as I can.

dawnmy commented 3 years ago

I haven't tried joblib on it yet. You are right, I should give it a try. Thank you for your prompt response.

apcamargo commented 3 years ago

Thanks for providing the example!

I just tried the following code:

from multiprocessing import Pool
import taxopy

taxdb = taxopy.TaxDb(keep_files=True)

def job(taxids):
    taxa = [taxopy.Taxon(taxid, taxdb) for taxid in taxids]
    lca_taxa = taxopy.find_majority_vote(taxa, taxdb)
    return lca_taxa.name

sample_taxids_list = [['4930', '9606'], ['4930', '9606', '9593'], ['4930', '9606', '9593', '9975']]

with Pool(3) as p:
    sample_lca_taxa_names = p.map(job, sample_taxids_list)

And it executed fine. Can you show me the rest of your code? How did you create your sample_taxids_list?

dawnmy commented 3 years ago

Thank you for testing the example. You are right: it works fine if I define the job and create the taxdb outside of a class. The issue I encountered seems to be this one: https://stackoverflow.com/questions/31729008/python-multiprocessing-seems-near-impossible-to-do-within-classes-using-any-clas . It looks like a general problem with Python multiprocessing.
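
For illustration, this is roughly the pattern that hangs for me (the class and names are made up for this sketch); moving the job and the TaxDb to module level, as in your test, avoids the problem:

from multiprocessing import Pool

import taxopy

class LCAFinder:
    def __init__(self, nodes_dmp_file, names_dmp_file):
        # The TaxDb lives on the instance, so the bound method passed to
        # Pool.map drags the whole object through pickling.
        self.taxdb = taxopy.TaxDb(nodes_dmp=nodes_dmp_file,
                                  names_dmp=names_dmp_file,
                                  keep_files=True)

    def job(self, taxids):
        taxa = [taxopy.Taxon(taxid, self.taxdb) for taxid in taxids]
        return taxopy.find_majority_vote(taxa, self.taxdb).name

    def run(self, sample_taxids_list):
        with Pool(10) as p:
            # Passing the bound method self.job is where things stall for me.
            return p.map(self.job, sample_taxids_list)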

apcamargo commented 3 years ago

Good to know and thanks for the reference! I'm considering moving the database to sqlite to reduce memory usage. It would probably solve this problem too, as the db would be stored on disk.
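
Roughly what I have in mind, as a sketch only (the table layout here is made up, nothing is implemented yet):

import sqlite3

# Hypothetical on-disk layout: one row per taxid, so lookups become indexed
# queries on disk instead of accesses to large in-memory dictionaries.
conn = sqlite3.connect("taxdb.sqlite")
conn.execute(
    "CREATE TABLE IF NOT EXISTS nodes ("
    "taxid INTEGER PRIMARY KEY, parent INTEGER, rank TEXT, name TEXT)"
)

def get_lineage(taxid):
    # Walk up the parent pointers until the root (taxid 1).
    lineage = [taxid]
    while taxid != 1:
        row = conn.execute(
            "SELECT parent FROM nodes WHERE taxid = ?", (taxid,)
        ).fetchone()
        if row is None:
            break
        taxid = row[0]
        lineage.append(taxid)
    return lineage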

dawnmy commented 3 years ago

That would be great. Do you also expect a speedup with sqlite? I know ETE3 uses sqlite, and it seems to be very fast.

apcamargo commented 3 years ago

That's a good question. I'll have to do some testing, but I expect it to be fast, with much better memory management.

apcamargo commented 3 years ago

In the long run I also plan to port some functions to Rust to improve speed and maybe implement some parallelization from there.

It's not a priority right now, though. I've been using taxopy for some pretty big operations and speed was never a big issue (even though some functions are written in a pretty hacky way right now).

dawnmy commented 3 years ago

I tested ETE3 for taxonomy queries this afternoon. Multiprocessing is an issue there, as each process must create a new sqlite database connection; otherwise, it gives sqlite3.DatabaseError: database disk image is malformed. This looks like a known issue with Python multiprocessing: the database connection is not serializable and cannot be shared in memory by multiple threads or processes. If I create a new connection in each process, the CPU load drops to 10% and it is not efficient at all. This might be a problem if you switch to sqlite. Nevertheless, there might be a decent solution for this.
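
A generic sketch of the one-connection-per-worker pattern I mean, using plain sqlite3 rather than ETE3 (the table and file names are made up):

import sqlite3
from multiprocessing import Pool

_conn = None  # each worker process gets its own connection

def _init_worker(db_path):
    # Pool initializer: runs once per worker, so connections are never
    # shared across processes.
    global _conn
    _conn = sqlite3.connect(db_path)

def job(taxid):
    row = _conn.execute(
        "SELECT name FROM nodes WHERE taxid = ?", (taxid,)
    ).fetchone()
    return row[0] if row else None

if __name__ == "__main__":
    with Pool(10, initializer=_init_worker, initargs=("taxdb.sqlite",)) as p:
        names = p.map(job, [9606, 4930, 9593])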

apcamargo commented 3 years ago

Memory sharing is always difficult. I'll have a look at sqlite to see if there's a way around that. In any case, joblib should help you for now.

Thank you for bringing this to my attention. It's a pretty important issue that I hadn't put much thought into.

maxibor commented 2 years ago

Hi @apcamargo, thanks a lot for creating Taxopy, it turns out to be very useful! After doing a bit of profiling of taxopy, I realised that one of the most time-consuming operations when finding the LCA is the set intersection computation when the taxon list is quite large: https://github.com/apcamargo/taxopy/blob/4f299fd7bf150b0921fc9d3967fa24acd62f9ba8/taxopy/utilities.py#L83

However, doing it this way is most likely already the most efficient way to do it in Python (kudos for that 😃), so the only speedup when many LCAs need to be computed would come from concurrency.
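
For context, a toy illustration of the shape of that operation (abridged, made-up lineages; this is not taxopy's actual code):

# The ancestors shared by a group of taxa are the intersection of their
# lineage sets; with many taxa and long lineages this dominates the runtime.
taxid_lineages = [
    [9606, 9605, 207598, 9604, 314295, 9526, 9443],  # e.g. Homo sapiens
    [9593, 9592, 207598, 9604, 314295, 9526, 9443],  # e.g. Gorilla gorilla
]
lineage_sets = [set(lineage) for lineage in taxid_lineages]
shared_ancestors = set.intersection(*lineage_sets)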

Multiprocessing would be beneficial here; however, the overhead of serializing all the dicts of the TaxDb instance might negate the gains from parallelization.

As SQLite is comparatively quite slow, and not really needed for a "simple" key-value storage, other solutions might be more relevant. Unfortunately, shared-memory dictionaries do not exist in Python (even though there are workarounds), but what about using the TAXIDs as indices for lists of rank, name, and lineage, since shared-memory lists are natively supported?
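
A very rough sketch of the taxid-as-index idea (Python >= 3.8; toy data, and the real lists would be sized up to the largest taxid):

from multiprocessing import Pool
from multiprocessing.shared_memory import ShareableList

_names = None  # per-worker handle to the shared list

def _attach(shm_name):
    # Pool initializer: each worker attaches to the existing shared-memory
    # block by name instead of receiving a pickled copy of the whole table.
    global _names
    _names = ShareableList(name=shm_name)

def name_of(taxid):
    return _names[taxid]  # the taxid doubles as the list index

if __name__ == "__main__":
    names = ShareableList(["unused", "root", "Bacteria", "Archaea"])
    with Pool(2, initializer=_attach, initargs=(names.shm.name,)) as p:
        print(p.map(name_of, [1, 2, 3]))
    names.shm.close()
    names.shm.unlink()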

Another, more dictionary-like solution would be the Python shelve module (which uses some form of DBM in the background), casting the TAXIDs to strings.
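
The shelve variant would look roughly like this (the file name and stored fields are made up for the sketch):

import shelve

# shelve only accepts string keys, hence the str() cast on the taxid.
with shelve.open("taxdb.shelve") as db:
    db[str(9606)] = {"name": "Homo sapiens", "rank": "species", "parent": 9605}

with shelve.open("taxdb.shelve", flag="r") as db:
    node = db[str(9606)]
    print(node["name"], node["rank"])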

I'd be happy to contribute if you are interested in looking further into this.

apcamargo commented 2 years ago

Hey @maxibor! Thanks so much for the detailed report!

I tried to replace the dictionary with SQLite, RocksDB, and shelve, but the slowdown was just too big to justify the change. That said, my goal was to reduce memory consumption and I didn't really investigate parallelization.

I don't think I have the time to work on this right now, but I'd gladly look at contributions!