JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

`DTables.groupby` causes issues when multiple processes available #450

Closed StevenWhitaker closed 10 months ago

StevenWhitaker commented 10 months ago

I have some code that involves several operations on DTables. I ran my code with nprocs() equal to 1, and everything worked fine. I then added some processes so that nprocs() equaled 5 and ran my code again on worker 1 (so I didn't explicitly use any of the added workers). In this case, my code would hang when calling reduce on a GDTable (i.e., after calling groupby).

I tried to create an MWE, but I haven't yet been able to find one that hangs. Fortunately, I did find an MWE that gives a different error (ConcurrencyViolationError("lock must be held")); hopefully this error and the hanging I'm experiencing are different manifestations of the same issue.

EDIT: The next comment contains a simpler MWE that produces the same error (slightly different stacktrace, though).

Contents of mwe.jl:

using Distributed
nworkers = 4
addprocs(nworkers - nprocs() + 1)  # add processes so there are `nworkers` worker processes in total

@everywhere using DTables, DataFrames, CSV

function f()
    dt = DTable(x -> CSV.File(x), ["file.csv"]; tabletype = DataFrame)
    df = fetch(dt)  # materialize the DTable so its column names can be looked up
    gdt = groupby(dt, Symbol.(names(df)[[6, 12, 48]]))  # group by three key columns
    sums = fetch(reduce(+, gdt; cols = Symbol.(names(df)[[93, 94]])))  # sum two value columns within each group
end

f()

Error:

julia> include("mwe.jl")
ERROR: LoadError: ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
On worker 2:
ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
On worker 2:
ConcurrencyViolationError("lock must be held")
Stacktrace:
  [1] assert_havelock
    @ ./condition.jl:25 [inlined]
  [2] assert_havelock
    @ ./condition.jl:48 [inlined]
  [3] assert_havelock
    @ ./condition.jl:72 [inlined]
  [4] _wait2
    @ ./condition.jl:83
  [5] #wait#621
    @ ./condition.jl:127
  [6] wait
    @ ./condition.jl:125 [inlined]
  [7] wait_for_conn
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:195
  [8] check_worker_state
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:170
  [9] send_msg_
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:172
 [10] send_msg
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [11] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [12] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [13] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [14] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [15] #171
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:424 [inlined]
 [16] forwardkeyerror
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:409
 [17] poolget
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:423
 [18] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:98
 [19] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:96 [inlined]
 [20] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:102
 [21] #fetch#70
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:21
 [22] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [23] #fetch#75
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [24] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [25] _broadcast_getindex_evalf
    @ ./broadcast.jl:683 [inlined]
 [26] _broadcast_getindex
    @ ./broadcast.jl:656 [inlined]
 [27] _getindex
    @ ./broadcast.jl:679 [inlined]
 [28] _broadcast_getindex
    @ ./broadcast.jl:655 [inlined]
 [29] getindex
    @ ./broadcast.jl:610 [inlined]
 [30] copyto_nonleaf!
    @ ./broadcast.jl:1068
 [31] copy
    @ ./broadcast.jl:920 [inlined]
 [32] materialize
    @ ./broadcast.jl:873 [inlined]
 [33] #79
    @ ~/.julia/packages/DTables/BjdY2/src/operations/operations.jl:187
 [34] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [35] invokelatest
    @ ./essentials.jl:816 [inlined]
 [36] #43
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:157
  [5] #158
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] with_options
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:16
 [13] do_task
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1549
 [14] macro expansion
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1243 [inlined]
 [15] #132
    @ ./task.jl:134
  This Thunk:  Thunk(id=9, #79(5 inputs...))
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
  [5] #86
    @ ./none:0
  [6] iterate
    @ ./generator.jl:47 [inlined]
  [7] collect
    @ ./array.jl:782
  [8] #83
    @ ~/.julia/packages/DTables/BjdY2/src/operations/operations.jl:205
  [9] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [10] invokelatest
    @ ./essentials.jl:816 [inlined]
 [11] #43
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:157
  [5] #158
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] with_options
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:16
 [13] do_task
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1549
 [14] macro expansion
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1243 [inlined]
 [15] #132
    @ ./task.jl:134
  This Thunk:  Thunk(id=11, #83(5 inputs...))
Stacktrace:
 [1] fetch(t::Dagger.ThunkFuture; proc::Dagger.OSProc, raw::Bool)
   @ Dagger ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:16
 [2] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [3] #fetch#75
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [4] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [5] f()
   @ Main ~/tmp/mwe.jl:11
 [6] top-level scope
   @ ~/tmp/mwe.jl:14
 [7] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [8] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:14

StevenWhitaker commented 10 months ago

I just came across basically the same error, but with a slightly different stacktrace:

julia> using Distributed; addprocs(10); @everywhere using Dagger, DTables, DataFrames

julia> dt = DTable(DataFrame(a = 1:100, b = rand(1:5, 100)))
DTable with 1 partitions
Tabletype: DataFrame

julia> gdt = groupby(dt, :b)
ERROR: ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
On worker 5:
ConcurrencyViolationError("lock must be held")
Stacktrace:
  [1] concurrency_violation
    @ ./condition.jl:8
  [2] assert_havelock
    @ ./condition.jl:25 [inlined]
  [3] assert_havelock
    @ ./condition.jl:48 [inlined]
  [4] assert_havelock
    @ ./condition.jl:72 [inlined]
  [5] _wait2
    @ ./condition.jl:83
  [6] #wait#621
    @ ./condition.jl:127
  [7] wait
    @ ./condition.jl:125 [inlined]
  [8] wait_for_conn
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:195
  [9] check_worker_state
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:170
 [10] send_msg_
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:172
 [11] send_msg
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [12] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [13] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [14] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [15] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [16] #171
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:424 [inlined]
 [17] forwardkeyerror
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:409
 [18] poolget
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:423
 [19] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:98
 [20] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:96 [inlined]
 [21] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:102
 [22] #fetch#70
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:21
 [23] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [24] #fetch#75
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [25] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [26] build_groupby_index
    @ ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:176
 [27] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [28] invokelatest
    @ ./essentials.jl:816 [inlined]
 [29] #43
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:157
  [5] #158
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] with_options
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:16
 [13] do_task
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1549
 [14] macro expansion
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1243 [inlined]
 [15] #132
    @ ./task.jl:134
  This Thunk:  Thunk(id=3, build_groupby_index(true, 0, DataFrame, Thunk[2](#134, Any[Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{DataFrame, MemPool.DRef, OSProc, AnyScope}(DataFrame, UnitDomain(), MemPool.DRef(1, 0, 0x0000000000000908), OSProc(1), AnyScope(), false)], DTables.var"#123#125"{Symbol, DTables.var"#122#124"}(:b, DTables.var"#122#124"())])))
Stacktrace:
 [1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
   @ Dagger ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:16
 [2] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [3] #fetch#75
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [4] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [5] _groupby(d::DTable, row_function::Function, cols::Vector{Symbol}, merge::Bool, chunksize::Int64)
   @ DTables ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:132
 [6] #groupby#121
   @ ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:38 [inlined]
 [7] groupby(d::DTable, col::Symbol)
   @ DTables ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:35
 [8] top-level scope
   @ REPL[3]:1

StevenWhitaker commented 10 months ago

The above error appears to go away when I wrap the code with `Dagger.with_options(f; scope = ProcessScope(myid()))`.
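
A minimal sketch of what that looks like with the MWE above (`f` as defined in mwe.jl; `ProcessScope(myid())` restricts Dagger tasks to the current process, so no chunk has to be moved from another worker):

using Distributed, Dagger, DTables, DataFrames, CSV

# Workaround sketch: run `f` with a scope that pins all Dagger tasks it spawns
# to the current process (worker 1 here).
sums = Dagger.with_options(f; scope = ProcessScope(myid()))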

jpsamaroo commented 10 months ago

@StevenWhitaker can you try this with Julia master (or any recent 1.11 build) together with https://github.com/JuliaLang/Distributed.jl/pull/4? This is just a Distributed concurrency bug that my PR fixes. You can `] dev /path/to/Distributed` in your project to make Julia load that checkout. This requires Julia 1.11, which is the first version that supports loading Distributed as an external stdlib.
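
For reference, a rough sketch of those steps (the checkout path below is a placeholder for wherever the PR branch is cloned):

# In the Pkg REPL (press `]`) of the project that runs the MWE, on Julia 1.11+:
#   pkg> dev /path/to/Distributed
# or, equivalently, from Julia code:
using Pkg
Pkg.develop(path = "/path/to/Distributed")  # placeholder path to the Distributed.jl checkout
# Restart Julia so the dev'd Distributed is picked up, then re-run the MWE:
include("mwe.jl")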

StevenWhitaker commented 10 months ago

Your PR does seem to fix the issue!

jpsamaroo commented 10 months ago

Awesome! Considering this isn't our bug, I'm going to close this, with the understanding that I plan to get that PR merged.