VLucet opened this issue 4 years ago
Use Julia's parallelization with `pmap`. You may need to familiarize yourself with the different options and write a script.
A `@distributed` or `@threads` for loop might also be a viable option, though I am not sure if `@threads` would be optimal for this.
@ViralBShah @vlandau I appreciate the quick reply. I will make an attempt and reopen if I run into issues.
I'll be curious to know how both options fare. Remember to set the number of BLAS threads to 1.
Good call. I do the same thing in Omniscape, and it helps with performance.
This might be a little over my head. Could you explain further what that means and how to do it?
BLAS is "Basic Linear Algebra Subprograms"; these routines do linear algebra in parallel using multithreading. If you are doing higher-level parallelization, where each job itself uses BLAS, then you can get "oversubscription": more work queued up than you have hardware resources. The resulting competition for resources can hurt performance. Setting the number of BLAS threads to 1 prevents this from happening. The link in my comment above shows the code to do that.
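For concreteness, a minimal sketch of that setting using only the standard library (the `@everywhere` ensures it also runs on any workers that have been added):

```julia
using Distributed, LinearAlgebra

# Pin BLAS to a single thread on every process, so that higher-level
# parallelism (e.g. pmap over jobs) does not oversubscribe the cores.
@everywhere using LinearAlgebra
@everywhere BLAS.set_num_threads(1)
```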
Reopening this issue as I am having some troubles... I read the doc on parallel processing. From my understanding of it, I did this:
```
julia -p 2
```

for two cores, for instance.
```julia
@everywhere using Pkg
@everywhere Pkg.activate(".")

@everywhere using LinearAlgebra.BLAS
@everywhere BLAS.set_num_threads(1)

@everywhere searchdir(path, key) = filter(x -> occursin(key, x), readdir(path))
@everywhere dir = "config/ini_circuitscape/all/"
@everywhere ext = "ini"
@everywhere ini_list = dir .* searchdir(dir, ext)

@everywhere using Circuitscape

@everywhere function comp(file)
    println(file)  # print the name of the ini file
    compute(file)  # compute flow
end

pmap(comp, ini_list)
```
This code launches multiple processes, equal to `-p`. The `compute` function gets as far as:
```
[ Info: 2020-06-09 21:05:07 : Time taken to construct local nodemap = 0.353301802 seconds
```
Then the function hangs (`htop` shows no activity in the processes). I did some digging, and I think the function gets stuck at [this line](https://github.com/Circuitscape/Circuitscape.jl/blob/71167a28fae7cba9e0f8250fb57942db11b94500/src/core.jl#L221):
```julia
X = pmap(x -> f(x), 1:size(csub, 1))
```
**Changing this `pmap` call to `map` fixes the problem:** the code keeps running and runs to completion, with processes cycling through the list of ini files.
Am I doing something wrong?
FWIW, I also tried:

```julia
Threads.@threads for file in ini_list
    compute(file)
end
```
and **do not** get the same behavior; only one process starts before hanging. Also, the following plain for loop works fine, so it is not an issue with my ini files:

```julia
for file in ini_list
    comp(file)
end
```
Ah - the parallelization inside Circuitscape is interfering with the parallelization that you want to do. `pmap` does not support parallel jobs running other parallel jobs (and it would greatly oversubscribe if it did). The right way to address this is with multi-threading, which is composable, but we are having some issues there at the moment (#230). For now, changing the `pmap` to `map` is the right thing. Can we check the appropriate INI option and have it call `map` in the sequential case and `pmap` in the parallel case? It would be good to have a PR for that if you are comfortable making one.
I am not sure - but I also believe that if you start Julia with one processor, and literally just do `addprocs(2)` before your `pmap` call, you may be able to avoid a lot of the use of `@everywhere`.
> Can we check the appropriate INI option and have it call map in the seq case and pmap in the parallel? It would be good to have a PR for that if you are comfortable making one.
Would love to contribute, but I am unsure what you mean by the "appropriate INI option". I was thinking of something like this:

```julia
if nworkers() == 1
    X = pmap(x -> f(x), 1:size(csub, 1))
elseif nworkers() > 1
    X = map(x -> f(x), 1:size(csub, 1))
end
```
> I am not sure - but I also believe that if you start Julia with one processor, and literally just do addprocs(2) before your pmap call, you may be able to avoid a lot of the use of @everywhere.
Thanks, will give it a try.
I think we can check the configuration parameter `parallelize`, where the user tells us whether they want to use Circuitscape in parallel. The challenge is that the current Circuitscape package is a bit of a mishmash - trying to be a library for other tools to use, as well as trying to be an application itself.
> The challenge is that the current Circuitscape package is a bit of a mishmash - trying to be a library for other tools to use, as well as trying to be an application itself.
I see, but I can't see how to use `parallelize` without changing its default value, which could break things. I see this setting is used here to add workers that will then be used by `pmap`:
https://github.com/Circuitscape/Circuitscape.jl/blob/c525971314addd7ec3064caecdae663e1dd1f75f/src/run.jl#L67-L72
So the `parallelize` parameter is for parallelizing at the level of the pairs, not the higher level. It is false by default.
Does that mean we need to default `parallelize` to true (and `max_parallel` to 1 as default), and therefore use `pmap` by default? Then modify `core.jl` so that:*

```julia
is_parallel = cfg["parallelize"] in TRUELIST
if is_parallel
    X = pmap(x -> f(x), 1:size(csub, 1))
else
    X = map(x -> f(x), 1:size(csub, 1))
end
```

If a user wants to parallelize at a higher level than the pairs, they would have to set `parallelize` to false.
* Adding a note that this is for the `amg` solver, but a similar change would need to be made for `cholmod`.
I think what @ViralBShah might have been getting at is that if `parallelize` in Circuitscape (internally) is `false`, then we should use `map` in favor of `pmap`, which is just what your code does above. In the default case, `parallelize` (and `is_parallel`) would be `false`, so Circuitscape could use `map` internally. In other words, there should be no need to set `parallelize` to `true` by default.
Ok, sorry for the confusion. I deleted a comment because I got myself confused. I think I get it now; no need to change the default value for `parallelize`.
@Vlucet would you mind putting a final comment on this thread with some example code for your parallel processing approach now that it's working?
EDIT: I know you covered it above but was just thinking appending the pmap example at the bottom of this thread would be helpful for future users.
Ideally we would have it go into the documentation - but that needs #242. Until then this will have to suffice.
@vlandau I was going to test it further and do what you suggested and ran into #244
The following script, launched with `julia -p 2`, now works:
```julia
@everywhere using Pkg
@everywhere using Distributed
@everywhere using LinearAlgebra.BLAS

# Set BLAS threads to 1 to avoid oversubscription
@everywhere BLAS.set_num_threads(1)

@everywhere Pkg.activate(".")
@everywhere using Circuitscape

# Read the ini files
@everywhere searchdir(path, key) = filter(x -> occursin(key, x), readdir(path))
@everywhere dir = "config/ini_circuitscape/all/"
@everywhere ext = "ini"
@everywhere ini_list = dir .* searchdir(dir, ext)

pmap(compute, ini_list)
```
I'm running into issues trying to get rid of `@everywhere` and use `addprocs(2)` instead. Each worker completes one call to `compute`, then I run into the following error. Happy to post another issue or reopen this one.
```
ERROR: LoadError: On worker 3:
KeyError: key Circuitscape [2b7a1792-8151-5239-925d-e2b8fdfa3201] not found
getindex at ./dict.jl:477 [inlined]
root_module at ./loading.jl:962 [inlined]
deserialize_module at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:894
handle_deserialize at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:799
deserialize at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:722
deserialize_datatype at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:1192
handle_deserialize at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:775
deserialize at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:722
handle_deserialize at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:782
deserialize at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:722 [inlined]
deserialize_msg at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/messages.jl:99
#invokelatest#1 at ./essentials.jl:709 [inlined]
invokelatest at ./essentials.jl:708 [inlined]
message_handler_loop at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:185
process_tcp_streams at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:142
#101 at ./task.jl:333
Stacktrace:
 [1] (::Base.var"#732#734")(::Task) at ./asyncmap.jl:178
 [2] foreach(::Base.var"#732#734", ::Array{Any,1}) at ./abstractarray.jl:1920
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::Array{String,1}) at ./asyncmap.jl:178
 [4] wrap_n_exec_twice(::Channel{Any}, ::Array{Any,1}, ::Distributed.var"#208#211"{WorkerPool}, ::Function, ::Array{String,1}) at ./asyncmap.jl:154
 [5] #async_usemap#717(::Function, ::Nothing, ::typeof(Base.async_usemap), ::Distributed.var"#192#194"{Distributed.var"#192#193#195"{WorkerPool,typeof(compute)}}, ::Array{String,1}) at ./asyncmap.jl:103
 [6] (::Base.var"#kw##async_usemap")(::NamedTuple{(:ntasks, :batch_size),Tuple{Distributed.var"#208#211"{WorkerPool},Nothing}}, ::typeof(Base.async_usemap), ::Function, ::Array{String,1}) at ./none:0
 [7] #asyncmap#716 at ./asyncmap.jl:81 [inlined]
 [8] #asyncmap at ./none:0 [inlined]
 [9] #pmap#207(::Bool, ::Int64, ::Nothing, ::Array{Any,1}, ::Nothing, ::typeof(pmap), ::Function, ::WorkerPool, ::Array{String,1}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/pmap.jl:126
 [10] pmap(::Function, ::WorkerPool, ::Array{String,1}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/pmap.jl:101
 [11] #pmap#217(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(pmap), ::Function, ::Array{String,1}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/pmap.jl:156
 [12] pmap(::Function, ::Array{String,1}) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.3/Distributed/src/pmap.jl:156
 [13] top-level scope at /home/vlucet/Documents/Master/Thesis/land_con_monteregie/test.jl:18
 [14] include at ./boot.jl:328 [inlined]
 [15] include_relative(::Module, ::String) at ./loading.jl:1105
 [16] include(::Module, ::String) at ./Base.jl:31
 [17] exec_options(::Base.JLOptions) at ./client.jl:287
 [18] _start() at ./client.jl:460
in expression starting at /home/vlucet/Documents/Master/Thesis/land_con_monteregie/test.jl:18
```
Make sure you run `addprocs(2)` after running all of the other code (i.e. just before running `pmap`). Let me know if that solves it.
That is what I was doing :) Here is the full script, called with `julia -p 2`:
```julia
using Pkg
using Distributed
using LinearAlgebra.BLAS

# Set BLAS threads to 1 to avoid oversubscription
BLAS.set_num_threads(1)

Pkg.activate(".")
using Circuitscape

# Read the ini files
searchdir(path, key) = filter(x -> occursin(key, x), readdir(path))
dir = "config/ini_circuitscape/all/"
ext = "ini"
ini_list = dir .* searchdir(dir, ext)

addprocs(2)
pmap(compute, ini_list)
```
Calling without `-p 2` (just `julia`) causes another issue:
```
ERROR: LoadError: On worker 2:
KeyError: key Circuitscape [2b7a1792-8151-5239-925d-e2b8fdfa3201] not found
```

(The rest of the stack trace is identical to the one above.)
Can you paste your whole session?
@ViralBShah I'm realizing I'm on 1.3.1, but you just recently committed a change making 1.4 the minimum requirement.
> Can you paste your whole session?

Do you mean my Manifest and Project files?
> Can you paste your whole session?
>
> Do you mean my Manifest and Project files?
I meant the screen dump of everything from the point you start Julia. You could post it as a gist; that way we can see all the commands and also try to reproduce. I suspect the fix should be straightforward.
> @ViralBShah Im realising i'm on 1.3.1, but you just recently commited for 1.4 to be minimum requirement.
Yes, that is mainly because some of the dependent packages need Julia 1.4.
Alright, here is a screen dump with a fresh install of Julia 1.4.2 and dev Circuitscape: https://gist.github.com/VLucet/b472d55fe8202139daeedca33b734a6c
The `addprocs(2)` should be done before the `using Circuitscape` line. That will ensure that the packages are loaded on all processors.
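A self-contained sketch of that ordering, with the standard library `LinearAlgebra` standing in for `Circuitscape` (which is not assumed to be installed here):

```julia
using Distributed

addprocs(2)  # add workers BEFORE loading the package

# Everything loaded with @everywhere after addprocs is available
# on the new workers as well as on the master.
@everywhere using LinearAlgebra
@everywhere BLAS.set_num_threads(1)

# pmap now hands jobs out to the workers.
ids = pmap(i -> myid(), 1:4)
```

In the actual workflow, `@everywhere using Circuitscape` would follow `addprocs` in the same way.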
@ViralBShah thank you for your continued help on this. I keep running into errors; it looks like whenever I do `addprocs()`, I still have to reload `Pkg`, activate and instantiate, and load Circuitscape, as well as declare all variables with `@everywhere`. This seems to be because, according to the docs on `addprocs`, the workers never synchronize their state:
```
help?> addprocs()
  addprocs(; kwargs...) -> List of process identifiers

  Equivalent to addprocs(Sys.CPU_THREADS; kwargs...)

  Note that workers do not run a .julia/config/startup.jl startup script, nor do they
  synchronize their global state (such as global variables, new method definitions, and
  loaded modules) with any of the other running processes.
```
For example: https://gist.github.com/VLucet/509d269a86a28161b50ffe3818ebc5f1
Doesn't this just work for you? I'm doing this on Julia 1.4.2.
```julia
julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> using Circuitscape
```
@ViralBShah I am at a loss to understand what is going on, despite my reading of the `Pkg` docs, but I think my issue is related to https://github.com/JuliaLang/julia/issues/28781. Maybe there is something very basic that I do not understand and, at the risk of looking stupid, here is what I did:
I run Julia in a container (fresh install of 1.4.2; nothing is installed yet at the beginning of my session). I activate and instantiate from a `Manifest` that uses a specific SHA of the Circuitscape repo (one of the recent commits). Here is the gist:
https://gist.github.com/VLucet/30eb4e82f342e7a0a210d7cff2b61680
What is odd is that `using Circuitscape` only worked when I installed Circuitscape in the non-project (default) environment, even if the version installed there was different. This seems to be because workers inherit the default environment instead of the project environment, and only avoid throwing an error if the right packages are also present in the default environment, regardless of their versions.
And, as suggested in the issue, I fixed my problem with `addprocs(cores, exeflags="--project")` (it no longer fails at `using`), but it still fails when I try to use `compute` on the workers.
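For reference, a sketch of that `exeflags` workaround (again with the standard library `LinearAlgebra` standing in for a project package):

```julia
using Distributed

# Workaround for JuliaLang/julia#28781: start workers with --project
# so they resolve packages in the active project environment rather
# than in the default one.
addprocs(2; exeflags="--project")

@everywhere using LinearAlgebra  # a project package would load the same way
```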
@kristofferC - Is the diagnosis above right: is the error described due to the Pkg environments issue in https://github.com/JuliaLang/julia/issues/28781?
@VLucet Maybe you just have to throw `@everywhere` on all the things for now.
Have you guys checked out this code on RA/metapar? It adds a flag to the INI which replaces the internal `pmap` call with a `map` and lets you call `pmap` on a list of INI files. I'll make this a PR and @VLucet can test whether it works for him.
@ranjanan I will give it a try! If I understand correctly, I have to add `meta_parallelize = True` to my INI files and then call `compute_metaparallel` on an array of file names?

EDIT: I see there is apparently no need for the `meta_parallelize` flag.
I see you are trying to remove `@everywhere`; that is a bad idea, as that invocation is required. The way this will work is that you start with multiple workers, `julia --machinefile=...` (or `julia -p ..` if it's only one machine), and then on the master node, in your top-level file, run something like this (simplifying):
```julia
using Distributed
@everywhere using Pkg
@everywhere Pkg.activate("/path/to/project")
@everywhere using Circuitscape

data = init_data()          # gather problem data on master
pmap(Circuitscape.fn, data) # fan the work out to the workers
```
This code should probably be in a separate main or runner file, and not be part of the module. That makes things easier.
This seems to work for meta-parallelisation:

```julia
using Pkg
using Distributed

addprocs(3)
@everywhere using Pkg
@everywhere using Circuitscape

@everywhere searchdir(path, key) = filter(x -> occursin(key, x), readdir(path))
@everywhere dir = "/circuitscape/05_current_connect_parts/04_ini_files/p1_pairs/"
@everywhere ext = "ini"
@everywhere ini_list = dir .* searchdir(dir, ext)

pmap(compute, ini_list, batch_size=3)
```
@frederikvand I also observe that although all the workers start, a lot of them seem to quickly die, or alternate between in use and idle (your edited message suggests you had some trouble with that too). Is `batch_size` able to fix that?
Yeah, I think it's related to memory usage peaks. It's hard to notice with memory monitoring because the failing workers are caused by the peaks, I think. When I alter the batch size and use big-memory nodes (264 GB RAM), it is solved.
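A self-contained illustration of the `batch_size` keyword with a toy workload (the real call maps `compute` over the ini files instead):

```julia
using Distributed
addprocs(2)

# batch_size groups the inputs so that each remote call processes
# several items at once, reducing per-item scheduling overhead.
results = pmap(x -> x^2, 1:10; batch_size=3)
# results == [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```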
I was wondering what would be the best way to parallelize Circuitscape when the goal is not to solve multiple pairs per map (this is natively parallelized, if I'm not mistaken) but to solve many maps (one or two pairs per map). In my use case, I have thousands of different INI files to map over.