qiyunzhu / woltka

Woltka: a versatile meta'omic data classifier
BSD 3-Clause "New" or "Revised" License
71 stars 25 forks source link

Enable parallel computing #45

Open qiyunzhu opened 4 years ago

qiyunzhu commented 4 years ago

As my recent benchmarks (issues #40, #38) suggest, a notable proportion of Woltka's runtime are on the CPU / memory side, in addition to file I/O. @adswafford once asked if we can do parallel processing. @antgonza also proposed doing paralleling zipping (PR #42). Meanwhile @wasade reminded the challenges in Python's parallel processing. I think this is an open question and derves further exploration.

adswafford commented 4 years ago

I recall that back in the day @tanaes addressed this in oecophylla by making a mode where individual files were dispatched as job submissions. Could something similar be employed here since everything is per-file until the final biom table generation?

wasade commented 4 years ago

If woltka is sample independent, then this can be farmed out one task per job with a merge at the end

qiyunzhu commented 4 years ago

Hi @wasade @adswafford This is correct. Woltka can totally be run separately on multiple subsets of samples and the resulting BIOM tables can be merged. No modification is needed for the current codebase.

adswafford commented 4 years ago

So the lightest implementation then would be to write the file list to an array and then parse it with an array job and do a final call to merge the biom tables in the output directory?

Is there a more efficient way to construct and read the file list to support this within the woltka codebase or as an auxilary code snippet?? I've done this before for large multiplexed files, but it was a manual process to create the filepath array in my shell script.

On Tue, Apr 21, 2020 at 1:28 PM Qiyun Zhu notifications@github.com wrote:

Hi @wasade [github.com] https://urldefense.com/v3/__https://github.com/wasade__;!!Mih3wA!Ut6cH5c2pvqsmjACsR68uQYnEj5ifTf6A5gJiC8_dcH5SX0rReti5QiFp6yzLtTeTw$ @adswafford [github.com] https://urldefense.com/v3/__https://github.com/adswafford__;!!Mih3wA!Ut6cH5c2pvqsmjACsR68uQYnEj5ifTf6A5gJiC8_dcH5SX0rReti5QiFp6xMBkm0QQ$ This is correct. Woltka can totally be run separately on multiple subsets of samples and the resulting BIOM tables can be merged. No modification is needed for the current codebase.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub [github.com] https://urldefense.com/v3/__https://github.com/qiyunzhu/woltka/issues/45*issuecomment-617397269__;Iw!!Mih3wA!Ut6cH5c2pvqsmjACsR68uQYnEj5ifTf6A5gJiC8_dcH5SX0rReti5QiFp6y-ZD6koA$, or unsubscribe [github.com] https://urldefense.com/v3/__https://github.com/notifications/unsubscribe-auth/AGOEDBV6VNTQPUKTUL5O7NTRNX6YNANCNFSM4MNQLX6Q__;!!Mih3wA!Ut6cH5c2pvqsmjACsR68uQYnEj5ifTf6A5gJiC8_dcH5SX0rReti5QiFp6wZlh7ZZw$ .

wasade commented 4 years ago

A workflow manager like snakemake or nextflow which abstract out the resource manager may work. I do not recommend doing something homebrew that is over fit to Torque/PBS. Distributed compute is hard, and the alternative of just having the user run a few commands (e.g., one to make the script, one to merge the results) will greatly reduce developer time.

qiyunzhu commented 4 years ago

I think a few commands (like Qiita does on SHOGUN) or a workflow manager (like Oecophylla does on SHOGUN) both work. Although in some use cases, especially when the input file is a multiplexed alignment file, as Austin brought up, it will be more favorable to have parallel computing built-in in Woltka. This would also save the cost of reading and storing taxonomy database in memory. Doing parallel processing in Python is not extremely hard, as I once did in this notebook. Here is a simple example:

import multiprocessing as mp

pool = mp.Pool(8)
res = []

i = 0
cache = []
for line in alignment_file:
    cache.append(line)
    i += 1
    if i == 1000000:
        res.append(pool.apply_async(classify_function, data=cache, **kwargs))
        i = 0
pool.close()
pool.join()

table = []
for x in res:
    merge_result(x.get(), table)
return table
wasade commented 4 years ago

That example is not assured to avoid memory replication. It does circumvent the GIL so multiple processors will be used.

Parallel is never easy. Simple examples work, but rarely do we do simple things. Extending this to real problems gets complex as programs are a mix of parallel and serial components, not all work is balanced either due to task size or system variability, interprocess communication is very hard, non-uniform memory access patterns on multi-socket systems present difficult to track performance issues, cache poisoning reducing expected performance, the complexity of debugging, etc.

I strongly advise only considering coupled parallel compute after the code has been thoroughly profiled and alternatives considered. A O(n^2) algorithm or implementation that is parallelizable will run slower and waste substantially more resources than a O(log(n)) serial version. Similarly, if there is a critical bottleneck that consumes 90+% of the runtime, cutting that out and optimizing it with Cython or numba, or other low level approaches is a good next step.

If it really needs to be parallelized, then just batching out a bunch of system calls to the cluster is much easier than coupled compute. If that's intractable, then using something like joblib, Dask, or snakemake, or nextflow probably will get you a long way.

qiyunzhu commented 4 years ago

@wasade Thanks for explaining the challenges and risks of doing parallel compute! Much of it is beyond my current knownledge but it is good learning experience to me.

The parallel compute I envisioned is in such mechanism: each thread reads from a shared large variable (the classification system), and it writes to a separate variable (classification result of each chunk). Eventually, all results get merged into one new variable (profile). Therefore I think this mechanism is not coupled, and risk is relatively low.

That being said, implementing non-simple things is plausible in future development. For example, I am planning on implementing a "cache" (issue #41), which may induce coupling if doing parallel compute. I agree with parallel compute is a risky zone and needs to be considered carefully only after serial version is perfectized.

wasade commented 4 years ago

If parallel is being explored within an app, such as using threads or forking processes, then it is coupled. If it is external to the app, like a bunch of system calls, then it may not be coupled (or at least a lot less). The degree of coupling can be low particularly where all variables / resources are completely independent, without any need for semaphores or mutexes, to extremely tight systems that require low latency synchronization across distributed systems. (physics modeling for example where 1000s or computers are cooperating).

The scenario described is definitely coupled. Multiple threads or processes are reading from the same memory. If on a multi-socket system, not all threads or processes may have the same level of bandwidth to that memory. It's not obvious how the shared memory is setup, and the types of constraints it has, and how those constraints impact performance of this particular problem across operating systems or architectures -- OSX is not Linux, Intel is not AMD. If this is IO or memory bound, increasing the number of processes could hurt runtime. And access patterns could easily trash cache lines which could degrade performance in unexpected ways.

I'm not saying don't do it, just advising to delay pursuit until after easier options have been explored.

Has woltka been profiled with cProfile yet, and can the profiles be shared? And, to check, have scaling assessments been done over increasing numbers of sequences / samples to explore empirical growth in runtime and space requirements?

qiyunzhu commented 4 years ago

Hi @wasade Thanks for explaining to me the knowledge! I just had time to seriously learn Python profiling after this long while. Made some progress, and working on bottlenecking steps accordingly. See #50 .

mortonjt commented 3 years ago

Agree with @wasade on profiling. Regarding farming out jobs, it maybe worth looking into dask_jobsqueue since it supports multiple cluster configurations, including Torque and Slurm. This works best for embarrassingly parallel workflows. It will certainly be an involved process, but if you can just farm out jobs on a per-sample basis, it maybe worth it.

mortonjt commented 2 years ago

Is there any traction on this? Running woltka singly threaded is quite time consuming -- if either multi-processing is integrated or a tutorial on how to run per-sample classification is made (to let users do their own parallelization), that would be tremendously helpful

antgonza commented 2 years ago

In case it helps, the Qiita's qp-woltka plugin does per-sample parallelization (each sample runs with 8 workers). However, this is extremely tight with the scheduler environment we use.

qiyunzhu commented 2 years ago

@mortonjt @antgonza I do think this is an important feature to have. So far Woltka doesn't have parallelization built-in. I work-around (probably similar to what Antonio used) is to run woltka classify on individual (groups of) alignment files to generate multiple feature tables, then run woltka tools merge to merge the tables. Here is a tutorial.

mortonjt commented 2 years ago

That tutorial is exactly what I am looking for. Thanks! Feel free to close this issue.