JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License

Error with remotecall_fetch called on locally defined closures #40

Open: EthanAnderes opened this issue 7 years ago

EthanAnderes commented 7 years ago

Ref: discourse link

In cases where parallel data movement is a bottleneck, it would be useful to be able to use closures whose internal state (precomputed parameters) is computed once on each worker rather than serialized from the master node.

Here are two examples. The first is a simple case in which gen_foo returns a closure that is generated on every worker; however, remotecall_fetch raises an error. @amitmurthy gave a workaround in the thread linked above, but it would be nice to have a more direct approach.

The second example tries to use planned FFTs as the internal state of the closures.

First example

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere function gen_foo(local_state)
           foo(x) = x * sum(local_state)
           return foo::Function
       end

julia> @everywhere foo = gen_foo(rand(100,100))

julia> remotecall_fetch(foo, 1, 10) #<-- works
50008.9259374688

julia> remotecall_fetch(foo, 2, 10) #<-- UndefVarError: #foo#9 not defined
ERROR: On worker 2:
UndefVarError: #foo#9 not defined
deserialize_datatype at ./serialize.jl:968
handle_deserialize at ./serialize.jl:674
deserialize at ./serialize.jl:634
handle_deserialize at ./serialize.jl:681
deserialize_msg at ./distributed/messages.jl:98
message_handler_loop at ./distributed/process_messages.jl:161
process_tcp_streams at ./distributed/process_messages.jl:118
#99 at ./event.jl:73
Stacktrace:
 [1] #remotecall_fetch#141(::Array{Any,1}, ::Function, ::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:354
 [2] remotecall_fetch(::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:346
 [3] #remotecall_fetch#144(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367
 [4] remotecall_fetch(::Function, ::Int64, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:367

Second example

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere function gen_bar(local_state, Δx, n)
           FFT = Δx / (2π) * plan_rfft(rand(n); flags=FFTW.PATIENT, timelimit=4)
           function bar(x)
               y = FFT * (x .* local_state)
               return y[1]
           end
           return bar::Function
       end

julia> @everywhere Δx, n = 0.1, 1000

julia> @everywhere bar = gen_bar(rand(n), Δx, n)

julia> x = rand(n);

julia> @everywhere x=$x

julia> @everywhere println(bar(x))    # <-- works
3.9819631247716307 + 0.0im
      From worker 2:    3.8079619702395155 + 0.0im
      From worker 3:    4.008786805675184 + 0.0im

julia> remotecall_fetch(bar, 1, x)   # <-- works
3.9819631247716307 + 0.0im

However, I get UndefVarError: #bar#9 not defined for each of the following:

remotecall_fetch(bar, 2, x) # <-- UndefVarError: #bar#9 not defined

out = @parallel (vcat) for i = 1:nprocs() # <-- UndefVarError: #bar#9 not defined
    bar(x)
end

out = @parallel (vcat) for i = 1:nprocs() # <-- UndefVarError: #bar#9 not defined
    remotecall_fetch(bar, i, x)
end
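
For readers on Julia 1.x, where @parallel was renamed @distributed, here is a minimal sketch of one way to use per-worker state inside the reduction above: bind each worker's closure to a global constant on that worker and call it through a named global function that is defined everywhere, so the remote side resolves the function by name instead of receiving node 1's closure. The FFT plan is replaced by a plain precomputed scale factor (an assumption made to keep the sketch free of the FFTW package), and bar_local and run_bar are illustrative names, not part of any API.

```julia
using Distributed
addprocs(2)

# Stand-in for gen_bar: the precomputed state is a scale factor rather than
# an FFTW plan (assumption made to avoid a package dependency)
@everywhere function gen_bar(local_state)
    scale = sum(local_state)
    bar(x) = scale * sum(x)
    return bar
end

# Each process computes its own state once and keeps the closure locally
@everywhere const bar_local = gen_bar(rand(1000))

# Named global function defined on every process; on each worker it calls
# that worker's own bar_local
@everywhere run_bar(x) = bar_local(x)

x = rand(1000)

# @distributed is the Julia 1.x name for @parallel; with 1:nworkers() each
# worker handles one iteration and the results are combined with vcat
out = @distributed (vcat) for i = 1:nworkers()
    run_bar(x)
end
```

Each entry of out comes from a different worker and uses that worker's own random state, so the values are expected to differ from one another.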

amitmurthy commented 7 years ago

For this particular use case, remote references can be used to store the locally defined closure on each node and fetch it locally before execution. Something like:

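using Distributed   # required on Julia 1.x, where Distributed is a stdlib

# create: one single-slot RemoteChannel per process, hosted on that process
foo_refs = Dict{Int,RemoteChannel}()
for p in procs()
  foo_refs[p] = RemoteChannel(p)
end

# initialize on all nodes: each process builds its own closure with
# gen_foo and stores it in the channel that lives on that process
for p in procs()
  remote_do(x->put!(x, gen_foo(rand(100,100))), p, foo_refs[p])
end

# access on worker 2: the do-block runs on worker 2, fetches the closure
# stored there, and applies it to 10
remotecall_fetch(2, foo_refs[2], 10) do rr, v
  fetch(rr)(v)
end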
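
Building on that snippet, here is a minimal sketch, assuming Julia 1.x, that waits until every closure has been stored before using it and wraps the fetch-and-apply step in a small helper. call_stored is a hypothetical name; the pattern itself (one RemoteChannel per process, fetched on the process that owns it) is the one shown above.

```julia
using Distributed
addprocs(2)

@everywhere function gen_foo(local_state)
    foo(x) = x * sum(local_state)
    return foo
end

# One single-slot RemoteChannel per process, hosted on that process
foo_refs = Dict(p => RemoteChannel(p) for p in procs())

# Build each process's closure on that process and store it there;
# @sync waits until every remotecall_wait has completed
@sync for p in procs()
    @async remotecall_wait(rr -> (put!(rr, gen_foo(rand(100, 100))); nothing),
                           p, foo_refs[p])
end

# Hypothetical helper: on process p, fetch the closure stored there and apply
# it to args, so the closure itself never has to be serialized
call_stored(p, args...) =
    remotecall_fetch((rr, a) -> fetch(rr)(a...), p, foo_refs[p], args)

results = [call_stored(p, 10) for p in procs()]
```

Because each closure stays on the process that created it, only the channel reference and the arguments cross the wire.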

The approach described at https://github.com/JuliaParallel/DistributedArrays.jl#working-with-distributed-non-array-data-requires-julia-06 (in DistributedArrays) does the same more efficiently and in fewer lines of code.

One issue I see is the difference in how we treat a global function definition versus a closure bound to a global variable.

With julia -p 4:

julia> @everywhere foov = rand()

julia> @everywhere func_foo() = foov

julia> @everywhere glb_var_foo = ()->foov

julia> [remotecall_fetch(func_foo, p) for p in procs()]
5-element Array{Float64,1}:
 0.0138725
 0.451707 
 0.41358  
 0.36171  
 0.331541 

julia> [remotecall_fetch(glb_var_foo, p) for p in procs()]
5-element Array{Float64,1}:
 0.0138725
 0.0138725
 0.0138725
 0.0138725
 0.0138725

In the second case, it is actually the closure defined on node 1 that is serialized and executed on the remote node, whereas one might intuitively expect the remote node's own, locally defined glb_var_foo to be called.

At this time I think we just need to better highlight this difference and add examples to the documentation.
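
The func_foo result above suggests a lighter-weight workaround for the original example: keep each process's closure in a global on that process and call it through a named global function, which the remote process resolves by name rather than receiving from node 1. A minimal sketch, assuming Julia 1.x; foo_local and call_foo are illustrative names, not part of any API:

```julia
using Distributed
addprocs(2)

@everywhere function gen_foo(local_state)
    foo(x) = x * sum(local_state)
    return foo
end

# Each process builds its own closure from its own random state
@everywhere const foo_local = gen_foo(rand(100, 100))

# Named global function defined everywhere; like func_foo above, calling it
# on process p uses the foo_local that lives on p
@everywhere call_foo(x) = foo_local(x)

results = [remotecall_fetch(call_foo, p, 10) for p in procs()]
```

Since every process drew its own rand(100, 100), the entries of results are expected to differ, just as the func_foo values do above.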

amitmurthy commented 7 years ago

The original issue is also related to gen_foo defining a local function named foo, which is then bound to a global variable that is also named foo. Consider the following two cases:

The first one errors as in the original issue:

julia> gen_foo() = (foo() = 1; foo)
gen_foo (generic function with 1 method)

julia> foo = gen_foo()
foo (generic function with 1 method)

julia> remotecall_fetch(foo, 2)
ERROR: On worker 2:
UndefVarError: #foo#1 not defined
deserialize_datatype at ./serialize.jl:975

However, if we change gen_foo to return a locally defined bar instead, it works.

julia> gen_foo() = (bar() = 1; bar)
gen_foo (generic function with 1 method)

julia> foo = gen_foo()
(::bar) (generic function with 1 method)

julia> remotecall_fetch(foo, 2)
1

How is the name of the locally defined function affecting the way in which global variable foo is serialized? @vtjnash / @JeffBezanson ? Anyone?

StefanKarpinski commented 7 years ago

Bump...

magerton commented 6 years ago

Hi, thanks for the helpful workaround. I would also much appreciate the ability to call remote-specific functions. I'm computing a likelihood in parallel and am using sets of preallocated temporary variables on each worker. Using @everywhere gets me part of the way there. My workaround for calling remote-specific closures extends your code, @amitmurthy:

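# TmpVar, zero!, get_from_remote_tmpvars and the global remote_tmp are
# user-defined and assumed to exist on every worker (definitions not shown)

# to zero out all tmpvars
@everywhere zero_tmp() = zero!(remote_tmp::TmpVar)

# evaluate the call f() in Main on each worker, so that each worker runs its
# own global binding of f rather than a copy serialized from the master
function call_remote_fun(f::Symbol)
    for p in workers()
        remotecall_fetch(() -> Core.eval(Main, :($(f)())), p)
    end
end

call_remote_fun(:zero_tmp)

# to sum over remote tmpvars
@everywhere remote_f() = get_from_remote_tmpvars(remote_tmp::TmpVar)

# evaluate f() on each worker and combine the results with R
function mapreduce_remotevars(f::Symbol, R::Function)
    mapreduce((p::Int) -> remotecall_fetch(() -> Core.eval(Main, :($(f)())), p), R, workers())
end

mapreduce_remotevars(:remote_f, +)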
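
Since zero_tmp and remote_f are named global functions defined with @everywhere, passing them to remotecall_fetch directly (as func_foo is passed earlier in this thread) should also run each worker's own definition without going through eval. A self-contained sketch, assuming Julia 1.x; the TmpVar struct, its buf field and the four-element buffer are hypothetical stand-ins for the user's types above:

```julia
using Distributed
addprocs(2)

@everywhere begin
    # Hypothetical stand-in for the user's TmpVar: a preallocated buffer
    struct TmpVar
        buf::Vector{Float64}
    end

    # One preallocated TmpVar per process
    const remote_tmp = TmpVar(zeros(4))

    # Zero the buffer in place on whichever process runs this
    zero_tmp() = (fill!(remote_tmp.buf, 0.0); nothing)

    # Write this process's id into the buffer and return the buffer's sum
    function remote_f()
        fill!(remote_tmp.buf, myid())
        return sum(remote_tmp.buf)
    end
end

# Named global functions are resolved on the remote process, so each call
# works on that process's own remote_tmp
for p in workers()
    remotecall_fetch(zero_tmp, p)
end

total = mapreduce(p -> remotecall_fetch(remote_f, p), +, workers())
```

Each worker contributes 4 * myid() to total, which makes it easy to confirm that every call touched that worker's own buffer.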