JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution
Other
638 stars 67 forks source link

`KeyError` with `select`ing multiple columns (also with `reduce` and `combine`) with Distributed #435

Closed StevenWhitaker closed 1 year ago

StevenWhitaker commented 1 year ago

The following example produces a KeyError.

Contents of mwe.jl:

using Distributed
nworkers = 1
addprocs(nworkers - nprocs() + 1)

@everywhere using DTables, DataFrames

# See JuliaParallel/Dagger.jl#431 for why the next two lines are needed.
dt = DTable(DataFrame(a = [1], b = [2]))
dnew = select(dt, [1, 2]...)

remotecall_fetch(2) do
    df_remote = DataFrame(a = [1], b = [2])
    dt_remote = DTable(df_remote)
    dnew_remote = select(dt_remote, [1, 2]...)
    fetch(dnew_remote) # length(dnew_remote) also causes the `KeyError`
end

Results:

julia> include("mwe.jl")
ERROR: LoadError: On worker 2:
KeyError: key 0x000000000000000a not found
Stacktrace:
  [1] getindex
    @ ./dict.jl:484
  [2] #95
    @ ~/.julia/packages/Dagger/xGAvM/src/submission.jl:44
  [3] lock
    @ ~/.julia/packages/Dagger/xGAvM/src/utils/locked-object.jl:12
  [4] #eager_submit_internal!#94
    @ ~/.julia/packages/Dagger/xGAvM/src/submission.jl:36
  [5] eager_submit_internal!
    @ ~/.julia/packages/Dagger/xGAvM/src/submission.jl:9 [inlined]
  [6] eager_submit_internal!
    @ ~/.julia/packages/Dagger/xGAvM/src/submission.jl:7
  [7] #invokelatest#2
    @ ./essentials.jl:819
  [8] invokelatest
    @ ./essentials.jl:816
  [9] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [10] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [11] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [12] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/xGAvM/src/submission.jl:124
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/xGAvM/src/submission.jl:192
  [7] enqueue!
    @ ~/.julia/packages/Dagger/xGAvM/src/queue.jl:12 [inlined]
  [8] #spawn#86
    @ ~/.julia/packages/Dagger/xGAvM/src/thunk.jl:304
  [9] spawn
    @ ~/.julia/packages/Dagger/xGAvM/src/thunk.jl:268 [inlined]
 [10] #8
    @ ~/.julia/packages/Dagger/xGAvM/src/thunk.jl:383 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] collect
    @ ./array.jl:782
 [13] trim!
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:231
 [14] trim
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:241
 [15] retrieve_partitions
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:178 [inlined]
 [16] fetch
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:166
 [17] #5
    @ ~/tmp/mwe.jl:15
 [18] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [19] invokelatest
    @ ./essentials.jl:816
 [20] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [21] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [22] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [23] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454 [inlined]
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:11
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:11

The code works if just one column is selected, e.g., select(dt_remote, 1).

Also note that calling fetch(dnew) before the remotecall_fetch produces the expected DataFrame on worker 1, but then the remotecall_fetch segfaults:

[23268] signal (11.1): Segmentation fault
in expression starting at /home/steven/tmp/mwe.jl:12
jl_object_id__cold at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/builtins.c:417
type_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1332
typekey_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1344
jl_precompute_memoized_dt at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1409
inst_datatype_inner at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1731
jl_inst_arg_tuple_type at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1826
arg_type_tuple at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2100 [inlined]
jl_lookup_generic_ at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2884 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2936
cleanup_syncdeps! at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/util.jl:49
finish_task! at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:907
unknown function (ip: 0x7f904c7cefd5)
_jl_invoke at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2758 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2940
#91 at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:575
lock at ./lock.jl:229
unknown function (ip: 0x7f9027b6b436)
_jl_invoke at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2758 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2940
scheduler_run at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:525
#compute_dag#82 at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:449
compute_dag at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:414 [inlined]
#compute#139 at /home/steven/.julia/packages/Dagger/xGAvM/src/compute.jl:23
compute at /home/steven/.julia/packages/Dagger/xGAvM/src/compute.jl:22 [inlined]
macro expansion at /home/steven/.julia/packages/Dagger/xGAvM/src/sch/eager.jl:28 [inlined]
#50 at ./threadingconstructs.jl:410
unknown function (ip: 0x7f904c79d55f)
_jl_invoke at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2758 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2940
jl_apply at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/julia.h:1880 [inlined]
start_task at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/task.c:1092
Allocations: 39859451 (Pool: 39828312; Big: 31139); GC: 52
StevenWhitaker commented 1 year ago

I also get a segfault if I use the master branch of Dagger.jl and/or the main branch of DTables.jl, and I also sometimes get a segfault (instead of a KeyError) with the original example (i.e., with Dagger v0.18.2 and DTables v0.4.1).

StevenWhitaker commented 1 year ago

Replacing the select call in the remotecall_fetch with reduce((x, y) -> x + y, dt_remote; cols = [:a, :b]) also results in a KeyError. In this case, it is the actual call to reduce (not fetch) that causes the KeyError.

Replacing the select call with combine(dt_remote, [:a, :b] .=> sum; renamecols = false) also results in a KeyError when calling fetch on the result.

jpsamaroo commented 1 year ago

Whew, great catch!