JuliaParallel / DistributedArrays.jl

Distributed Arrays in Julia
Other
197 stars 35 forks source link

Performance issue in `DArray` creation when `myid()` holds a chunk #206

Open raminammour opened 5 years ago

raminammour commented 5 years ago

Hello,

If the process creating a `DArray` is itself one of the pids that holds a chunk, and initializing that chunk takes enough work, we see a slowdown: the local work blocks the asynchronous dispatch of work to the other pids.

# Standard-library multiprocessing support (addprocs, workers, procs, myid, @everywhere).
using Distributed
# Start two additional worker processes for the benchmarks below.
addprocs(2)
# Provides the DArray type whose construction is being timed.
using DistributedArrays

# a lot of work...
@everywhere function init(I)
    # Deliberately expensive chunk initializer: `I` is a tuple of index ranges and
    # the result is a random array with one dimension per range, of matching lengths.
    shape = map(length, I)
    samples = rand(shape...)

    # Busy work: accumulate the same sum of squares ten times.
    total = 0.0
    for _ in 1:10
        total += sum(samples .^ 2)
    end

    # exp(total * exp(x)^3) element-wise, with the scalar multiply left un-fused.
    cubed = exp.(samples) .^ 3
    return exp.(total * cubed)
end

# Time DArray construction for three choices of pids.
# `@btime` comes from BenchmarkTools, which this snippet never loads —
# NOTE(review): run `using BenchmarkTools` first.
n=2000
# Both chunks are placed on worker processes.
@btime d1=DArray(init,(n,n),workers()[1:2])
# First two entries of procs(); presumably includes the calling process, listed first.
@btime d1=DArray(init,(n,n),procs()[1:2])
# Same two pids as the previous line, in reverse order.
@btime d1=DArray(init,(n,n),procs()[2:-1:1]);

  126.553 ms (290 allocations: 22.45 KiB)
  156.219 ms (253 allocations: 213.64 MiB)
  112.563 ms (249 allocations: 213.64 MiB)

Note that the last experiment uses the same pids as the second one, but with pid 1 listed last, and the slowdown disappears (pid 1 dispatches the work to the other pids before doing its own). This suggests an easy fix:

@everywhere @eval DistributedArrays function DistributedArrays.DArray(id, init, dims, pids, idxs, cuts)
    # Construct the DArray identified by `id` by asking every process in `pids`
    # to build its local part, then return a handle to it on the calling process.
    #
    # `init` is either a function applied on each process, or an indexable
    # collection of remote refs where `init[i]` belongs to `pids[i]`.
    # Throws an ErrorException (after releasing the already-built local parts)
    # if the processes report different local part types.
    localtypes = Vector{DataType}(undef, length(pids))

    # Dispatch to the other processes first and to the calling process last:
    # a call targeting myid() does its work on this process and would otherwise
    # delay handing out the remaining chunks. Only the *dispatch order* is
    # changed; `pids` itself is left untouched so that `pids[i]`, `idxs[i]`
    # and `init[i]` keep describing the same chunk.
    self = myid()
    slots = 1:length(pids)
    order = vcat(filter(i -> pids[i] != self, slots), filter(i -> pids[i] == self, slots))

    @sync begin
        for i in order
            @async begin
                # Either the shared init function, or (when constructing from an
                # array of remote refs) the ref that belongs to this slot.
                arg = isa(init, Function) ? init : init[i]
                localtypes[i] = remotecall_fetch(construct_localparts, pids[i], arg, id, dims, pids, idxs, cuts)
            end
        end
    end

    if length(unique(localtypes)) != 1
        # Inconsistent chunk types: free what was built before reporting the error.
        @sync for p in pids
            @async remotecall_fetch(release_localpart, p, id)
        end
        throw(ErrorException("Constructed localparts have different `eltype`: $(localtypes)"))
    end
    A = first(localtypes)

    if myid() in pids
        # This process built a local part, so the array is already in the registry
        # (possibly behind a WeakRef).
        d = registry[id]
        d = isa(d, WeakRef) ? d.value : d
    else
        # This process holds no chunk: create a handle with an empty local part.
        T = eltype(A)
        N = length(dims)
        d = DArray{T,N,A}(id, dims, pids, idxs, cuts, empty_localpart(T,N,A))
    end
    return d
end

And after the fix:

# Re-run the three timings with the patched constructor in place.
# `@btime` comes from BenchmarkTools — NOTE(review): it must be loaded beforehand.
# Both chunks are placed on worker processes.
@btime d1=DArray(init,(n,n),workers()[1:2])
# First two entries of procs(); presumably includes the calling process, listed first.
@btime d1=DArray(init,(n,n),procs()[1:2])
# Same two pids as the previous line, in reverse order.
@btime d1=DArray(init,(n,n),procs()[2:-1:1]);

  79.150 ms (296 allocations: 22.64 KiB)
  111.481 ms (258 allocations: 213.64 MiB)
  111.870 ms (258 allocations: 213.64 MiB)

Will submit a PR with the fix promptly :)

Cheers!