sourmash-bio / sourmash

Quickly search, compare, and analyze genomic and metagenomic data sets.
http://sourmash.readthedocs.io/en/latest/

Multithread or multiprocess #638

Open dgg32 opened 5 years ago

dgg32 commented 5 years ago

Hi dear developers. I am thoroughly enjoying sourmash in my little project. However, I see that sourmash uses at most 100% of one CPU. Would it be possible to add multithreading or even multiprocessing to the code so it can make use of my multicore computer? Thank you very much!

ctb commented 5 years ago

hi @dgg32, we are working on this by moving the underlying Cython code into Rust for 3.0. In the meantime, on the few occasions when I'm calculating lots of signatures and it's slow, I use GNU parallel to run multiple processes; that's aided by the --randomize flag on sourmash compute, which lets me run (e.g.) 8 processes of 'sourmash compute' on the same 40,000 signatures. Let me know if that's something you're interested in more docs for ;)
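
For illustration, here's a minimal Python sketch of the same idea (not my exact GNU parallel setup; the genomes/ directory, the 8-worker count, and -k 31 are just illustrative placeholders) that drives several 'sourmash compute' processes at once:

import glob
import subprocess
from concurrent.futures import ProcessPoolExecutor

def compute_one(fasta):
    """Run one 'sourmash compute' process for a single FASTA file."""
    sigfile = fasta + '.sig'
    # -k 31 is an illustrative parameter choice, not a recommendation
    subprocess.run(['sourmash', 'compute', '-k', '31', '-o', sigfile, fasta],
                   check=True)
    return sigfile

if __name__ == '__main__':
    fastas = glob.glob('genomes/*.fa')   # assumed input location
    # eight worker processes, each running its own sourmash process
    with ProcessPoolExecutor(max_workers=8) as pool:
        for sigfile in pool.map(compute_one, fastas):
            print('wrote', sigfile)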

dgg32 commented 5 years ago

Hi @ctb , yes, please!

olgabot commented 5 years ago

Here's a working parallelized sourmash compare implementation as well, complete with sub-selecting on the number of hashes:

import glob
import itertools
import os
import sys
import tempfile
import time

import click
from joblib import Parallel, delayed, load, dump
import numpy as np
import pandas as pd
from scipy.spatial.distance import squareform
from sourmash import signature as sig
from tqdm import tqdm

KSIZES = 21, 27, 33, 51
MOLECULES = 'dna', 'protein'
LOG2_NUM_HASHES = 8, 9, 10, 11, 12, 13, 14, 15, 16

def seconds_to_formatted_time(seconds):
    """Format a float number of seconds as H:MM:SS."""
    # cast to int so the '{:d}' format specifiers below accept the value
    m, s = divmod(int(seconds), 60)
    h, m = divmod(m, 60)
    return '{:d}:{:02d}:{:02d}'.format(h, m, s)

def filter_siglist(siglist, ksize, moltype):
    """Keep only signatures with the requested k-mer size and molecule type."""
    if moltype == 'protein':
        molfilter = lambda x: x.minhash.is_protein
    else:
        molfilter = lambda x: not x.minhash.is_protein

    return [s for s in siglist if molfilter(s) and (s.minhash.ksize == ksize)]

def load_signatures(filenames):
    """Load all signatures from the given files into a single list."""
    siglist = []
    for filename in tqdm(filenames):
        loaded = sig.load_signatures(filename)
        siglist.extend(loaded)
    return siglist

def _compare_serial(siglist, iterator):
    """Fill the n x n similarity matrix one pair at a time (single-threaded)."""
    n = len(siglist)
    values = np.ones((n, n))

    for i, j in iterator:
        jaccard = siglist[i].jaccard(siglist[j])

        values[i, j] = jaccard
        values[j, i] = jaccard

    return values

def compare_all_pairs(siglist, n_jobs=None):
    """Compute the pairwise Jaccard similarity matrix, optionally in parallel."""
    n = len(siglist)

    # Combinations makes all unique sets of pairs, e.g. (A, B) but not (B, A)
    iterator = itertools.combinations(range(n), 2)
    sig_iterator = itertools.combinations(siglist, 2)

    if n_jobs is None or n_jobs == 1:
        values = _compare_serial(siglist, iterator)
    else:
        # This creates a condensed distance matrix
        condensed = Parallel(n_jobs=n_jobs, require='sharedmem',
                             backend='threading')(
            delayed(sig1.jaccard)(sig2) for sig1, sig2 in sig_iterator)
        values = squareform(condensed)
        # squareform leaves zeros on the diagonal; self-similarity is 1
        np.fill_diagonal(values, 1.0)

    return values

def _memmap_siglist(siglist):
    """Write a memory-mapped array of signatures"""
    temp_folder = tempfile.mkdtemp()
    filename = os.path.join(temp_folder, 'joblib_test.mmap')
    if os.path.exists(filename):
        os.unlink(filename)
    _ = dump(siglist, filename)
    large_memmap = load(filename, mmap_mode='r+')
    return large_memmap

def downsample_and_compare(signatures, log2_num_hash, molecule, ksize, n_jobs=None):
    """Filter, downsample, memory-map, and compare signatures; return a similarity DataFrame."""
    t0 = time.time()

    filtered = filter_siglist(signatures, ksize, molecule)
    t1 = time.time()

    num_hash = 2**log2_num_hash
    downsampled = [s.minhash.downsample_n(num_hash) for s in filtered]
    t2 = time.time()

    memmapped = _memmap_siglist(downsampled)
    t3 = time.time()

    values = compare_all_pairs(memmapped, n_jobs=n_jobs)
    t4 = time.time()

    names = [s.name().split('|')[0] for s in filtered]
    t5 = time.time()

    df = pd.DataFrame(values, index=names, columns=names)
    t6 = time.time()

    print(f"--- num_hash: {num_hash}, molecule: {molecule}, "
          f"ksize: {ksize} ---")
    print(f"Time to filter on ksize and molecule: {seconds_to_formatted_time(t1-t0)}")
    print(f"Time to downsample on num_hash: {seconds_to_formatted_time(t2-t1)}")
    print(f"Time to write memory-mapped array: {seconds_to_formatted_time(t3-t2)}")
    print(f"Time to compare all pairs: {seconds_to_formatted_time(t4-t3)}")
    print(f"Time to get names of samples: {seconds_to_formatted_time(t5-t4)}")
    print(f"Time to create dataframe: {seconds_to_formatted_time(t6-t5)}")
    return df

You can see it used with some benchmarks here. Without memory mapping, the parallel version takes 4x longer than the serial version?!??! 😱.

(benchmark plot: compare)

The keys were memory-mapping the signatures, using the threading backend, and requiring sharedmem in the Parallel call.
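
For reference, here's a minimal usage sketch of the helpers above (the sigs/*.sig glob, the parameter values, and the output filename are just placeholders, not part of the benchmark):

# hypothetical usage of load_signatures() and downsample_and_compare() above
signature_files = glob.glob('sigs/*.sig')      # assumed input location
signatures = load_signatures(signature_files)

# compare all pairs at k=21, DNA, 2**12 = 4096 hashes, using 8 threads
df = downsample_and_compare(signatures, log2_num_hash=12, molecule='dna',
                            ksize=21, n_jobs=8)
df.to_csv('similarity_dna_k21.csv')            # assumed output filename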

ctb commented 5 years ago

See https://github.com/dib-lab/sourmash/pull/709 for compare, and see https://github.com/ctb/2019-tara-binning2/blob/master/Snakefile for a snakemake-based way to parallelize compute.
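
Roughly, the Snakemake pattern looks like this (a minimal sketch, not the linked Snakefile; the directory layout and -k 31 are illustrative). Running 'snakemake -j 8' then executes up to eight 'sourmash compute' jobs at once:

SAMPLES, = glob_wildcards("genomes/{sample}.fa")

rule all:
    input: expand("sigs/{sample}.sig", sample=SAMPLES)

rule compute_sig:
    input: "genomes/{sample}.fa"
    output: "sigs/{sample}.sig"
    shell: "sourmash compute -k 31 -o {output} {input}"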

matrs commented 4 years ago

I have one multi-FASTA file with hundreds of sequences (a whole mammal genome); is the only way to split it into multiple FASTA files?

olgabot commented 4 years ago

@matrs yes, unfortunately compute is currently single-threaded per FASTA file. If you break it up per chromosome, e.g. using seqtk, you can use the Snakemake workflow above or this Nextflow workflow: https://github.com/nf-core/kmermaid/ (full disclosure: I wrote the Nextflow version :)
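
For the splitting step, a minimal Python sketch (assuming the input is called mammal_genome.fa, output goes to a split/ directory, and screed, which is installed alongside sourmash, is available) could look like:

import os
import screed   # FASTA/FASTQ reader that ships with sourmash

os.makedirs('split', exist_ok=True)
# write one record (e.g. one chromosome) per output file, so each file
# can then be sketched by a separate sourmash process
for n, record in enumerate(screed.open('mammal_genome.fa')):
    with open(f'split/record{n:05d}.fa', 'w') as fp:
        fp.write(f'>{record.name}\n{record.sequence}\n')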

ctb commented 1 year ago

https://github.com/sourmash-bio/pyo3_branchwater now provides multithreaded implementations of search and gather.