JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

Distributed.jl bug: ConcurrencyViolationError("lock must be held") #478

Open · schlichtanders opened 5 months ago

schlichtanders commented 5 months ago

While putting together a simple but reasonably complete introductory example, I ran into `ConcurrencyViolationError("lock must be held")`, even though the official nested-loop example I adapted it from explicitly says that Dagger does not need locks.

```julia
using Dagger: @spawn
using Distributed
# add two further Julia processes which could run on other machines
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere executes code on all machines
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both threads and machines as processors
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
# @sync waits until all enclosed calls to @spawn are ready
df = DataFrame()
@sync for i in 1:1000
    data = @spawn rand(10000)
    for agg in aggregators
        res = @spawn fit!(agg(), data)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

df.result .= fetch.(df.result)
```
The full stacktrace:

```julia
julia> using Dagger: @spawn

julia> using Distributed # add two further julia processes which could run on other machines
WARNING: using Distributed.@spawn in module Main conflicts with an existing identifier.

julia> addprocs(2, exeflags="--threads=2") # Distributed.@everywhere execute code on all machines
2-element Vector{Int64}:
 2
 3

julia> @everywhere using Dagger, DataFrames, OnlineStats # Dagger uses both Threads and Machines as processes

julia> Dagger.all_processors() # let's distributes some calculations
Set{Dagger.Processor} with 5 elements:
  Dagger.ThreadProc(2, 1)
  Dagger.ThreadProc(3, 1)
  Dagger.ThreadProc(1, 1)
  Dagger.ThreadProc(3, 2)
  Dagger.ThreadProc(2, 2)

julia> aggregators = [Mean, Variance, Extrema] # @sync waits until all enclosed calls to @spawn are ready
3-element Vector{UnionAll}:
 Mean
 Variance
 Extrema

julia> df = DataFrame()
0×0 DataFrame

julia> @sync for i in 1:1000
           data = @spawn rand(10000)
           for agg in aggregators
               res = @spawn fit!(agg(), data)
               push!(df, (i=i, aggregator=nameof(agg), result=res))
           end
       end

julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
ConcurrencyViolationError("lock must be held")
Stacktrace:
  [1] concurrency_violation @ ./condition.jl:8
  [2] assert_havelock @ ./condition.jl:25 [inlined]
  [3] assert_havelock @ ./condition.jl:48 [inlined]
  [4] assert_havelock @ ./condition.jl:72 [inlined]
  [5] _wait2 @ ./condition.jl:83
  [6] #wait#645 @ ./condition.jl:127
  [7] wait @ ./condition.jl:125 [inlined]
  [8] wait_for_conn @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:195
  [9] check_worker_state @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:170
 [10] send_msg_ @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:172
 [11] send_msg @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:122 [inlined]
 [12] #remotecall_fetch#159 @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:460
 [13] remotecall_fetch @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:454
 [14] remotecall_fetch @ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:492 [inlined]
 [15] #181 @ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:490 [inlined]
 [16] forwardkeyerror @ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:475
 [17] poolget @ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:489
 [18] move @ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:98
 [19] move @ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:96 [inlined]
 [20] move @ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:102
 [21] #invokelatest#2 @ ./essentials.jl:892 [inlined]
 [22] invokelatest @ ./essentials.jl:889 [inlined]
 [23] #166 @ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1553
Stacktrace:
  [1] wait @ ./task.jl:352 [inlined]
  [2] fetch @ ./task.jl:372 [inlined]
  [3] fetch_report @ ~/.julia/packages/Dagger/Tx54v/src/sch/util.jl:263
  [4] do_task @ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1562
  [5] #143 @ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk: Thunk(id=212, fit!(Extrema: n=0 | value=(min = Inf, max = -Inf, nmin = 0, nmax = 0), Thunk[209](rand, ...)))
Stacktrace:
  [1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
    @ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
  [2] fetch @ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#73 @ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
  [4] fetch @ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
  [5] _broadcast_getindex_evalf @ ./broadcast.jl:709 [inlined]
  [6] _broadcast_getindex @ ./broadcast.jl:682 [inlined]
  [7] getindex @ ./broadcast.jl:636 [inlined]
  [8] copyto_nonleaf!(dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, iter::Base.OneTo{…}, state::Int64, count::Int64)
    @ Base.Broadcast ./broadcast.jl:1098
  [9] restart_copyto_nonleaf!(newdest::Vector{…}, dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, val::Variance{…}, I::Int64, iter::Base.OneTo{…}, state::Int64, count::Int64)
    @ Base.Broadcast ./broadcast.jl:1089
 [10] copyto_nonleaf!(dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, iter::Base.OneTo{…}, state::Int64, count::Int64)
    @ Base.Broadcast ./broadcast.jl:1105
 [11] copy @ ./broadcast.jl:950 [inlined]
 [12] materialize @ ./broadcast.jl:903 [inlined]
 [13] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
    @ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
 [14] materialize! @ ./broadcast.jl:914 [inlined]
 [15] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
    @ Base.Broadcast ./broadcast.jl:911
 [16] top-level scope @ REPL[9]:1
Some type information was truncated. Use `show(err)` to see complete types.
```

Tried on Julia 1.10.1 and 1.10.2, with Dagger 0.18.8.

schlichtanders commented 5 months ago

This might be due to missing thread-safety in Distributed.jl.
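
For anyone triaging, here is my guess at a minimal Distributed-only reproducer, based on the `wait_for_conn` / `check_worker_state` frames in the trace above. The (unverified) idea is that several threads racing to use a lazily established worker-to-worker connection end up waiting on a `Condition` without holding its lock:

```julia
using Distributed
addprocs(2, exeflags="--threads=2")
w1, w2 = workers()

# From worker w1, hit worker w2 from several threads at once. The first
# calls race to set up the lazy worker-to-worker connection, which is
# where `wait_for_conn` appears in the stacktrace above.
remotecall_fetch(w1) do
    Threads.@threads for i in 1:32
        remotecall_fetch(identity, w2, i)
    end
end
```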

schlichtanders commented 5 months ago

I have now built explicit locking into the example, because I thought the issue was about concurrent access to the data...

```julia
using Dagger: @spawn, @shard
using Distributed
# add two further Julia processes which could run on other machines
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere executes code on all machines
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both threads and machines as processors
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
df = DataFrame()

@everywhere function myfit!(lck, agg, data)
    lock(() -> fit!(agg(), data), lck)
end
# @sync waits until all enclosed calls to @spawn are ready
@sync for i in 1:1000
    data = @spawn rand(10000)
    # This creates one lock per worker. If the task is run on
    # a worker, the correct lock is automatically picked up.
    # Needed for multi-threaded access to the data.
    lck = @shard ReentrantLock()
    for agg in aggregators
        res = @spawn myfit!(lck, agg, data)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

df.result .= fetch.(df.result)
```

I still get a similar `ConcurrencyViolationError("lock must be held")`. The failing frames are again inside Dagger's data movement (`Dagger.move` calling into `MemPool.poolget`), before `myfit!` even runs, so user-level locking cannot help:

```julia
julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
ConcurrencyViolationError("lock must be held")
Stacktrace:
  [1] concurrency_violation @ ./condition.jl:8
  [2] assert_havelock @ ./condition.jl:25 [inlined]
  [3] assert_havelock @ ./condition.jl:48 [inlined]
  [4] assert_havelock @ ./condition.jl:72 [inlined]
  [5] _wait2 @ ./condition.jl:83
  [6] #wait#645 @ ./condition.jl:127
  [7] wait @ ./condition.jl:125 [inlined]
  [8] wait_for_conn @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:195
  [9] check_worker_state @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:170
 [10] send_msg_ @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:172
 [11] send_msg @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:122 [inlined]
 [12] #remotecall_fetch#159 @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:460
 [13] remotecall_fetch @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:454
 [14] remotecall_fetch @ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:492 [inlined]
 [15] #181 @ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:490 [inlined]
 [16] forwardkeyerror @ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:475
 [17] poolget @ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:489
 [18] move @ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:98
 [19] move @ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:96 [inlined]
 [20] move @ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:102
 [21] #invokelatest#2 @ ./essentials.jl:892 [inlined]
 [22] invokelatest @ ./essentials.jl:889 [inlined]
 [23] #166 @ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1553
Stacktrace:
  [1] wait @ ./task.jl:352 [inlined]
  [2] fetch @ ./task.jl:372 [inlined]
  [3] fetch_report @ ~/.julia/packages/Dagger/Tx54v/src/sch/util.jl:263
  [4] do_task @ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1562
  [5] #143 @ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk: Thunk(id=9, myfit!(Dagger.Shard(Dict{Dagger.Processor, Dagger.Chunk}(OSProc(1) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(1, 6, 0x0000000000000060), OSProc(1), ProcessScope: worker == 1, false), OSProc(2) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(2, 0, 0x0000000000000060), OSProc(2), ProcessScope: worker == 2, false), OSProc(3) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(3, 1, 0x0000000000000060), OSProc(3), ProcessScope: worker == 3, false))), Mean, Thunk[5](rand, ...)))
Stacktrace:
  [1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
    @ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
  [2] fetch @ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#73 @ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
  [4] fetch @ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
  [5] _broadcast_getindex_evalf @ ./broadcast.jl:709 [inlined]
  [6] _broadcast_getindex @ ./broadcast.jl:682 [inlined]
  [7] getindex @ ./broadcast.jl:636 [inlined]
  [8] copy @ ./broadcast.jl:942 [inlined]
  [9] materialize @ ./broadcast.jl:903 [inlined]
 [10] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
    @ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
 [11] materialize! @ ./broadcast.jl:914 [inlined]
 [12] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
    @ Base.Broadcast ./broadcast.jl:911
 [13] top-level scope @ REPL[10]:1
Some type information was truncated. Use `show(err)` to see complete types.
```
jpsamaroo commented 5 months ago

Yeah, fixing this will require using Julia 1.11 and adding https://github.com/JuliaLang/Distributed.jl/pull/4 to your project, to get the appropriate fixes into Distributed. It's very much Distributed's bug and not Dagger's, but I will keep this issue open anyway in case anyone else stumbles upon this.
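
For later readers, a sketch of what that might look like, assuming the fix lands on the repo's default branch: on Julia 1.11+ the Distributed stdlib can be replaced by a package version via Pkg, and the exact `rev` to use is whatever revision ends up carrying that PR.

```julia
using Pkg
# Replace the Distributed stdlib with the version from the repository;
# pass `rev=...` to pin a specific branch or commit carrying the fix.
Pkg.add(url="https://github.com/JuliaLang/Distributed.jl")
```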