JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License

`@distributed` fails on loops over iterators #57

Open dangirsh opened 5 years ago

dangirsh commented 5 years ago

Distributed for loops over iterators fail, seemingly because @distributed expects the iterator to implement getindex. It makes sense that @distributed needs getindex, but I don't see that in the docs. The possible solutions seem to be:

  1. Update the docs for @distributed to mention this requirement.
  2. Update the implementation of @distributed to work for iterators.

We could do option 1 until option 2 is finished.

Here's an example:

@distributed (*) for (i, j) in Base.Iterators.product([1, 2], [3, 4])
    i + j
end
ERROR: LoadError: MethodError: no method matching getindex(::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64)
(::getfield(Main, Symbol("##7#8")))(::typeof(+), ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}, ::Int64, ::Int64) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:275
(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:339
run_work_thunk(::getfield(Distributed, Symbol("##143#144")){getfield(Main, Symbol("##7#8")),Tuple{typeof(+),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Int64,Int64},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}, ::Bool) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
#remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
remotecall_fetch(::Function, ::Distributed.LocalProcess, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
#remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Function, ::Vararg{Any,N} where N) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406
remotecall_fetch at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406 [inlined]
(::getfield(Distributed, Symbol("##167#168")){typeof(+),getfield(Main, Symbol("##7#8")),Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}},Array{UnitRange{Int64},1},Int64,Int64})() at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:259
Stacktrace:
 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] wait(::Condition) at ./event.jl:46
 [4] wait(::Task) at ./task.jl:188
 [5] fetch at ./task.jl:202 [inlined]
 [6] iterate at ./generator.jl:47 [inlined]
 [7] collect(::Base.Generator{Array{Task,1},typeof(fetch)}) at ./array.jl:619
 [8] preduce(::Function, ::Function, ::Base.Iterators.ProductIterator{Tuple{Array{Int64,1},Array{Int64,1}}}) at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:263
 [9] top-level scope at /home/dan/julia/usr/share/julia/stdlib/v1.0/Distributed/src/macros.jl:274
 [10] include at ./boot.jl:317 [inlined]
 [11] include_relative(::Module, ::String) at ./loading.jl:1044
 [12] include(::Module, ::String) at ./sysimg.jl:29
 [13] include(::String) at ./client.jl:392
 [14] top-level scope at none:0
in expression starting at /home/dan/julia/test.jl:3

This is surprising, since the following non-parallel reduction works:

julia> reduce((*), [i + j for (i, j) = Base.Iterators.product([1, 2], [3, 4])])  
600

Adding a collect to the distributed version fixes it:

@distributed (*) for (i, j) in collect(Base.Iterators.product([1, 2], [3, 4]))
    i + j
end

Output: 600

Julia Version 1.0.2
Commit d789231e99* (2018-11-08 20:11 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.0 (ORCJIT, skylake)
Environment:
  JULIA_REVISE_INCLUDE = 1
  JULIA_DEBUG = all

sunnsi commented 5 years ago

Can anyone tell me whether the @distributed macro works on iterators now? I have a very large data set (more than 900,000,000 elements after taking combinations), and calling collect() on it would exhaust all memory.

mbauman commented 5 years ago

Yes, @distributed still requires indexing; it would indeed be good to document this requirement.

Note that distributing such a large array to your workers may swamp the computation time with communication overhead. You may want to consider "peeling off" one of your combinatorial elements, distributing that among your workers, and then having each worker compute the combinations of that one element with the remainder.
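
A minimal sketch of that "peeling off" idea, reusing the small product example from the top of this thread. The helper name `peeled_product` and its arguments are hypothetical, chosen only for illustration. Only the outer collection is handed to @distributed (a Vector, so it supports indexing), and each iteration walks the inner collection lazily, so the full product is never collected:

```julia
using Distributed

# Hypothetical helper: distribute over `outer` only. `outer` must support
# getindex (a Vector or range does); `inner` is iterated lazily per element.
function peeled_product(outer, inner)
    @distributed (*) for i in outer
        # Each iteration reduces its own slice of the product locally.
        prod(i + j for j in inner)
    end
end

peeled_product([1, 2], [3, 4])  # 600, same as the collect-based version above
```

With no extra workers added, this runs on the local process; the same code should spread the outer elements across workers once addprocs has been called. The inner collection is captured by the loop body, so it is still sent to each worker and should be kept small or cheap to rebuild.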

sunnsi commented 5 years ago

Thanks, mbauman. I will try your suggestions.

bocc commented 4 years ago

Hello Matt,

If I understand correctly, this indexing requirement is the reason eachrow and eachcol do not work with @distributed? Julia 1.2 reports a syntax error saying 'invalid assignment location'.

mbauman commented 4 years ago

Yes, that's correct. Just as a breadcrumb, with https://github.com/JuliaLang/julia/pull/32310 we could possibly add indexing support.
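
In the meantime, one possible workaround for the eachrow case is to loop over row indices, which form an indexable range, and take a view of each row inside the loop body. This is only a sketch; the function name `row_sum_total` is hypothetical and summing each row stands in for whatever per-row work is needed:

```julia
using Distributed

# Hypothetical example: iterate over row indices instead of eachrow(A).
# 1:size(A, 1) is a range, so it satisfies the indexing requirement.
function row_sum_total(A)
    @distributed (+) for r in 1:size(A, 1)
        # view avoids copying the row; note that the loop body captures A,
        # so the whole matrix is sent to every worker that runs a chunk.
        sum(view(A, r, :))
    end
end

row_sum_total([1 2; 3 4])  # 10
```

The same pattern with `1:size(A, 2)` and `view(A, :, c)` would cover the eachcol case.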