JuliaLang / julia

The Julia Programming Language
https://julialang.org/
MIT License

functions are serialised at every batch with `pmap` and `pgenerate` #16345

Closed. oxinabox closed this issue 8 years ago

oxinabox commented 8 years ago

So I was trying to work out why my parallel code was taking so long. After all, I only sent the big data structures once, through a closure used as the function that was mapped over. That should only happen once (I thought), since the function is constant. Not so.

MWE:

addprocs(5)

immutable Foo
    bar::Char
end

#HACK: let's debug what is being serialised by overloading the serialize call
function Base.serialize(s::Base.SerializationState, x::Foo)
    tic()                                  # time this serialisation
    Base.Serializer.serialize_any(s, x)    # fall back to the default behaviour
    tt = toq()
    open("ser_log.txt", "a") do fp
        println(fp, tt)                    # one log line per serialisation of a Foo
    end
end

function test()
    st = Foo(rand('a':'z')) 
    pmap(r->string(st.bar)^r, 1:100)
    #Base.pgenerate(default_worker_pool(), r->string(st.bar)^r, 1:100) |> collect
end

Then running the function and counting the lines in the log:

test()
run(`wc -l ser_log.txt`)
OUT> 17

So it was serialized 17 times with pmap. If I switch to pgenerate it is 18 times (so about the same). I believe that after the batchsplit step is done, the closure is serialised once per batch that was sent. It should only need to be serialized once.

(In my non-MWE code it is happening millions of times, and takes 6 seconds apiece...)


I suspect this is already known, but I can't find an issue for it, so maybe not.


versioninfo()
Julia Version 0.5.0-dev+3928
Commit fdc4a85 (2016-05-06 04:46 UTC)
Platform Info:
  System: Linux (x86_64-linux-gnu)
  CPU: AMD Opteron 63xx class CPU
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Piledriver)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.7.1 (ORCJIT, bdver2)
oxinabox commented 8 years ago

Workarounds

So the goal here is to cut the serialisation down to once per worker. Ideally it would be once ever, but that requires delving deeper.

Presending (Does Not Work, and IDK why)

The idea is to first send the function into remote Futures, then replace the function we pass to pgenerate with a lightweight function that looks up the right (local) Future. Instead of sending the original function each time (which could be a closure hauling tons of data), we only ever send a call to fetch the function.

I really thought this would work, but it does not. That might be a bug in and of itself, or the cause of the bug reported above.

function _pgenerate(f,c)
    worker_ids = workers()
    r_fs = Dict{Int64,Base.AbstractRemoteRef}()
    for id in worker_ids
        r_fs[id]=Future(id)
        put!(r_fs[id], f)                
    end

    worker_pool = WorkerPool(worker_ids)

    #Give everyone the dict of per-worker Futures holding the function,
    #have each worker retrieve its own copy,
    #and execute it
    Base.pgenerate(worker_pool, x->fetch(r_fs[myid()])(x), c)  
end

It does not help. In fact it makes things worse, because of the one extra call initially, per worker.

Global Variable Hack (Works, but leaks memory and is scary)

This is kinda scary. Instead of sending the function initially in a remote Future and then fetching it, send a command that creates a global variable holding the function. Then just use the function that is now declared remotely.

function _pgenerate_gh(f,c, mname=Symbol("_func_genmagic_hack"*string(rand(1:1024)))::Symbol)
    #Reusing the `mname` in subsequent calls can A.) reclaim memory, B.) violate certain concurrency expectations
    worker_ids = workers()
    for id in worker_ids
        remotecall_wait(id, mname,f) do mname_i, f_i
            eval(Expr(:global, Expr(Symbol("="), mname_i, f_i)))
        end
    end

    worker_pool = WorkerPool(worker_ids)

    #Send everyone a lightweight function that looks up the (now global) function locally and calls it
    Base.pgenerate(worker_pool, x->eval(mname)(x), c)  
end

I feel unclean.

amitmurthy commented 8 years ago

For this particular scenario we could support a batch_size=:max in https://github.com/JuliaLang/julia/pull/15975 which would result in only a single call to each worker.
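
For illustration, here is roughly how that could look from the caller's side, assuming #15975 lands with a `batch_size` keyword on pmap. The `:max` value is only a proposal, so this sketch approximates "one batch per worker" by computing the batch size by hand:

# Sketch only: `batch_size` is the keyword proposed in #15975; `:max` is
# not implemented, so we approximate one batch per worker manually.
# With one batch per worker, the mapping closure is serialised once per worker.
data = 1:100
bs   = cld(length(data), nworkers())      # ceiling division
pmap(r -> string('x')^r, data; batch_size = bs)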

Other alternatives (along the lines you are exploring)

cc: @samoconnor

oxinabox commented 8 years ago

A very different workaround is what I thought was a good idea a couple of days ago, before I found pgenerate:

http://codereview.stackexchange.com/questions/128058/using-remotechannels-for-an-parallel-iterable-map (That code has problems, like lots of small sends.)

The idea is that when you start, you create on each worker a task that loops forever, reading from one channel and writing to another; the loop ends when a channel is closed. The constant data portion, i.e. the expensive closure, is then only serialised in the first step, when the loops on the workers are created.
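
Roughly, the pattern looks like this. This is my own illustrative sketch, not the code from the linked post: `channel_map`, the `nothing` sentinel (standing in for closing the channel), and the channel sizes are all made up, error handling is omitted, and output order is not preserved.

function channel_map(f, xs)
    inp = RemoteChannel(()->Channel{Any}(64))   # work items
    out = RemoteChannel(()->Channel{Any}(64))   # results
    nw  = nworkers()

    # `f` (the potentially expensive closure) is serialised once per worker,
    # when each worker's loop is started -- not once per item or batch.
    for id in workers()
        remote_do(id, inp, out) do inp_i, out_i
            while true
                x = take!(inp_i)
                x === nothing && break          # sentinel: no more work
                put!(out_i, f(x))
            end
        end
    end

    # Feed the inputs, then one sentinel per worker loop.
    @async begin
        for x in xs
            put!(inp, x)
        end
        for _ in 1:nw
            put!(inp, nothing)
        end
    end

    [take!(out) for _ in xs]                    # unordered results
end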


The problem with batch_size=:max is that it assumes you can hold the whole iterable you are running over in memory, or at least a very substantial portion of it, and that is not generally true. (Although, as I have just found out, I personally can afford to hold the whole Gigaword Corpus in RAM. Most people can't.)

JeffBezanson commented 8 years ago

We should probably remember which functions we have sent to which workers and automatically avoid duplicating.
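
For illustration, a minimal user-level sketch of that idea (hypothetical: `cached_remotecall_fetch`, the cache layout, and using `object_id` of the closure instance as the key are all made up here, and the cache's lifetime is never managed, so it leaks):

# Ship `f` to a given worker only the first time; afterwards send just a
# small remote reference plus a capture-free wrapper closure.
const _sent_funcs = Dict{Tuple{Int,UInt}, RemoteChannel}()

function cached_remotecall_fetch(f, id, args...)
    key = (id, object_id(f))                         # crude per-worker cache key
    rc = get!(_sent_funcs, key) do
        c = RemoteChannel(()->Channel{Any}(1), id)   # lives on worker `id`
        put!(c, f)                                   # `f` crosses the wire exactly once
        c
    end
    # Only `rc` and the capture-free wrapper are serialised per call (both small).
    remotecall_fetch((rc_i, a...) -> fetch(rc_i)(a...), id, rc, args...)
end

Repeated calls with the same closure instance then only pay the big serialisation cost on the first call to each worker.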

amitmurthy commented 8 years ago

Not a bad idea. The mapping function is typically an anonymous function. pmap should cache it on each worker only for the duration of the call. Will need to store in a remote reference and manage its lifetime efficiently.

JeffBezanson commented 8 years ago

Thinking in more detail, closures might be a separate issue. Internal to the serializer, we could avoid re-sending the code for functions, which is not very big but is slow to serialize. Avoiding re-sending closure data is harder, since it can be arbitrarily big and will need a cache management strategy as you point out.

oxinabox commented 8 years ago

So I improved my hack around the issue to actually serialize only once, by doing the serialization entirely outside the remote calls and just sending the serialised bytes. It is annoying that the serialization code is stateful (rather than pure), which makes it hard to reason about -- I was trying to do this at a lower, more general level.

This, like the previous hack, is twice as fast as pmap in the above test; with the MWE above I can't detect a significant difference between the two. But I suspect that for my actual code, which hauls >10MB of data around within the closures and has to send to 32 workers, the difference will be pretty huge.

function _pgenerate_gh_sc(f,c, mname=Symbol("_func_genmagic_hack"*string(rand(UInt64)))::Symbol)
    #Reusing the `mname` in subsequent calls can A.) reclaim memory, B.) violate certain concurrency expectations
    worker_ids = workers()

    s_f_buff = IOBuffer()
    serialize(s_f_buff, f)
    s_f = s_f_buff.data

    function make_global(mname_i, s_f_i)
        eval(Expr(:global, 
                    Expr(Symbol("="), mname_i, 
                        Expr(:call,:deserialize,
                            Expr(:call, :IOBuffer, s_f_i)
                            )
                        )
                 )
            )
    end

    @sync for id in worker_ids
        @async remotecall_wait(make_global, id, mname, s_f)
    end

    worker_pool = WorkerPool(worker_ids)

    #Send everyone a lightweight function that looks up the (now global) function locally and calls it
    Base.pgenerate(worker_pool, x->eval(mname)(x), c)  
end
amitmurthy commented 8 years ago

closed by https://github.com/JuliaLang/julia/pull/16808