JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

`"Multiple concurrent writes to Dict detected!"` with `DTables.reduce` #437

Closed StevenWhitaker closed 11 months ago

StevenWhitaker commented 11 months ago

I occasionally get the above error message with the following example. I'm not sure whether this issue should go in DTables.jl, but I'm putting it here because the other issues I posted there got migrated here :)

Contents of mwe.jl:

using Distributed
nworkers = 1
addprocs(nworkers - nprocs() + 1)

@everywhere using DTables

remotecall_fetch(2) do
    N = 100
    dt = DTable((a = 1:N, b = rand(N)))
    fetch(reduce(+, dt; cols = [:a]))
end

Results:

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
ERROR: LoadError: On worker 2:
ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
AssertionError: Multiple concurrent writes to Dict detected!
Stacktrace:
  [1] rehash!
    @ ./dict.jl:208
  [2] _setindex!
    @ ./dict.jl:355 [inlined]
  [3] get!
    @ ./dict.jl:477
  [4] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [5] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [6] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [7] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11
  [8] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
  [9] invokelatest
    @ ./essentials.jl:816 [inlined]
 [10] #29
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/dynamic.jl:67 [inlined]
 [11] lock
    @ ./lock.jl:229
 [12] macro expansion
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/dynamic.jl:66 [inlined]
 [13] #28
    @ ./task.jl:514
Stacktrace:
  [1] exec!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/dynamic.jl:108
  [2] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:126
  [3] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [4] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [5] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [6] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
  [7] #66
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
  [8] iterate
    @ ./generator.jl:47 [inlined]
  [9] collect
    @ ./array.jl:782
 [10] #65
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:146
 [11] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [12] invokelatest
    @ ./essentials.jl:816 [inlined]
 [13] #43
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:157
  [5] #156
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] #scoped#4
    @ ./deprecated.jl:116
 [13] scoped
    @ ./deprecated.jl:113
 [14] with_options
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:16
 [15] do_task
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1549
 [16] macro expansion
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1243 [inlined]
 [17] #130
    @ ./task.jl:134
  Root Thunk:  Thunk(id=22, #65(Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, MemPool.DRef, Dagger.OSProc, Dagger.AnyScope}(NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, Dagger.UnitDomain(), MemPool.DRef(2, 30, 0x0000000000000360), Dagger.OSProc(2), Dagger.AnyScope(), false)], +, [:a], Base._InitialValue()))
  Inner Thunk: Thunk(id=25, #47(+, Thunk[24](#44, Any[:a, Thunk[22](#65, Any[Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, MemPool.DRef, Dagger.OSProc, Dagger.AnyScope}(NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, Dagger.UnitDomain(), MemPool.DRef(2, 30, 0x0000000000000360), Dagger.OSProc(2), Dagger.AnyScope(), false)], +, [:a], Base._InitialValue()])]), Base._InitialValue()))
  This Thunk:  Thunk(id=25, #47(+, Thunk[24](#44, Any[:a, Thunk[22](#65, Any[Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, MemPool.DRef, Dagger.OSProc, Dagger.AnyScope}(NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, Dagger.UnitDomain(), MemPool.DRef(2, 30, 0x0000000000000360), Dagger.OSProc(2), Dagger.AnyScope(), false)], +, [:a], Base._InitialValue()])]), Base._InitialValue()))
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:54 [inlined]
  [5] _broadcast_getindex_evalf
    @ ./broadcast.jl:683 [inlined]
  [6] _broadcast_getindex
    @ ./broadcast.jl:656 [inlined]
  [7] getindex
    @ ./broadcast.jl:610 [inlined]
  [8] copy
    @ ./broadcast.jl:912 [inlined]
  [9] materialize
    @ ./broadcast.jl:873 [inlined]
 [10] #50
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:115
 [11] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [12] invokelatest
    @ ./essentials.jl:816 [inlined]
 [13] #43
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:157
  [5] #156
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] #scoped#4
    @ ./deprecated.jl:116
 [13] scoped
    @ ./deprecated.jl:113
 [14] with_options
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:16
 [15] do_task
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1549
 [16] macro expansion
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1243 [inlined]
 [17] #130
    @ ./task.jl:134
  This Thunk:  Thunk(id=26, #50([:a], Dagger.EagerThunk[EagerThunk (finished)]))
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:54 [inlined]
  [5] #13
    @ ~/tmp/mwe.jl:10
  [6] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
  [7] invokelatest
    @ ./essentials.jl:816
  [8] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
  [9] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [10] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [11] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:7
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:7

Notice that not every run results in the error.

I also occasionally see the following error printed, but the remotecall_fetch still returns normally with the correct answer:

Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
Stacktrace:
 [1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
   @ Base ./abstractarray.jl:949
 [2] _collect
   @ ./array.jl:713 [inlined]
 [3] collect
   @ ./array.jl:707 [inlined]
 [4] macro expansion
   @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1189 [inlined]
 [5] (::Dagger.Sch.var"#126#133"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
   @ Dagger.Sch ./task.jl:134

This is with DTables v0.4.1 and Dagger v0.18.3.

jpsamaroo commented 11 months ago

Yep, definitely a Dagger bug (and in the same newly-upgraded submission logic)! I see the bug: I forgot to add locking at https://github.com/JuliaParallel/Dagger.jl/blob/0d525ccdcd93755d693a70ad8f532f4f27d06f02/src/submission.jl#L128 (though it is done correctly in the else branch). I plan to push a fix tonight, and I'll also take a look at that copyto! error to see if it's related.
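
The fix boils down to holding the same lock around the branch that mutates the shared scheduler state. A minimal sketch of the pattern, using hypothetical names rather than Dagger's actual submission.jl internals:

# Hypothetical stand-ins for the scheduler's shared state; every writer
# must take the same lock, otherwise concurrent rehashes of the Dict can
# trigger "Multiple concurrent writes to Dict detected!".
const state_lock = ReentrantLock()
const waiting_syncdeps = Dict{Int,Vector{Int}}()

function register_syncdeps!(id::Int, deps::Vector{Int})
    lock(state_lock) do
        dest = get!(() -> Int[], waiting_syncdeps, id)  # safe only while the lock is held
        append!(dest, deps)
    end
end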

Thanks again for the excellent reporting!

StevenWhitaker commented 11 months ago

Thanks, I'm just glad you're able to fix these issues pretty quickly!

StevenWhitaker commented 11 months ago

I'm running into a possibly related error in some code of mine that looks similar to the example in the OP.

Code excerpt:

gdt = groupby(dt, cols)
gkeys = sort!(collect(keys(gdt)))
sums = map(gkeys) do key
    reduce(+, gdt[key]; cols = sum_cols)
end .|> fetch

Error (it's printed to a log, so the nice formatting is lost, unfortunately):

      From worker 2:    └  Dagger.ThunkFailedException{RemoteException}(Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), RemoteException(3, CapturedException(CapturedException(ConcurrencyViolationError("lock must be held"), Any[(concurrency_violation at condition.jl:8, 1), (assert_havelock at condition.jl:25 [inlined], 1), (assert_havelock at condition.jl:48 [inlined], 1), (assert_havelock at condition.jl:72 [inlined], 1), (_wait2 at condition.jl:83, 1), (#wait#621 at condition.jl:127, 1), (wait at condition.jl:125 [inlined], 1), (wait_for_conn at cluster.jl:195, 1), (check_worker_state at cluster.jl:170, 1), (send_msg_ at messages.jl:172, 1), (send_msg at messages.jl:122 [inlined], 1), (#remotecall_fetch#159 at remotecall.jl:460, 1), (remotecall_fetch at remotecall.jl:454, 1), (#remotecall_fetch#162 at remotecall.jl:492 [inlined], 1), (remotecall_fetch at remotecall.jl:492 [inlined], 1), (#171 at datastore.jl:424 [inlined], 1), (forwardkeyerror at datastore.jl:409, 1), (poolget at datastore.jl:423, 1), (move at chunks.jl:98, 1), (move at chunks.jl:96 [inlined], 1), (move at chunks.jl:102, 1), (#fetch#70 at eager_thunk.jl:21, 1), (fetch at eager_thunk.jl:11 [inlined], 1), (#fetch#75 at eager_thunk.jl:58 [inlined], 1), (fetch at eager_thunk.jl:54 [inlined], 1), (_broadcast_getindex_evalf at broadcast.jl:683 [inlined], 1), (_broadcast_getindex at broadcast.jl:656 [inlined], 1), (getindex at broadcast.jl:610 [inlined], 1), (copy at broadcast.jl:912 [inlined], 1), (materialize at broadcast.jl:873 [inlined], 1), (#50 at operations.jl:115, 1), (#invokelatest#2 at essentials.jl:819 [inlined], 1), (invokelatest at essentials.jl:816 [inlined], 1), (#43 at processor.jl:162, 1)]), Any[(wait at task.jl:349 [inlined], 1), (fetch at task.jl:369 [inlined], 1), (#execute!#42 at processor.jl:172, 1), (execute! at processor.jl:157, 1), (#156 at Sch.jl:1551 [inlined], 1), (#21 at options.jl:17 [inlined], 1), (#1 at ScopedValues.jl:163, 1), (with_logstate at logging.jl:514, 1), (with_logger at logging.jl:626 [inlined], 1), (enter_scope at payloadlogger.jl:17 [inlined], 1), (with at ScopedValues.jl:162, 1), (#scoped#4 at deprecated.jl:116, 1), (scoped at deprecated.jl:113, 1), (with_options at options.jl:16, 1), (do_task at Sch.jl:1549, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#130 at task.jl:134, 1)])))
      From worker 2:    [ 2023-09-28T08:31:34.312 ] pid: 2137 proc: 2 Info:    > Dagger.ThunkFailedException{RemoteException}(Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), RemoteException(3, CapturedException(CapturedException(ConcurrencyViolationError("lock must be held"), Any[(concurrency_violation at condition.jl:8, 1), (assert_havelock at condition.jl:25 [inlined], 1), (assert_havelock at condition.jl:48 [inlined], 1), (assert_havelock at condition.jl:72 [inlined], 1), (_wait2 at condition.jl:83, 1), (#wait#621 at condition.jl:127, 1), (wait at condition.jl:125 [inlined], 1), (wait_for_conn at cluster.jl:195, 1), (check_worker_state at cluster.jl:170, 1), (send_msg_ at messages.jl:172, 1), (send_msg at messages.jl:122 [inlined], 1), (#remotecall_fetch#159 at remotecall.jl:460, 1), (remotecall_fetch at remotecall.jl:454, 1), (#remotecall_fetch#162 at remotecall.jl:492 [inlined], 1), (remotecall_fetch at remotecall.jl:492 [inlined], 1), (#171 at datastore.jl:424 [inlined], 1), (forwardkeyerror at datastore.jl:409, 1), (poolget at datastore.jl:423, 1), (move at chunks.jl:98, 1), (move at chunks.jl:96 [inlined], 1), (move at chunks.jl:102, 1), (#fetch#70 at eager_thunk.jl:21, 1), (fetch at eager_thunk.jl:11 [inlined], 1), (#fetch#75 at eager_thunk.jl:58 [inlined], 1), (fetch at eager_thunk.jl:54 [inlined], 1), (_broadcast_getindex_evalf at broadcast.jl:683 [inlined], 1), (_broadcast_getindex at broadcast.jl:656 [inlined], 1), (getindex at broadcast.jl:610 [inlined], 1), (copy at broadcast.jl:912 [inlined], 1), (materialize at broadcast.jl:873 [inlined], 1), (#50 at operations.jl:115, 1), (#invokelatest#2 at essentials.jl:819 [inlined], 1), (invokelatest at essentials.jl:816 [inlined], 1), (#43 at processor.jl:162, 1)]), Any[(wait at task.jl:349 [inlined], 1), (fetch at task.jl:369 [inlined], 1), (#execute!#42 at processor.jl:172, 1), (execute! at processor.jl:157, 1), (#156 at Sch.jl:1551 [inlined], 1), (#21 at options.jl:17 [inlined], 1), (#1 at ScopedValues.jl:163, 1), (with_logstate at logging.jl:514, 1), (with_logger at logging.jl:626 [inlined], 1), (enter_scope at payloadlogger.jl:17 [inlined], 1), (with at ScopedValues.jl:162, 1), (#scoped#4 at deprecated.jl:116, 1), (scoped at deprecated.jl:113, 1), (with_options at options.jl:16, 1), (do_task at Sch.jl:1549, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#130 at task.jl:134, 1)])))
      From worker 2:    [ 2023-09-28T08:31:34.444 ] pid: 2137 proc: 2 Info:  Base.StackTraces.StackFrame[fetch(t::Dagger.ThunkFuture; proc::Dagger.OSProc, raw::Bool) at eager_thunk.jl:16, fetch at eager_thunk.jl:11 [inlined], #fetch#75 at eager_thunk.jl:58 [inlined], fetch at eager_thunk.jl:54 [inlined], |> at operators.jl:907 [inlined], _broadcast_getindex_evalf at broadcast.jl:683 [inlined], _broadcast_getindex at broadcast.jl:656 [inlined], getindex at broadcast.jl:610 [inlined], copyto_nonleaf!(dest::Vector{NamedTuple{(column names), NTuple{12, Float64}}}, bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Tuple{Base.OneTo{Int}}, typeof(|>), Tuple{Base.Broadcast.Extruded{Vector{Dagger.EagerThunk}, Tuple{Bool}, Tuple{Int}}, Base.RefValue{typeof(fetch)}}}, iter::Base.OneTo{Int}, state::Int, count::Int) at broadcast.jl:1068, copy at broadcast.jl:920 [inlined], materialize(bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Nothing, typeof(|>), Tuple{Vector{Dagger.EagerThunk}, Base.RefValue{typeof(fetch)}}}) at broadcast.jl:873, stacktrace through my code, (::Distributed.var"#110#112"{Distributed.CallMsg{:call_fetch}})() at process_messages.jl:285, run_work_thunk(thunk::Distributed.var"#110#112"{Distributed.CallMsg{:call_fetch}}, print_error::Bool) at process_messages.jl:70, macro expansion at process_messages.jl:285 [inlined], (::Distributed.var"#109#111"{Distributed.CallMsg{:call_fetch}, Distributed.MsgHeader, Sockets.TCPSocket})() at task.jl:514]

I think the key part of the error is ConcurrencyViolationError("lock must be held"). Because it has to do with locking, I thought it might be related to the OP. I tried isolating the code in an MWE but ended up getting the "Multiple concurrent writes to Dict detected!" error instead.
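
For reference, a self-contained version of that excerpt looks roughly like the following; dt, cols, and sum_cols are hypothetical stand-ins for the actual data and column names:

using DTables

N = 100
dt = DTable((group = rand(1:3, N), a = rand(N), b = rand(N)))
cols = :group              # hypothetical grouping column
sum_cols = [:a, :b]        # hypothetical columns to reduce over

gdt = groupby(dt, cols)
gkeys = sort!(collect(keys(gdt)))
sums = map(gkeys) do key
    reduce(+, gdt[key]; cols = sum_cols)
end .|> fetch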

jpsamaroo commented 11 months ago

Aside from the most obvious issue (which I have fixed locally), I'm also seeing a variety of concurrency issues, and am trying to narrow them down.

StevenWhitaker commented 11 months ago

Thanks for the update, hopefully the other issues can be resolved soon as well!

StevenWhitaker commented 11 months ago

@jpsamaroo Any updates on this front?

jpsamaroo commented 11 months ago

Sorry, not yet; I'm in the middle of getting ready to move across the US, so I'll have to get back to this over the weekend or next week. I did find a variety of nearly identical segfaults across the stack, so there is definitely a common source; I just need to find it.

jpsamaroo commented 11 months ago

I've narrowed this down to some issue with the usage of WeakRef within WeakChunk: not using WeakRef (just storing the Chunk directly) fixes the segfaults I was seeing. I'm going to try to put together a workaround that uses WeakKeyDict instead, but if that doesn't work, I'll just drop the usage of WeakRef until I can get this figured out and fixed.
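
To illustrate the difference (FakeChunk below is a hypothetical stand-in, not Dagger's Chunk): a WeakRef does not keep its referent alive, so a structure that only holds WeakRefs can find the wrapped object already collected, while storing the object directly keeps it reachable:

mutable struct FakeChunk
    data::Vector{Int}
end

weak_store   = Dict{Int,WeakRef}()     # like WeakChunk: the referent may be collected
strong_store = Dict{Int,FakeChunk}()   # workaround: the Dict keeps the chunk alive

let c = FakeChunk(collect(1:10))
    weak_store[1] = WeakRef(c)         # only a weak reference survives this block
end
let c = FakeChunk(collect(1:10))
    strong_store[1] = c                # a strong reference survives this block
end

GC.gc()
weak_store[1].value    # may now be `nothing`; nothing strong kept the chunk alive
strong_store[1].data   # still available: the Dict entry keeps it alive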

StevenWhitaker commented 11 months ago

Great, thanks for the update and for your work on this; I really appreciate it!

I'm in the middle of getting ready to move across the US

I got to do that a year ago; it's definitely a lot of work, so I understand how busy you must be! Hopefully that all goes smoothly for you!

jpsamaroo commented 11 months ago

I've found the issue: WeakRef serialization is not implemented, and we were serializing them during task submission (which is why this only started occurring recently). I'm working on a workaround but running into other fun bugs in the process, so this may take a few more days. Thank you for your patience!

jpsamaroo commented 11 months ago

Ok, I've got a fix locally for this that gets the following example working:

using Dagger, DTables
using Distributed
addprocs(1)

@everywhere using DTables

remotecall_fetch(2) do
    N = 2
    dt = DTable((a = 1:N, b = rand(N)))
    @sync for i in 1:20
        Threads.@spawn begin
            println("Iter $i")
            fetch(reduce(+, dt; cols = [:a]))
        end
    end
end

I'll push the full fix for it soon!

Another fun issue found in the process of debugging this: Chunks hash equivalently when they refer to the same data, even if the Chunks are different objects. This makes many internal parts of Dagger much simpler to work with (since they do refer to the same data in the end, they are functionally interchangeable). However, when we serialize the same Chunk across the wire, we get back a different Chunk object on the other end.

This means we need to be careful when working with deserialized Chunks not to naively insert them as keys into Dicts used for GC object retention. When we need to do this, we should instead check whether the Dict already has an equivalent Chunk and, if so, use that Chunk instead of the one we're currently looking at (which is a logical duplicate of it). If we don't, we might "bump out" an equivalent Chunk from the Dict that really needed to stay there to keep the referenced remote data alive, causing the GC to delete things we were relying on. Oh, the joys of remote garbage collection :smile:
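
A hedged sketch of that retention pattern (ChunkLike and retain! are hypothetical, not Dagger's actual internals): two deserialized chunks can compare equal while being distinct objects, and a plain d[c] = v would overwrite the stored key object with the duplicate, so we look up and reuse the key that is already in the Dict:

struct ChunkLike
    ref_id::Int                        # stands in for the remote data reference
end
Base.:(==)(a::ChunkLike, b::ChunkLike) = a.ref_id == b.ref_id
Base.hash(c::ChunkLike, h::UInt) = hash(c.ref_id, h)

retained = Dict{ChunkLike,Int}()       # key presence keeps the remote data alive

# Insert `c`, but if an equivalent key is already stored, reuse that key
# object instead of replacing it with the freshly deserialized duplicate.
# (Linear scan for simplicity; real code would avoid the O(n) lookup.)
function retain!(d::Dict{ChunkLike,Int}, c::ChunkLike)
    key = c
    for k in keys(d)
        if k == c
            key = k
            break
        end
    end
    d[key] = get(d, key, 0) + 1
    return key
end

a = ChunkLike(7)                       # original chunk
b = ChunkLike(7)                       # "same" chunk after a serialization round-trip
retain!(retained, a)
retain!(retained, b)                   # reuses `a` as the key instead of bumping it out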