ACEsuit / ACEfit.jl

Generic Codes for Fitting ACE models
MIT License

Problem with distributed assembly #59

Closed wcwitt closed 1 year ago

wcwitt commented 1 year ago

Reported by @CheukHinHoJerry in https://github.com/ACEsuit/ACE1x.jl/issues/7.

I have hit this error multiple times when calling ACEfit.assemble with multiple workers on a large lsq system. I remember there was an issue about this, so I think it's better to post it here. It happens in the middle of assembling the design matrix. This is the full error log:

Worker 18 terminated.
Unhandled Task ERROR: EOFError: read end of file
Stacktrace:
 [1] (::Base.var"#wait_locked#715")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64)
   @ Base ./stream.jl:947
 [2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64)
   @ Base ./stream.jl:955
 [3] unsafe_read
   @ ./io.jl:761 [inlined]
 [4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64)
   @ Base ./io.jl:760
 [5] read!
   @ ./io.jl:762 [inlined]
 [6] deserialize_hdr_raw
   @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/messages.jl:167 [inlined]
 [7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
   @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:172
 [8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
   @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:133
 [9] (::Distributed.var"#103#104"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
   @ Distributed ./task.jl:514
Progress:  21%|████████████████████████▌                                                                                           |  ETA: 0:52:08ERROR: Lo18Progress:  21%|████████████████████████▌                                                                                           |  ETA: 0:51:57)
Stacktrace:
  [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
    @ Base ./task.jl:920
  [2] wait()
    @ Base ./task.jl:984
  [3] wait(c::Base.GenericCondition{ReentrantLock}; first::Bool)
    @ Base ./condition.jl:130
  [4] wait
    @ ./condition.jl:125 [inlined]
  [5] take_buffered(c::Channel{Any})
    @ Base ./channels.jl:456
  [6] take!(c::Channel{Any})
    @ Base ./channels.jl:450
  [7] take!(::Distributed.RemoteValue)
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:726
  [8] remotecall_fetch(f::Function, w::Distributed.Worker, args::ACEfit.DataPacket{AtomsData}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:461
  [9] remotecall_fetch(f::Function, w::Distributed.Worker, args::ACEfit.DataPacket{AtomsData})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [10] #remotecall_fetch#162
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [11] remotecall_fetch(f::Function, id::Int64, args::ACEfit.DataPacket{AtomsData})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [12] remotecall_pool(rc_f::Function, f::Function, pool::WorkerPool, args::ACEfit.DataPacket{AtomsData}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:126
 [13] remotecall_pool
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:123 [inlined]
 [14] #remotecall_fetch#200
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:232 [inlined]
 [15] remotecall_fetch
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:232 [inlined]
 [16] #208#209
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:288 [inlined]
 [17] #208
    @ ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/workerpool.jl:288 [inlined]
 [18] (::Base.var"#978#983"{Distributed.var"#208#210"{Distributed.var"#208#209#211"{WorkerPool, ProgressMeter.var"#56#59"{RemoteChannel{Channel{Bool}}, ACEfit.var"#3#4"{JuLIP.MLIPs.IPSuperBasis{JuLIP.MLIPs.IPBasis}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedMatrix{Float64}}}}}})(r::Base.RefValue{Any}, args::Tuple{ACEfit.DataPacket{AtomsData}})
    @ Base ./asyncmap.jl:100
 [19] macro expansion
    @ ./asyncmap.jl:234 [inlined]
 [20] (::Base.var"#994#995"{Base.var"#978#983"{Distributed.var"#208#210"{Distributed.var"#208#209#211"{WorkerPool, ProgressMeter.var"#56#59"{RemoteChannel{Channel{Bool}}, ACEfit.var"#3#4"{JuLIP.MLIPs.IPSuperBasis{JuLIP.MLIPs.IPBasis}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedVector{Float64}, SharedArrays.SharedMatrix{Float64}}}}}}, Channel{Any}, Nothing})()
    @ Base ./task.jl:514
Stacktrace:
  [1] (::Base.var"#988#990")(x::Task)
    @ Base ./asyncmap.jl:177
  [2] foreach(f::Base.var"#988#990", itr::Vector{Any})
    @ Base ./abstractarray.jl:3073
  [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::Vector{ACEfit.DataPacket{AtomsData}})
    @ Base ./asyncmap.jl:177
  [4] wrap_n_exec_twice
    @ ./asyncmap.jl:153 [inlined]
  [5] #async_usemap#973
    @ ./asyncmap.jl:103 [inlined]
  [6] async_usemap
    @ ./asyncmap.jl:84 [inlined]
  [7] #asyncmap#972
    @ ./asyncmap.jl:81 [inlined]
  [8] asyncmap
    @ ./asyncmap.jl:80 [inlined]
  [9] pmap(f::Function, p::WorkerPool, c::Vector{ACEfit.DataPacket{AtomsData}}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:126
 [10] pmap(f::Function, p::WorkerPool, c::Vector{ACEfit.DataPacket{AtomsData}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:99
 [11] pmap(f::Function, c::Vector{ACEfit.DataPacket{AtomsData}}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:156
 [12] pmap(f::Function, c::Vector{ACEfit.DataPacket{AtomsData}})
    @ Distributed ~/julia_ws/julia-1.9.0/share/julia/stdlib/v1.9/Distributed/src/pmap.jl:156
 [13] macro expansion
    @ ~/.julia/packages/ProgressMeter/sN2xr/src/ProgressMeter.jl:1015 [inlined]
 [14] macro expansion
    @ ./task.jl:476 [inlined]
 [15] macro expansion
    @ ~/.julia/packages/ProgressMeter/sN2xr/src/ProgressMeter.jl:1014 [inlined]
 [16] macro expansion
    @ ./task.jl:476 [inlined]
 [17] progress_map(::Function, ::Vararg{Any}; mapfun::Function, progress::ProgressMeter.Progress, channel_bufflen::Int64, kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ ProgressMeter ~/.julia/packages/ProgressMeter/sN2xr/src/ProgressMeter.jl:1007
 [18] assemble(data::Vector{AtomsData}, basis::JuLIP.MLIPs.IPSuperBasis{JuLIP.MLIPs.IPBasis})
    @ ACEfit ~/.julia/packages/ACEfit/ID48n/src/assemble.jl:31
 [19] make_train(model::ACE1x.ACE1Model)
    @ Main ~/julia_ws/ACEworkflows/Fe_pure_jerry/asm_all_lsq.jl:54
 [20] top-level scope
    @ ~/julia_ws/ACEworkflows/Fe_pure_jerry/asm_all_lsq.jl:91
 [21] include(fname::String)
    @ Base.MainInclude ./client.jl:478
 [22] top-level scope
    @ REPL[2]:1
in expression starting at /zfs/users/jerryho528/jerryho528/julia_ws/ACEworkflows/Fe_pure_jerry/asm_all_lsq.jl:71
  [e3f9bc04] ACE1 v0.11.12
  [8c4e8d19] ACE1pack v0.4.1
  [5cc4c08c] ACE1x v0.1.4
  [ad31a8ef] ACEfit v0.1.1
  [f67ccb44] HDF5 v0.16.15
  [682c06a0] JSON v0.21.4
  [898213cb] LowRankApprox v0.5.3
  [91a5bcdd] Plots v1.38.16
  [08abe8d2] PrettyTables v2.2.5
  [de0858da] Printf

Additionally:

It happens every time so it stops me from assembling a large lsq.

wcwitt commented 1 year ago

@CheukHinHoJerry, I'm happy to help figure this out. But if it's only a problem for large lsq, are you certain it's not running out of memory? That would be my first guess.

Edit: just searched your error message and found this, which confirms memory could be the issue: https://stackoverflow.com/questions/46515249/meaning-of-julia-error-error-unhandled-task-failure-eoferror-read-end-of-fi.

CheukHinHoJerry commented 1 year ago

Thank you for your prompt reply. The reason I guessed it is not an OOM issue is that I checked the free memory with:

julia> Sys.free_memory() / 2^20 / 1024
240.16326141357422

so the server has ~240 GB of free RAM, whereas my storage space has around 30 GB available.

I am assembling a matrix of size (446365, 2773), which I expect to take around 9.90 GB.
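As a quick check of that estimate, here is a minimal sketch assuming the design matrix is stored densely as Float64 (8 bytes per entry). The variable names are only illustrative and do not come from ACEfit.

```julia
# Dense Float64 design matrix of the size quoted above.
nrows, ncols = 446365, 2773

# Each Float64 entry takes 8 bytes.
nbytes = nrows * ncols * sizeof(Float64)

size_gb  = nbytes / 1e9    # decimal gigabytes
size_gib = nbytes / 2^30   # binary gibibytes, the unit produced by the free-memory check above

println("design matrix: ", round(size_gb; digits = 2), " GB (",
        round(size_gib; digits = 2), " GiB)")
```

This comes to roughly 9.9 GB (about 9.2 GiB), which is small next to the ~240 reported as free, so the final matrix on its own would not explain running out of memory.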

If there is a better way to check this could you please let me know? I would be thankful.

Update: I kept printing Sys.free_memory() during assembly and it seems that it actually runs out of memory quite soon. Is this expected, or is it some problem with gc?
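The kind of monitoring described here can be sketched as follows. This is only an illustration: `assemble_block` and `monitored_assembly` are hypothetical stand-ins, not ACEfit functions, and the loop is serial rather than distributed.

```julia
# Free system memory in GiB, using the same conversion as the check above.
free_gib() = Sys.free_memory() / 2^30

# Hypothetical stand-in for assembling one block of the design matrix.
assemble_block(n) = rand(n, 100)

# Assemble `nblocks` blocks and record the free memory after each one.
function monitored_assembly(nblocks; blocksize = 1_000)
    history = Float64[]
    for i in 1:nblocks
        block = assemble_block(blocksize)
        # ... copy `block` into the full matrix here ...
        push!(history, free_gib())
        println("block $i: ", round(history[end]; digits = 2), " GiB free")
    end
    return history
end

history = monitored_assembly(5)
```

Note that Sys.free_memory() reports free memory for the whole machine, so other jobs on a shared server also affect the numbers. A steady downward trend across blocks is the signal to look for.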

wcwitt commented 1 year ago

Sorry - I didn't see your update before. I have been seeing this recently as well on large datasets, and I don't know what could have changed. For me, it helps to add GC.gc() after each subblock of the matrix is assembled - can you try that?

cortner commented 1 year ago

I added that to the mt assembly already and this seemed to solve my problem.

CheukHinHoJerry commented 1 year ago

Yes. I tried that too and it works fine.

wcwitt commented 1 year ago

> Yes. I tried that too and it works fine.

Do you mean it works with the distributed now, if you garbage collect? Or just with the mt?

(I do plan to merge some form of mt, just trying to get to the bottom.)

CheukHinHoJerry commented 1 year ago

Sorry for the confusion. I tried that with mt only but didn't try with the multiprocess version.

wcwitt commented 1 year ago

Since you confirmed this was an OOM problem, and I remembered that I recently deleted a GC.gc() in 54b7b2ed0a5ccb66820a799e43353f5979d68cfc, I think we have the answer. I've now restored the garbage collection in 65d5bc0.

Okay to close? We'll handle the mt elsewhere.

CheukHinHoJerry commented 1 year ago

Sure - Thank you very much for your help!

wcwitt commented 1 year ago

> I added that to the mt assembly already and this seemed to solve my problem.

@cortner when you said this, did you mean you added GC.gc() to the mt assembly? If so, where? I'm finding it has a huge effect on performance in the mt case.

cortner commented 1 year ago

weird, I guess I must have this locally somewhere and forgot to push it.

cortner commented 1 year ago

I think what I did is similar to what you did for distributed. Does this cause performance issues for distributed or for mt? (it didn't for mt I think...)

I don't really understand how it can slow down performance, maybe another question for @tjjarvinen ??

One possible solution might be to GC only every 10 iterations or so?
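A minimal sketch of that idea, assuming a plain serial loop: `process_packet` and `map_with_periodic_gc` are hypothetical names used only for illustration. In a pmap or threaded version the counter would have to be kept per worker or made thread-safe, which is not shown here.

```julia
# Hypothetical stand-in for the per-packet assembly work.
process_packet(i) = sum(rand(1_000))

# Apply `f` to each item, forcing a full collection only every `gc_every` iterations.
function map_with_periodic_gc(f, items; gc_every = 10)
    results = Vector{Any}(undef, length(items))
    ngc = 0  # number of explicit collections, returned for inspection
    for (i, item) in enumerate(items)
        results[i] = f(item)
        if i % gc_every == 0
            GC.gc()
            ngc += 1
        end
    end
    return results, ngc
end

# With 25 items and gc_every = 10, GC.gc() runs after items 10 and 20.
results, ngc = map_with_periodic_gc(process_packet, 1:25; gc_every = 10)
```

The interval is a trade-off: a larger `gc_every` means fewer pauses but lets more garbage accumulate between collections, so it would need tuning against the available memory.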

tjjarvinen commented 1 year ago

I presume we are now talking about this code block that reads

    A = SharedArray(zeros(rows[end][end], length(basis)))
    Y = SharedArray(zeros(size(A, 1)))
    W = SharedArray(zeros(size(A, 1)))
    @info "  - Beginning assembly with processor count:  $(nprocs())."
    @showprogress pmap(packets) do p
        A[p.rows, :] .= feature_matrix(p.data, basis)
        Y[p.rows] .= target_vector(p.data)
        W[p.rows] .= weight_vector(p.data)
        GC.gc()
    end

This is a parallel reduction. The idea of the code seems to be to define SharedArrays A, Y and W where the reduction is performed. The issue is probably in how SharedArray handles random access from different processes. It probably creates multiple copies of itself, which will lead to an OOM error.

Also, SharedArray is most likely not designed for reductions, so you could try something like this and see if it has the same problem:

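using Distributed, SharedArrays

A = SharedArray(zeros(rows[end][end], length(basis)))
Y = SharedArray(zeros(size(A, 1)))
W = SharedArray(zeros(size(A, 1)))

# Workers only compute their blocks and return them; nothing is written remotely.
# The locals are named Ap, Yp, Wp so they do not shadow the output arrays.
assembly_parts = pmap(packets) do p
    Ap = feature_matrix(p.data, basis)
    Yp = target_vector(p.data)
    Wp = weight_vector(p.data)
    (rows = p.rows, A = Ap, Y = Yp, W = Wp)  # NamedTuple, so part.rows etc. work below
end

# The main process copies each returned block into place.
for part in assembly_parts
    A[part.rows, :] .= part.A
    Y[part.rows] .= part.Y
    W[part.rows] .= part.W
end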

If this solves the issue, then the garbage collection call was just forcing a sync between the different instances of the SharedArrays on different processes, and the same should be achievable with a @sync call.

cortner commented 1 year ago

We had the OOM with multithreading as well, so I doubt this is the issue. The question was not about the OOM but about why calling GC after each assembly step might lead to a significant slowdown. This surprises me very much.

tjjarvinen commented 1 year ago

> why calling GC after each assembly step might lead to significant slowdown.

If it leads to a @sync call, then it has to sync the array to all processes. This causes a lot of communication and thus a slowdown.

cortner commented 1 year ago

But GC on different processes would hardly require that?

tjjarvinen commented 1 year ago

> But GC on different processes would hardly require that?

A, Y and W are SharedArrays that need to be synced between processes. So, when one process updates one of them, you need to sync. Julia probably buffers these updates, which causes the OOM error (I am not sure of this). GC will then force a sync, which needs to be carried out on all processes.

cortner commented 1 year ago

I see - not completely but somewhat. It's also consistent with my observation that on a single node I don't seem to get a slowdown?

Chuck - worth a look I think?

wcwitt commented 1 year ago

Thanks - I generally agree with @tjjarvinen that a sync might be the culprit, but I don't experience the problem with distributed, only with threading. One difference is that in distributed each process gets its own garbage collector, whereas in multithreading there is just one.

I have an example for someone to try, but I'm realizing I shouldn't have hijacked this dead issue. I will post it here: https://github.com/ACEsuit/ACEfit.jl/pull/54.